Modify OpenFlow provider to avoid stats collection when switch is under high load
* Track load via sliding window
* Pause stats collection when load is significantly above average
* Resume stats collection when load returns to normal
* Pause stats collection when reply is in-flight
Change-Id: I3159b4f806a6405ca6be494534497348716cc921
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
index 1aabf24..c8bf707 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
@@ -16,6 +16,8 @@
package org.onosproject.provider.of.flow.impl;
import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.onlab.util.SlidingWindowCounter;
import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.onosproject.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
@@ -25,7 +27,10 @@
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -35,11 +40,31 @@
private final Logger log = getLogger(getClass());
- public static final long SECONDS = 1000L;
+ // Milliseconds per second. Declared long so that products such as
+ // pollInterval * SECONDS are computed in 64-bit arithmetic before being
+ // passed to the timer, instead of overflowing as an int multiplication.
+ private static final long SECONDS = 1000L;
+
+ // Number of ticks which defines the pause window.
+ private static final int PAUSE_WINDOW = 2;
+ // Number of ticks which defines the high load window.
+ private static final int HIGH_WINDOW = 60;
+ // Number of ticks which defines the low load window.
+ private static final int LOW_WINDOW = 15;
+ // Multiplier applied to the high load window rate to derive the high load threshold.
+ private static final int LOAD_FACTOR = 2;
+ // Lower bound of the high load threshold, in events per second.
+ private static final int MIN_LOAD_RATE = 50;
+ // Upper bound of the high load threshold, in events per second.
+ private static final int MAX_LOAD_RATE = 500;
private final OpenFlowSwitch sw;
private Timer timer;
- private TimerTask task;
+ private TimerTask pauseTask;
+ private TimerTask pollTask;
+
+ private SlidingWindowCounter loadCounter;
+ // Defines whether the collector is in pause or not for high load
+ private final AtomicBoolean paused = new AtomicBoolean();
+ // Defines whether the collector is in waiting or not for a previous stats reply
+ private final AtomicBoolean waiting = new AtomicBoolean();
private int pollInterval;
@@ -63,15 +88,104 @@
*/
synchronized void adjustPollInterval(int pollInterval) {
this.pollInterval = pollInterval;
- task.cancel();
- task = new InternalTimerTask();
- timer.scheduleAtFixedRate(task, pollInterval * SECONDS, pollInterval * 1000L);
+ if (pollTask != null) {
+ pollTask.cancel();
+ }
+ // Reschedule only if we went through start() and polling is not currently
+ // paused for high load. While paused, resume() is the one that schedules the
+ // poll task and it reads the interval stored above; scheduling here as well
+ // would leave two poll tasks on the timer once resume() replaces pollTask.
+ if (loadCounter != null && !paused.get()) {
+ pollTask = new PollTimerTask();
+ timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
+ }
}
- private class InternalTimerTask extends TimerTask {
+ /**
+ * Resets the collector's event count and unblocks stats collection.
+ * <p>
+ * Clears the load counter, resumes polling if it had been paused for high load
+ * and drops the "waiting for a reply" flag. When the collector is not running
+ * (no load counter, i.e. before start() or after stop()) only the two flags
+ * are cleared.
+ */
+ @Override
+ public synchronized void resetEvents() {
+ // The counter only exists between start() and stop(); same guard as
+ // recordEvents() and adjustPollInterval().
+ if (loadCounter == null) {
+ paused.set(false);
+ waiting.set(false);
+ return;
+ }
+ loadCounter.clear();
+ if (paused.compareAndSet(true, false)) {
+ resume();
+ }
+ // Let's reset also waiting, the reply can be discarded/lost
+ // during a change of mastership
+ waiting.set(false);
+ }
+
+ /**
+ * Adds the given number of flow events to the load counter.
+ * <p>
+ * The events are dropped when no load counter is installed.
+ *
+ * @param events the number of events that occurred
+ */
+ @Override
+ public void recordEvents(int events) {
+ // Read the field once: this method is not synchronized and stop() sets
+ // the field to null.
+ final SlidingWindowCounter counter = this.loadCounter;
+ if (counter == null) {
+ return;
+ }
+ counter.incrementCount(events);
+ }
+
+ /**
+ * Returns a boolean indicating whether the switch is under high load.
+ * <p>
+ * The switch is considered under high load if the rate over the pause window
+ * (PAUSE_WINDOW ticks) exceeds a threshold. The threshold is LOAD_FACTOR times
+ * the rate over the high load window (HIGH_WINDOW ticks), capped at
+ * MAX_LOAD_RATE and never lower than MIN_LOAD_RATE.
+ * <p>
+ * Returns false when the collector has no load counter (not started, or
+ * already stopped), so that a timer task racing with stop() does not throw.
+ *
+ * @return indicates whether the switch is under high load
+ */
+ private boolean isHighLoad() {
+ // Snapshot the field: this is called from timer tasks without holding the
+ // collector's lock, while stop() may set the field to null.
+ SlidingWindowCounter counter = this.loadCounter;
+ if (counter == null) {
+ return false;
+ }
+ return counter.getWindowRate(PAUSE_WINDOW)
+ > max(min(counter.getWindowRate(HIGH_WINDOW) * LOAD_FACTOR, MAX_LOAD_RATE), MIN_LOAD_RATE);
+ }
+
+ /**
+ * Returns a boolean indicating whether the switch is under low load.
+ * <p>
+ * The switch is considered under low load if the rate over the low load window
+ * (LOW_WINDOW ticks) is strictly less than the rate over the high load window
+ * (HIGH_WINDOW ticks).
+ * <p>
+ * Returns false when the collector has no load counter (not started, or
+ * already stopped), so that a timer task racing with stop() does not throw.
+ *
+ * @return indicates whether the switch is under low load
+ */
+ private boolean isLowLoad() {
+ // Snapshot the field: this is called from a timer task without holding the
+ // collector's lock, while stop() may set the field to null.
+ SlidingWindowCounter counter = this.loadCounter;
+ if (counter == null) {
+ return false;
+ }
+ return counter.getWindowRate(LOW_WINDOW) < counter.getWindowRate(HIGH_WINDOW);
+ }
+
+ /**
+ * Timer task that switches stats collection between paused and running.
+ * <p>
+ * High load pauses collection and low load resumes it; when neither check
+ * holds the current state is left untouched. The paused flag is flipped with
+ * compareAndSet so that pause()/resume() run only on an actual transition.
+ */
+ private class PauseTimerTask extends TimerTask {
+ @Override
+ public void run() {
+ if (isHighLoad()) {
+ if (paused.compareAndSet(false, true)) {
+ pause();
+ }
+ return;
+ }
+ if (isLowLoad() && paused.compareAndSet(true, false)) {
+ resume();
+ }
+ }
+ }
+
+ private class PollTimerTask extends TimerTask {
+ @Override
+ public void run() {
+ // Check whether we are still waiting a previous reply
+ if (sw.getRole() == RoleState.MASTER && !waiting.get()) {
+ // Check whether the switch is under high load from this master. This is done here in case a large
+ // batch was pushed immediately prior to this task running.
+ if (isHighLoad()) {
+ log.debug("Skipping stats collection for {} due to high load; rate: {}; overall: {}",
+ sw.getStringId(),
+ loadCounter.getWindowRate(PAUSE_WINDOW),
+ loadCounter.getWindowRate(HIGH_WINDOW));
+ return;
+ } else {
+ log.debug(
+ "Permitting stats collection for {}; rate: {}; overall: {}",
+ sw.getStringId(),
+ loadCounter.getWindowRate(PAUSE_WINDOW),
+ loadCounter.getWindowRate(HIGH_WINDOW));
+ }
+
log.trace("Collecting stats for {}", sw.getStringId());
OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
.setMatch(sw.factory().matchWildcardAll())
@@ -79,6 +193,9 @@
.setOutPort(OFPort.NO_MASK)
.build();
sw.sendMsg(request);
+ // Other flow stats will not be asked
+ // if we don't see first the reply of this request
+ waiting.set(true);
}
}
}
@@ -86,17 +203,51 @@
public synchronized void start() {
// Initially start polling quickly. Then drop down to configured value
log.debug("Starting Stats collection thread for {}", sw.getStringId());
- task = new InternalTimerTask();
- timer.scheduleAtFixedRate(task, 1 * SECONDS,
- pollInterval * SECONDS);
+ // The counter is created with HIGH_WINDOW, the largest of the windows
+ // queried by the load checks
+ loadCounter = new SlidingWindowCounter(HIGH_WINDOW);
+ // Evaluate the pause/resume decision every second, starting in one second
+ pauseTask = new PauseTimerTask();
+ timer.scheduleAtFixedRate(pauseTask, 1 * SECONDS, 1 * SECONDS);
+ // First poll after one second, then once every pollInterval seconds
+ pollTask = new PollTimerTask();
+ timer.scheduleAtFixedRate(pollTask, 1 * SECONDS, pollInterval * SECONDS);
+ }
+
+ /**
+ * Cancels the current poll task, if there is one, and forgets it.
+ */
+ private synchronized void pause() {
+ final TimerTask running = pollTask;
+ if (running == null) {
+ // Nothing scheduled, nothing to pause
+ return;
+ }
+ log.debug("Pausing stats collection for {}; rate: {}; overall: {}",
+ sw.getStringId(),
+ loadCounter.getWindowRate(PAUSE_WINDOW),
+ loadCounter.getWindowRate(HIGH_WINDOW));
+ running.cancel();
+ pollTask = null;
+ }
+
+ /**
+ * Schedules a fresh poll task using the current poll interval.
+ * <p>
+ * Does nothing when the collector has no load counter (not started, or already
+ * stopped), so a pause task that was still running during stop() cannot bring
+ * polling back. Any poll task that is still referenced is cancelled first, so
+ * that replacing the reference never leaves an un-cancellable task on the timer.
+ */
+ private synchronized void resume() {
+ if (loadCounter == null) {
+ return;
+ }
+ log.debug("Resuming stats collection for {}; rate: {}; overall: {}",
+ sw.getStringId(),
+ loadCounter.getWindowRate(PAUSE_WINDOW),
+ loadCounter.getWindowRate(HIGH_WINDOW));
+ if (pollTask != null) {
+ pollTask.cancel();
+ }
+ pollTask = new PollTimerTask();
+ timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
}
public synchronized void stop() {
- if (task != null) {
log.debug("Stopping Stats collection thread for {}", sw.getStringId());
- task.cancel();
- task = null;
+ // Stop the periodic high/low load evaluation
+ if (pauseTask != null) {
+ pauseTask.cancel();
+ pauseTask = null;
}
+ // Stop polling; pause() sets pollTask to null, in which case there is
+ // nothing to cancel here
+ if (pollTask != null) {
+ log.debug("Stopping Stats collection thread for {}", sw.getStringId());
+ pollTask.cancel();
+ pollTask = null;
+ }
+ // Release the load counter; a null counter makes recordEvents() a no-op and
+ // keeps adjustPollInterval() from scheduling a new poll task
+ if (loadCounter != null) {
+ loadCounter.destroy();
+ loadCounter = null;
+ }
+ }
+
+ /**
+ * Signals that a flow stats reply has been received.
+ * <p>
+ * Clears the waiting flag that the poll task sets after sending a request, so
+ * that the next poll is allowed to send a new one.
+ */
+ @Override
+ public void received() {
+ waiting.set(false);
+ }
}
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index 8730378..5f2713b5 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -293,6 +293,34 @@
tableStatsCollectors.values().forEach(tsc -> tsc.adjustPollInterval(flowPollFrequency));
}
+ /**
+ * Returns the stats collector registered for the given switch, taken from the
+ * adaptive flow sampling collectors when adaptive flow sampling is enabled and
+ * from the simple collectors otherwise.
+ *
+ * @param dpid switch identifier
+ * @return the collector, or null if none is registered for the switch
+ */
+ private SwitchDataCollector collectorFor(Dpid dpid) {
+ if (adaptiveFlowSampling) {
+ return afsCollectors.get(dpid);
+ }
+ return simpleCollectors.get(dpid);
+ }
+
+ /**
+ * Resets the event statistics of the collector of the given switch, if any.
+ *
+ * @param dpid switch identifier
+ */
+ private void resetEvents(Dpid dpid) {
+ SwitchDataCollector collector = collectorFor(dpid);
+ if (collector != null) {
+ collector.resetEvents();
+ }
+ }
+
+ /**
+ * Records a single flow event against the collector of the given switch.
+ *
+ * @param dpid switch identifier
+ */
+ private void recordEvent(Dpid dpid) {
+ recordEvents(dpid, 1);
+ }
+
+ /**
+ * Records a number of flow events against the collector of the given switch,
+ * if any.
+ *
+ * @param dpid switch identifier
+ * @param events number of events to record
+ */
+ private void recordEvents(Dpid dpid, int events) {
+ SwitchDataCollector collector = collectorFor(dpid);
+ if (collector != null) {
+ collector.recordEvents(events);
+ }
+ }
+
@Override
public void applyFlowRule(FlowRule... flowRules) {
for (FlowRule flowRule : flowRules) {
@@ -310,6 +338,8 @@
sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
Optional.empty(), Optional.of(driverService)).buildFlowAdd());
+
+ recordEvent(dpid);
}
@Override
@@ -329,6 +359,8 @@
sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
Optional.empty(), Optional.of(driverService)).buildFlowDel());
+
+ recordEvent(dpid);
}
@Override
@@ -371,6 +403,8 @@
OFBarrierRequest.Builder builder = sw.factory().buildBarrierRequest()
.setXid(batch.id());
sw.sendMsg(builder.build());
+
+ recordEvents(dpid, batch.getOperations().size());
}
private class InternalFlowProvider
@@ -416,6 +450,16 @@
break;
case STATS_REPLY:
if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
+ // Let's unblock first the collector
+ SwitchDataCollector collector;
+ if (adaptiveFlowSampling) {
+ collector = afsCollectors.get(dpid);
+ } else {
+ collector = simpleCollectors.get(dpid);
+ }
+ if (collector != null) {
+ collector.received();
+ }
pushFlowMetrics(dpid, (OFFlowStatsReply) msg, getDriver(deviceId));
} else if (((OFStatsReply) msg).getStatsType() == OFStatsType.TABLE) {
pushTableStatistics(dpid, (OFTableStatsReply) msg);
@@ -584,7 +628,9 @@
@Override
public void receivedRoleReply(Dpid dpid, RoleState requested,
RoleState response) {
- // Do nothing here for now.
+ // When the switch answers with the MASTER role, reset the event statistics
+ // kept by the stats collector of this switch
+ if (response == RoleState.MASTER) {
+ resetEvents(dpid);
+ }
}
private DriverHandler getDriver(DeviceId devId) {
@@ -757,4 +803,4 @@
}
}
-}
\ No newline at end of file
+}
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/SwitchDataCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/SwitchDataCollector.java
index ae1664b..2c42d2b 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/SwitchDataCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/SwitchDataCollector.java
@@ -30,4 +30,27 @@
* Stops the collector.
*/
void stop();
+
+ /**
+ * Signals that a stats reply has been received.
+ * <p>
+ * The default implementation does nothing.
+ */
+ default void received() {
+
+ }
+
+ /**
+ * Records the number of events seen.
+ * <p>
+ * The default implementation does nothing.
+ *
+ * @param events number of events
+ */
+ default void recordEvents(int events) {
+
+ }
+
+ /**
+ * Resets the number of events seen.
+ * <p>
+ * The default implementation does nothing.
+ */
+ default void resetEvents() {
+
+ }
}
\ No newline at end of file