Prevents the stop of the stats collection
- Uses executor in place of a timer which can become unable
to schedule futher tasks if the exceptions are not caught
- Flow stats collector will wait at most 5 flows stats interval
for a reply before to request new stats
Change-Id: Ic50efd7b8afa1f182ba32c04aa971d3ef400529e
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
index c8bf707..638c195 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
@@ -25,9 +25,12 @@
import org.projectfloodlight.openflow.types.TableId;
import org.slf4j.Logger;
-import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.Math.max;
import static java.lang.Math.min;
@@ -40,7 +43,7 @@
private final Logger log = getLogger(getClass());
- private static final int SECONDS = 1000;
+ private static final int MS = 1000;
// Number of ticks which defines the pause window.
private static final int PAUSE_WINDOW = 2;
@@ -56,27 +59,30 @@
private static final int MAX_LOAD_RATE = 500;
private final OpenFlowSwitch sw;
- private Timer timer;
+ private ScheduledExecutorService executorService;
private TimerTask pauseTask;
+ private ScheduledFuture<?> scheduledPauseTask;
private TimerTask pollTask;
+ private ScheduledFuture<?> scheduledPollTask;
private SlidingWindowCounter loadCounter;
// Defines whether the collector is in pause or not for high load
private final AtomicBoolean paused = new AtomicBoolean();
// Defines whether the collector is in waiting or not for a previous stats reply
- private final AtomicBoolean waiting = new AtomicBoolean();
+ private static final int WAITING_ATTEMPTS = 5;
+ private final AtomicInteger waiting = new AtomicInteger(0);
private int pollInterval;
/**
* Creates a new collector for the given switch and poll frequency.
*
- * @param timer timer to use for scheduling
- * @param sw switch to pull
+ * @param executorService executor used for scheduling
+ * @param sw switch to pull
* @param pollInterval poll frequency in seconds
*/
- FlowStatsCollector(Timer timer, OpenFlowSwitch sw, int pollInterval) {
- this.timer = timer;
+ FlowStatsCollector(ScheduledExecutorService executorService, OpenFlowSwitch sw, int pollInterval) {
+ this.executorService = executorService;
this.sw = checkNotNull(sw, "Null switch");
this.pollInterval = pollInterval;
}
@@ -91,11 +97,16 @@
if (pollTask != null) {
pollTask.cancel();
}
+ if (scheduledPollTask != null) {
+ scheduledPollTask.cancel(false);
+ }
// If we went through start - let's schedule it
if (loadCounter != null) {
pollTask = new PollTimerTask();
- timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
+ scheduledPollTask = executorService.scheduleAtFixedRate(pollTask, pollInterval * MS,
+ pollInterval * MS, TimeUnit.MILLISECONDS);
}
+ waiting.set(0);
}
/**
@@ -109,7 +120,7 @@
}
// Let's reset also waiting, the reply can be discarded/lost
// during a change of mastership
- waiting.set(false);
+ waiting.set(0);
}
/**
@@ -169,7 +180,12 @@
@Override
public void run() {
// Check whether we are still waiting a previous reply
- if (sw.getRole() == RoleState.MASTER && !waiting.get()) {
+ if (waiting.getAndDecrement() > 0) {
+ log.debug("Skipping stats collection for {} waiting for previous reply", sw.getStringId());
+ return;
+ }
+ // Check whether we are the master of the switch
+ if (sw.getRole() == RoleState.MASTER) {
// Check whether the switch is under high load from this master. This is done here in case a large
// batch was pushed immediately prior to this task running.
if (isHighLoad()) {
@@ -195,7 +211,7 @@
sw.sendMsg(request);
// Other flow stats will not be asked
// if we don't see first the reply of this request
- waiting.set(true);
+ waiting.set(WAITING_ATTEMPTS);
}
}
}
@@ -205,9 +221,11 @@
log.debug("Starting Stats collection thread for {}", sw.getStringId());
loadCounter = new SlidingWindowCounter(HIGH_WINDOW);
pauseTask = new PauseTimerTask();
- timer.scheduleAtFixedRate(pauseTask, 1 * SECONDS, 1 * SECONDS);
+ scheduledPauseTask = executorService.scheduleAtFixedRate(pauseTask, 1 * MS,
+ 1 * MS, TimeUnit.MILLISECONDS);
pollTask = new PollTimerTask();
- timer.scheduleAtFixedRate(pollTask, 1 * SECONDS, pollInterval * SECONDS);
+ scheduledPollTask = executorService.scheduleAtFixedRate(pollTask, 1 * MS,
+ pollInterval * MS, TimeUnit.MILLISECONDS);
}
private synchronized void pause() {
@@ -219,6 +237,10 @@
pollTask.cancel();
pollTask = null;
}
+ if (scheduledPollTask != null) {
+ scheduledPollTask.cancel(false);
+ scheduledPollTask = null;
+ }
}
private synchronized void resume() {
@@ -227,7 +249,8 @@
loadCounter.getWindowRate(PAUSE_WINDOW),
loadCounter.getWindowRate(HIGH_WINDOW));
pollTask = new PollTimerTask();
- timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
+ scheduledPollTask = executorService.scheduleAtFixedRate(pollTask, pollInterval * MS,
+ pollInterval * MS, TimeUnit.MILLISECONDS);
}
public synchronized void stop() {
@@ -235,11 +258,19 @@
pauseTask.cancel();
pauseTask = null;
}
+ if (scheduledPauseTask != null) {
+ scheduledPauseTask.cancel(false);
+ scheduledPauseTask = null;
+ }
if (pollTask != null) {
log.debug("Stopping Stats collection thread for {}", sw.getStringId());
pollTask.cancel();
pollTask = null;
}
+ if (scheduledPollTask != null) {
+ scheduledPollTask.cancel(false);
+ scheduledPollTask = null;
+ }
if (loadCounter != null) {
loadCounter.destroy();
loadCounter = null;
@@ -247,7 +278,7 @@
}
public void received() {
- waiting.set(false);
+ waiting.set(0);
}
}
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index 19545b2..caeacd7 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -96,13 +96,16 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.Timer;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -146,8 +149,8 @@
private Cache<Long, InternalCacheEntry> pendingBatches;
- private final Timer timer = new Timer("onos-openflow-collector");
-
+ private ScheduledExecutorService executorService = newScheduledThreadPool(1,
+ groupedThreads("onos/of", "collector-%d", log));
// Old simple collector set
private final Map<Dpid, FlowStatsCollector> simpleCollectors = Maps.newConcurrentMap();
@@ -169,6 +172,8 @@
providerService = providerRegistry.register(this);
controller.addListener(listener);
controller.addEventListener(listener);
+ // Evicts the tasks if cancelled
+ ((ScheduledThreadPoolExecutor) executorService).setRemoveOnCancelPolicy(true);
modified(context);
@@ -186,6 +191,7 @@
stopCollectors();
providerRegistry.unregister(this);
providerService = null;
+ executorService.shutdown();
log.info("Stopped");
}
@@ -251,13 +257,13 @@
stopCollectorIfNeeded(afsCollectors.put(new Dpid(sw.getId()), fsc));
fsc.start();
} else {
- FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency);
+ FlowStatsCollector fsc = new FlowStatsCollector(executorService, sw, flowPollFrequency);
stopCollectorIfNeeded(simpleCollectors.put(new Dpid(sw.getId()), fsc));
fsc.start();
}
}
if (sw.features().getCapabilities().contains(OFCapabilities.TABLE_STATS)) {
- TableStatisticsCollector tsc = new TableStatisticsCollector(timer, sw, flowPollFrequency);
+ TableStatisticsCollector tsc = new TableStatisticsCollector(executorService, sw, flowPollFrequency);
stopCollectorIfNeeded(tableStatsCollectors.put(new Dpid(sw.getId()), tsc));
tsc.start();
}
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/TableStatisticsCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/TableStatisticsCollector.java
index a3644d2..4e71818 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/TableStatisticsCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/TableStatisticsCollector.java
@@ -15,14 +15,15 @@
*/
package org.onosproject.provider.of.flow.impl;
-import org.onlab.util.SharedExecutors;
import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.onosproject.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFTableStatsRequest;
import org.slf4j.Logger;
-import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import static org.slf4j.LoggerFactory.getLogger;
@@ -33,23 +34,24 @@
private final Logger log = getLogger(getClass());
- public static final long SECONDS = 1000L;
+ public static final int MS = 1000;
private final OpenFlowSwitch sw;
- private Timer timer;
+ private ScheduledExecutorService executorService;
private TimerTask task;
+ private ScheduledFuture<?> scheduledTask;
private int pollInterval;
/**
* Creates a new table statistics collector for the given switch and poll frequency.
*
- * @param timer timer to use for scheduling
- * @param sw switch to pull
+ * @param executorService executor used for scheduling
+ * @param sw switch to pull
* @param pollInterval poll frequency in seconds
*/
- TableStatisticsCollector(Timer timer, OpenFlowSwitch sw, int pollInterval) {
- this.timer = timer;
+ TableStatisticsCollector(ScheduledExecutorService executorService, OpenFlowSwitch sw, int pollInterval) {
+ this.executorService = executorService;
this.sw = sw;
this.pollInterval = pollInterval;
}
@@ -61,9 +63,15 @@
*/
synchronized void adjustPollInterval(int pollInterval) {
this.pollInterval = pollInterval;
- task.cancel();
+ if (task != null) {
+ task.cancel();
+ }
task = new InternalTimerTask();
- timer.scheduleAtFixedRate(task, pollInterval * SECONDS, pollInterval * 1000L);
+ if (scheduledTask != null) {
+ scheduledTask.cancel(false);
+ }
+ scheduledTask = executorService.scheduleAtFixedRate(task, pollInterval * MS,
+ pollInterval * MS, TimeUnit.MILLISECONDS);
}
private class InternalTimerTask extends TimerTask {
@@ -82,14 +90,20 @@
// Initially start polling quickly. Then drop down to configured value
log.debug("Starting Table Stats collection thread for {}", sw.getStringId());
task = new InternalTimerTask();
- SharedExecutors.getTimer().scheduleAtFixedRate(task, 1 * SECONDS,
- pollInterval * SECONDS);
+ scheduledTask = executorService.scheduleAtFixedRate(task, 1 * MS,
+ pollInterval * MS, TimeUnit.MILLISECONDS);
}
public synchronized void stop() {
log.debug("Stopping Table Stats collection thread for {}", sw.getStringId());
- task.cancel();
+ if (task != null) {
+ task.cancel();
+ }
task = null;
+ if (scheduledTask != null) {
+ scheduledTask.cancel(false);
+ }
+ scheduledTask = null;
}
}