Modify OpenFlow provider to avoid stats collection when switch is under high load
* Track load via sliding window
* Pause stats collection when load is significantly above average
* Resume stats collection when load returns to normal
* Pause stats collection when reply is in-flight
Change-Id: I3159b4f806a6405ca6be494534497348716cc921
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
index 4f518b0..c8bf707 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
@@ -16,6 +16,8 @@
package org.onosproject.provider.of.flow.impl;
import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.onlab.util.SlidingWindowCounter;
import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.onosproject.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
@@ -25,7 +27,10 @@
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -35,11 +40,31 @@
private final Logger log = getLogger(getClass());
- public static final int SECONDS = 1000;
+ private static final int SECONDS = 1000;
+
+ // Number of ticks which defines the pause window.
+ private static final int PAUSE_WINDOW = 2;
+ // Number of ticks which defines the high load window
+ private static final int HIGH_WINDOW = 60;
+ // Number of ticks which defines the low load window
+ private static final int LOW_WINDOW = 15;
+ // Multiplier factor of the load
+ private static final int LOAD_FACTOR = 2;
+ // Event/s defining the min load rate
+ private static final int MIN_LOAD_RATE = 50;
+ // Event/s defining the max load rate
+ private static final int MAX_LOAD_RATE = 500;
private final OpenFlowSwitch sw;
private Timer timer;
- private TimerTask task;
+ private TimerTask pauseTask;
+ private TimerTask pollTask;
+
+ private SlidingWindowCounter loadCounter;
+ // Defines whether the collector is in pause or not for high load
+ private final AtomicBoolean paused = new AtomicBoolean();
+ // Defines whether the collector is in waiting or not for a previous stats reply
+ private final AtomicBoolean waiting = new AtomicBoolean();
private int pollInterval;
@@ -63,15 +88,104 @@
*/
synchronized void adjustPollInterval(int pollInterval) {
this.pollInterval = pollInterval;
- task.cancel();
- task = new InternalTimerTask();
- timer.scheduleAtFixedRate(task, pollInterval * SECONDS, pollInterval * 1000);
+ if (pollTask != null) {
+ pollTask.cancel();
+ }
+ // If we went through start - let's schedule it
+ if (loadCounter != null) {
+ pollTask = new PollTimerTask();
+ timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
+ }
}
- private class InternalTimerTask extends TimerTask {
+ /**
+ * Resets the collector's event count.
+ */
+ @Override
+ public synchronized void resetEvents() {
+ loadCounter.clear();
+ if (paused.compareAndSet(true, false)) {
+ resume();
+ }
+ // Let's reset also waiting, the reply can be discarded/lost
+ // during a change of mastership
+ waiting.set(false);
+ }
+
+ /**
+ * Records a number of flow events that have occurred.
+ *
+ * @param events the number of events that occurred
+ */
+ @Override
+ public void recordEvents(int events) {
+ SlidingWindowCounter loadCounter = this.loadCounter;
+ if (loadCounter != null) {
+ loadCounter.incrementCount(events);
+ }
+ }
+
+ /**
+ * Returns a boolean indicating whether the switch is under high load.
+ * <p>
+ * The switch is considered under high load if the average rate over the last two seconds is
+ * greater than twice the overall rate or 50 flows/sec.
+ *
+ * @return indicates whether the switch is under high load
+ */
+ private boolean isHighLoad() {
+ return loadCounter.getWindowRate(PAUSE_WINDOW)
+ > max(min(loadCounter.getWindowRate(HIGH_WINDOW) * LOAD_FACTOR, MAX_LOAD_RATE), MIN_LOAD_RATE);
+ }
+
+ /**
+ * Returns a boolean indicating whether the switch is under low load.
+ * <p>
+ * The switch is considered under low load if the average rate over the last 15 seconds is
+ * less than the overall rate.
+ *
+ * @return indicates whether the switch is under low load
+ */
+ private boolean isLowLoad() {
+ return loadCounter.getWindowRate(LOW_WINDOW) < loadCounter.getWindowRate(HIGH_WINDOW);
+ }
+
+ private class PauseTimerTask extends TimerTask {
@Override
public void run() {
- if (sw.getRole() == RoleState.MASTER) {
+ if (isHighLoad()) {
+ if (paused.compareAndSet(false, true)) {
+ pause();
+ }
+ } else if (isLowLoad()) {
+ if (paused.compareAndSet(true, false)) {
+ resume();
+ }
+ }
+ }
+ }
+
+ private class PollTimerTask extends TimerTask {
+ @Override
+ public void run() {
+ // Check whether we are still waiting a previous reply
+ if (sw.getRole() == RoleState.MASTER && !waiting.get()) {
+ // Check whether the switch is under high load from this master. This is done here in case a large
+ // batch was pushed immediately prior to this task running.
+ if (isHighLoad()) {
+ log.debug("Skipping stats collection for {} due to high load; rate: {}; overall: {}",
+ sw.getStringId(),
+ loadCounter.getWindowRate(PAUSE_WINDOW),
+ loadCounter.getWindowRate(HIGH_WINDOW));
+ return;
+ } else {
+ log.debug(
+ "Permitting stats collection for {}; rate: {}; overall: {}",
+ sw.getStringId(),
+ loadCounter.getWindowRate(PAUSE_WINDOW),
+ loadCounter.getWindowRate(HIGH_WINDOW));
+ }
+
log.trace("Collecting stats for {}", sw.getStringId());
OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
.setMatch(sw.factory().matchWildcardAll())
@@ -79,6 +193,9 @@
.setOutPort(OFPort.NO_MASK)
.build();
sw.sendMsg(request);
+ // Other flow stats will not be asked
+ // if we don't see first the reply of this request
+ waiting.set(true);
}
}
}
@@ -86,17 +203,51 @@
public synchronized void start() {
// Initially start polling quickly. Then drop down to configured value
log.debug("Starting Stats collection thread for {}", sw.getStringId());
- task = new InternalTimerTask();
- timer.scheduleAtFixedRate(task, 1 * SECONDS,
- pollInterval * SECONDS);
+ loadCounter = new SlidingWindowCounter(HIGH_WINDOW);
+ pauseTask = new PauseTimerTask();
+ timer.scheduleAtFixedRate(pauseTask, 1 * SECONDS, 1 * SECONDS);
+ pollTask = new PollTimerTask();
+ timer.scheduleAtFixedRate(pollTask, 1 * SECONDS, pollInterval * SECONDS);
+ }
+
+ private synchronized void pause() {
+ if (pollTask != null) {
+ log.debug("Pausing stats collection for {}; rate: {}; overall: {}",
+ sw.getStringId(),
+ loadCounter.getWindowRate(PAUSE_WINDOW),
+ loadCounter.getWindowRate(HIGH_WINDOW));
+ pollTask.cancel();
+ pollTask = null;
+ }
+ }
+
+ private synchronized void resume() {
+ log.debug("Resuming stats collection for {}; rate: {}; overall: {}",
+ sw.getStringId(),
+ loadCounter.getWindowRate(PAUSE_WINDOW),
+ loadCounter.getWindowRate(HIGH_WINDOW));
+ pollTask = new PollTimerTask();
+ timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
}
public synchronized void stop() {
- if (task != null) {
- log.debug("Stopping Stats collection thread for {}", sw.getStringId());
- task.cancel();
- task = null;
+ if (pauseTask != null) {
+ pauseTask.cancel();
+ pauseTask = null;
}
+ if (pollTask != null) {
+ log.debug("Stopping Stats collection thread for {}", sw.getStringId());
+ pollTask.cancel();
+ pollTask = null;
+ }
+ if (loadCounter != null) {
+ loadCounter.destroy();
+ loadCounter = null;
+ }
+ }
+
+ public void received() {
+ waiting.set(false);
}
}
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index 47c49bc..6087ef3 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -288,6 +288,34 @@
tableStatsCollectors.values().forEach(tsc -> tsc.adjustPollInterval(flowPollFrequency));
}
+ private void resetEvents(Dpid dpid) {
+ SwitchDataCollector collector;
+ if (adaptiveFlowSampling) {
+ collector = afsCollectors.get(dpid);
+ } else {
+ collector = simpleCollectors.get(dpid);
+ }
+ if (collector != null) {
+ collector.resetEvents();
+ }
+ }
+
+ private void recordEvent(Dpid dpid) {
+ recordEvents(dpid, 1);
+ }
+
+ private void recordEvents(Dpid dpid, int events) {
+ SwitchDataCollector collector;
+ if (adaptiveFlowSampling) {
+ collector = afsCollectors.get(dpid);
+ } else {
+ collector = simpleCollectors.get(dpid);
+ }
+ if (collector != null) {
+ collector.recordEvents(events);
+ }
+ }
+
@Override
public void applyFlowRule(FlowRule... flowRules) {
for (FlowRule flowRule : flowRules) {
@@ -311,6 +339,8 @@
}
sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
Optional.empty(), Optional.of(driverService)).buildFlowAdd());
+
+ recordEvent(dpid);
}
@Override
@@ -336,6 +366,8 @@
}
sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
Optional.empty(), Optional.of(driverService)).buildFlowDel());
+
+ recordEvent(dpid);
}
@Override
@@ -393,6 +425,8 @@
OFBarrierRequest.Builder builder = sw.factory().buildBarrierRequest()
.setXid(batch.id());
sw.sendMsg(builder.build());
+
+ recordEvents(dpid, batch.getOperations().size());
}
private boolean hasPayload(FlowRuleExtPayLoad flowRuleExtPayLoad) {
@@ -444,6 +478,16 @@
break;
case STATS_REPLY:
if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
+ // Let's unblock first the collector
+ SwitchDataCollector collector;
+ if (adaptiveFlowSampling) {
+ collector = afsCollectors.get(dpid);
+ } else {
+ collector = simpleCollectors.get(dpid);
+ }
+ if (collector != null) {
+ collector.received();
+ }
pushFlowMetrics(dpid, (OFFlowStatsReply) msg, getDriver(deviceId));
} else if (((OFStatsReply) msg).getStatsType() == OFStatsType.TABLE) {
pushTableStatistics(dpid, (OFTableStatsReply) msg);
@@ -612,7 +656,9 @@
@Override
public void receivedRoleReply(Dpid dpid, RoleState requested,
RoleState response) {
- // Do nothing here for now.
+ if (response == RoleState.MASTER) {
+ resetEvents(dpid);
+ }
}
private DriverHandler getDriver(DeviceId devId) {
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/SwitchDataCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/SwitchDataCollector.java
index ae1664b..2c42d2b 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/SwitchDataCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/SwitchDataCollector.java
@@ -30,4 +30,27 @@
* Stops the collector.
*/
void stop();
+
+ /**
+ * Signals reply has been received.
+ */
+ default void received() {
+
+ }
+
+ /**
+ * Records the number of events seen.
+ *
+ * @param events number of events
+ */
+ default void recordEvents(int events) {
+
+ }
+
+ /**
+ * Resets the number of events seen.
+ */
+ default void resetEvents() {
+
+ }
}
\ No newline at end of file
diff --git a/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java b/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java
index 33dfec1..cc4f4fe 100644
--- a/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java
+++ b/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java
@@ -16,14 +16,13 @@
package org.onlab.util;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.Math.min;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
@@ -43,6 +42,9 @@
private final ScheduledExecutorService background;
+ private final AtomicLong totalCount = new AtomicLong();
+ private final AtomicLong totalSlots = new AtomicLong(1);
+
private static final int SLIDE_WINDOW_PERIOD_SECONDS = 1;
/**
@@ -58,13 +60,11 @@
this.headSlot = 0;
// Initialize each item in the list to an AtomicLong of 0
- this.counters = Collections.nCopies(windowSlots, 0)
- .stream()
- .map(AtomicLong::new)
- .collect(Collectors.toCollection(ArrayList::new));
+ this.counters = new ArrayList<>();
+ this.counters.add(new AtomicLong());
background = newSingleThreadScheduledExecutor(groupedThreads("SlidingWindowCounter", "bg-%d"));
- background.scheduleWithFixedDelay(this::advanceHead, 0,
+ background.scheduleWithFixedDelay(this::advanceHead, 1,
SLIDE_WINDOW_PERIOD_SECONDS, TimeUnit.SECONDS);
}
@@ -93,6 +93,28 @@
private void incrementCount(int slot, long value) {
counters.get(slot).addAndGet(value);
+ totalCount.addAndGet(value);
+ }
+
+ /**
+ * Gets the total count for the last N window slots.
+ *
+ * @param slots number of slots to include in the count
+ * @return total count for last N slots
+ * @deprecated since 1.12
+ */
+ @Deprecated
+ public long get(int slots) {
+ return getWindowCount(slots);
+ }
+
+ /**
+ * Gets the total count for all slots.
+ *
+ * @return total count for all slots
+ */
+ public long getWindowCount() {
+ return getWindowCount(windowSlots);
}
/**
@@ -101,12 +123,13 @@
* @param slots number of slots to include in the count
* @return total count for last N slots
*/
- public long get(int slots) {
+ public long getWindowCount(int slots) {
checkArgument(slots <= windowSlots,
"Requested window must be less than the total window slots");
long sum = 0;
+ slots = min(slots, counters.size());
for (int i = 0; i < slots; i++) {
int currentIndex = headSlot - i;
if (currentIndex < 0) {
@@ -118,9 +141,62 @@
return sum;
}
+ /**
+ * Returns the average rate over the window.
+ *
+ * @return the average rate over the window
+ */
+ public double getWindowRate() {
+ return getWindowRate(windowSlots);
+ }
+
+ /**
+ * Returns the average rate over the given window.
+ *
+ * @param slots the number of slots to include in the window
+ * @return the average rate over the given window
+ */
+ public double getWindowRate(int slots) {
+ return getWindowCount(slots) / (double) min(slots, counters.size());
+ }
+
+ /**
+ * Returns the overall number of increments.
+ *
+ * @return the overall number of increments
+ */
+ public long getOverallCount() {
+ return totalCount.get();
+ }
+
+ /**
+ * Returns the overall rate.
+ *
+ * @return the overall rate
+ */
+ public double getOverallRate() {
+ return totalCount.get() / (double) totalSlots.get();
+ }
+
+ /**
+ * Clears the counter.
+ */
+ public void clear() {
+ counters.clear();
+ counters.add(new AtomicLong());
+ totalCount.set(0);
+ totalSlots.set(1);
+ headSlot = 0;
+ }
+
void advanceHead() {
- counters.get(slotAfter(headSlot)).set(0);
- headSlot = slotAfter(headSlot);
+ if (counters.size() - 1 < slotAfter(headSlot)) {
+ counters.add(0, new AtomicLong(0));
+ } else {
+ counters.get(slotAfter(headSlot)).set(0);
+ headSlot = slotAfter(headSlot);
+ }
+ totalSlots.incrementAndGet();
}
private int slotAfter(int slot) {
diff --git a/utils/misc/src/test/java/org/onlab/util/SlidingWindowCounterTest.java b/utils/misc/src/test/java/org/onlab/util/SlidingWindowCounterTest.java
index 445584e..039b1ff 100644
--- a/utils/misc/src/test/java/org/onlab/util/SlidingWindowCounterTest.java
+++ b/utils/misc/src/test/java/org/onlab/util/SlidingWindowCounterTest.java
@@ -17,7 +17,6 @@
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import static junit.framework.TestCase.fail;
@@ -27,8 +26,6 @@
/**
* Unit tests for the sliding window counter.
*/
-
-@Ignore("Disable these for now because of intermittent load related failures on Jenkins runs.")
public class SlidingWindowCounterTest {
private SlidingWindowCounter counter;
@@ -80,6 +77,31 @@
}
@Test
+ public void testRates() {
+ assertEquals(0, counter.getWindowRate(), 0.01);
+ assertEquals(0, counter.getOverallRate(), 0.01);
+ assertEquals(0, counter.getOverallCount());
+ counter.incrementCount();
+ assertEquals(1, counter.getWindowRate(), 0.01);
+ assertEquals(1, counter.getOverallRate(), 0.01);
+ assertEquals(1, counter.getOverallCount());
+ counter.advanceHead();
+ counter.incrementCount();
+ counter.incrementCount();
+ assertEquals(1.5, counter.getWindowRate(), 0.01);
+ assertEquals(2, counter.getWindowRate(1), 0.01);
+ assertEquals(1.5, counter.getOverallRate(), 0.01);
+ assertEquals(3, counter.getOverallCount());
+ counter.advanceHead();
+ counter.incrementCount();
+ counter.incrementCount();
+ counter.incrementCount();
+ assertEquals(2.5, counter.getWindowRate(), 0.01);
+ assertEquals(2, counter.getOverallRate(), 0.01);
+ assertEquals(6, counter.getOverallCount());
+ }
+
+ @Test
public void testCornerCases() {
try {
counter.get(3);