Modify OpenFlow provider to avoid stats collection when switch is under high load
* Track load via sliding window
* Pause stats collection when load is significantly above average
* Resume stats collection when load returns to normal
* Pause stats collection when reply is in-flight

Change-Id: I3159b4f806a6405ca6be494534497348716cc921
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
index 1aabf24..c8bf707 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
@@ -16,6 +16,8 @@
 package org.onosproject.provider.of.flow.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.onlab.util.SlidingWindowCounter;
 import org.onosproject.openflow.controller.OpenFlowSwitch;
 import org.onosproject.openflow.controller.RoleState;
 import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
@@ -25,7 +27,10 @@
 
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import static java.lang.Math.max;
+import static java.lang.Math.min;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -35,11 +40,31 @@
 
     private final Logger log = getLogger(getClass());
 
-    public static final long SECONDS = 1000L;
+    private static final int SECONDS = 1000;
+
+    // Number of ticks which defines the pause window.
+    private static final int PAUSE_WINDOW = 2;
+    // Number of ticks which defines the high load window
+    private static final int HIGH_WINDOW = 60;
+    // Number of ticks which defines the low load window
+    private static final int LOW_WINDOW = 15;
+    // Multiplier factor of the load
+    private static final int LOAD_FACTOR = 2;
+    // Event/s defining the min load rate
+    private static final int MIN_LOAD_RATE = 50;
+    // Event/s defining the max load rate
+    private static final int MAX_LOAD_RATE = 500;
 
     private final OpenFlowSwitch sw;
     private Timer timer;
-    private TimerTask task;
+    private TimerTask pauseTask;
+    private TimerTask pollTask;
+
+    private SlidingWindowCounter loadCounter;
+    // Defines whether the collector is in pause or not for high load
+    private final AtomicBoolean paused = new AtomicBoolean();
+    // Defines whether the collector is in waiting or not for a previous stats reply
+    private final AtomicBoolean waiting = new AtomicBoolean();
 
     private int pollInterval;
 
@@ -63,15 +88,104 @@
      */
     synchronized void adjustPollInterval(int pollInterval) {
         this.pollInterval = pollInterval;
-        task.cancel();
-        task = new InternalTimerTask();
-        timer.scheduleAtFixedRate(task, pollInterval * SECONDS, pollInterval * 1000L);
+        if (pollTask != null) {
+            pollTask.cancel();
+        }
+        // If we went through start - let's schedule it
+        if (loadCounter != null) {
+            pollTask = new PollTimerTask();
+            timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
+        }
     }
 
-    private class InternalTimerTask extends TimerTask {
+    /**
+     * Resets the collector's event count.
+     */
+    @Override
+    public synchronized void resetEvents() {
+        loadCounter.clear();
+        if (paused.compareAndSet(true, false)) {
+            resume();
+        }
+        // Let's reset also waiting, the reply can be discarded/lost
+        // during a change of mastership
+        waiting.set(false);
+    }
+
+    /**
+     * Records a number of flow events that have occurred.
+     *
+     * @param events the number of events that occurred
+     */
+    @Override
+    public void recordEvents(int events) {
+        SlidingWindowCounter loadCounter = this.loadCounter;
+        if (loadCounter != null) {
+            loadCounter.incrementCount(events);
+        }
+    }
+
+    /**
+     * Returns a boolean indicating whether the switch is under high load.
+     * <p>
+     * The switch is considered under high load if the average rate over the last two seconds is
+     * greater than twice the overall rate or 50 flows/sec.
+     *
+     * @return indicates whether the switch is under high load
+     */
+    private boolean isHighLoad() {
+        return loadCounter.getWindowRate(PAUSE_WINDOW)
+            > max(min(loadCounter.getWindowRate(HIGH_WINDOW) * LOAD_FACTOR, MAX_LOAD_RATE), MIN_LOAD_RATE);
+    }
+
+    /**
+     * Returns a boolean indicating whether the switch is under low load.
+     * <p>
+     * The switch is considered under low load if the average rate over the last 15 seconds is
+     * less than the overall rate.
+     *
+     * @return indicates whether the switch is under low load
+     */
+    private boolean isLowLoad() {
+        return loadCounter.getWindowRate(LOW_WINDOW) < loadCounter.getWindowRate(HIGH_WINDOW);
+    }
+
+    private class PauseTimerTask extends TimerTask {
         @Override
         public void run() {
-            if (sw.getRole() == RoleState.MASTER) {
+            if (isHighLoad()) {
+                if (paused.compareAndSet(false, true)) {
+                    pause();
+                }
+            } else if (isLowLoad()) {
+                if (paused.compareAndSet(true, false)) {
+                    resume();
+                }
+            }
+        }
+    }
+
+    private class PollTimerTask extends TimerTask {
+        @Override
+        public void run() {
+            // Check whether we are still waiting a previous reply
+            if (sw.getRole() == RoleState.MASTER && !waiting.get()) {
+                // Check whether the switch is under high load from this master. This is done here in case a large
+                // batch was pushed immediately prior to this task running.
+                if (isHighLoad()) {
+                    log.debug("Skipping stats collection for {} due to high load; rate: {}; overall: {}",
+                              sw.getStringId(),
+                              loadCounter.getWindowRate(PAUSE_WINDOW),
+                              loadCounter.getWindowRate(HIGH_WINDOW));
+                    return;
+                } else {
+                    log.debug(
+                        "Permitting stats collection for {}; rate: {}; overall: {}",
+                        sw.getStringId(),
+                        loadCounter.getWindowRate(PAUSE_WINDOW),
+                        loadCounter.getWindowRate(HIGH_WINDOW));
+                }
+
                 log.trace("Collecting stats for {}", sw.getStringId());
                 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
                         .setMatch(sw.factory().matchWildcardAll())
@@ -79,6 +193,9 @@
                         .setOutPort(OFPort.NO_MASK)
                         .build();
                 sw.sendMsg(request);
+                // Other flow stats will not be asked
+                // if we don't see first the reply of this request
+                waiting.set(true);
             }
         }
     }
@@ -86,17 +203,51 @@
     public synchronized void start() {
         // Initially start polling quickly. Then drop down to configured value
         log.debug("Starting Stats collection thread for {}", sw.getStringId());
-        task = new InternalTimerTask();
-        timer.scheduleAtFixedRate(task, 1 * SECONDS,
-                                  pollInterval * SECONDS);
+        loadCounter = new SlidingWindowCounter(HIGH_WINDOW);
+        pauseTask = new PauseTimerTask();
+        timer.scheduleAtFixedRate(pauseTask, 1 * SECONDS, 1 * SECONDS);
+        pollTask = new PollTimerTask();
+        timer.scheduleAtFixedRate(pollTask, 1 * SECONDS, pollInterval * SECONDS);
+    }
+
+    private synchronized void pause() {
+        if (pollTask != null) {
+            log.debug("Pausing stats collection for {}; rate: {}; overall: {}",
+                      sw.getStringId(),
+                      loadCounter.getWindowRate(PAUSE_WINDOW),
+                      loadCounter.getWindowRate(HIGH_WINDOW));
+            pollTask.cancel();
+            pollTask = null;
+        }
+    }
+
+    private synchronized void resume() {
+        log.debug("Resuming stats collection for {}; rate: {}; overall: {}",
+                  sw.getStringId(),
+                  loadCounter.getWindowRate(PAUSE_WINDOW),
+                  loadCounter.getWindowRate(HIGH_WINDOW));
+        pollTask = new PollTimerTask();
+        timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
     }
 
     public synchronized void stop() {
-        if (task != null) {
-            log.debug("Stopping Stats collection thread for {}", sw.getStringId());
-            task.cancel();
-            task = null;
+        if (pauseTask != null) {
+            pauseTask.cancel();
+            pauseTask = null;
         }
+        if (pollTask != null) {
+            log.debug("Stopping Stats collection thread for {}", sw.getStringId());
+            pollTask.cancel();
+            pollTask = null;
+        }
+        if (loadCounter != null) {
+            loadCounter.destroy();
+            loadCounter = null;
+        }
+    }
+
+    public void received() {
+        waiting.set(false);
     }
 
 }
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index a72c082..f2419e0 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -293,6 +293,34 @@
         tableStatsCollectors.values().forEach(tsc -> tsc.adjustPollInterval(flowPollFrequency));
     }
 
+    private void resetEvents(Dpid dpid) {
+        SwitchDataCollector collector;
+        if (adaptiveFlowSampling) {
+            collector = afsCollectors.get(dpid);
+        } else {
+            collector = simpleCollectors.get(dpid);
+        }
+        if (collector != null) {
+            collector.resetEvents();
+        }
+    }
+
+    private void recordEvent(Dpid dpid) {
+        recordEvents(dpid, 1);
+    }
+
+    private void recordEvents(Dpid dpid, int events) {
+        SwitchDataCollector collector;
+        if (adaptiveFlowSampling) {
+            collector = afsCollectors.get(dpid);
+        } else {
+            collector = simpleCollectors.get(dpid);
+        }
+        if (collector != null) {
+            collector.recordEvents(events);
+        }
+    }
+
     @Override
     public void applyFlowRule(FlowRule... flowRules) {
         for (FlowRule flowRule : flowRules) {
@@ -316,6 +344,8 @@
         }
         sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
                 Optional.empty(), Optional.of(driverService)).buildFlowAdd());
+
+        recordEvent(dpid);
     }
 
     @Override
@@ -341,6 +371,8 @@
         }
         sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
                                           Optional.empty(), Optional.of(driverService)).buildFlowDel());
+
+        recordEvent(dpid);
     }
 
     @Override
@@ -397,6 +429,8 @@
         OFBarrierRequest.Builder builder = sw.factory().buildBarrierRequest()
                 .setXid(batch.id());
         sw.sendMsg(builder.build());
+
+        recordEvents(dpid, batch.getOperations().size());
     }
 
     private boolean hasPayload(FlowRuleExtPayLoad flowRuleExtPayLoad) {
@@ -448,6 +482,16 @@
                     break;
                 case STATS_REPLY:
                     if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
+                        // Let's unblock first the collector
+                        SwitchDataCollector collector;
+                        if (adaptiveFlowSampling) {
+                            collector = afsCollectors.get(dpid);
+                        } else {
+                            collector = simpleCollectors.get(dpid);
+                        }
+                        if (collector != null) {
+                            collector.received();
+                        }
                         pushFlowMetrics(dpid, (OFFlowStatsReply) msg, getDriver(deviceId));
                     } else if (((OFStatsReply) msg).getStatsType() == OFStatsType.TABLE) {
                         pushTableStatistics(dpid, (OFTableStatsReply) msg);
@@ -616,7 +660,9 @@
         @Override
         public void receivedRoleReply(Dpid dpid, RoleState requested,
                                       RoleState response) {
-            // Do nothing here for now.
+            if (response == RoleState.MASTER) {
+                resetEvents(dpid);
+            }
         }
 
         private DriverHandler getDriver(DeviceId devId) {
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/SwitchDataCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/SwitchDataCollector.java
index ae1664b..2c42d2b 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/SwitchDataCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/SwitchDataCollector.java
@@ -30,4 +30,27 @@
      * Stops the collector.
      */
     void stop();
+
+    /**
+     * Signals reply has been received.
+     */
+    default void received() {
+
+    }
+
+    /**
+     * Records the number of events seen.
+     *
+     * @param events number of events
+     */
+    default void recordEvents(int events) {
+
+    }
+
+    /**
+     * Resets the number of events seen.
+     */
+    default void resetEvents() {
+
+    }
 }
\ No newline at end of file
diff --git a/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java b/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java
index 33dfec1..cc4f4fe 100644
--- a/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java
+++ b/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java
@@ -16,14 +16,13 @@
 package org.onlab.util;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.Math.min;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 
@@ -43,6 +42,9 @@
 
     private final ScheduledExecutorService background;
 
+    private final AtomicLong totalCount = new AtomicLong();
+    private final AtomicLong totalSlots = new AtomicLong(1);
+
     private static final int SLIDE_WINDOW_PERIOD_SECONDS = 1;
 
     /**
@@ -58,13 +60,11 @@
         this.headSlot = 0;
 
         // Initialize each item in the list to an AtomicLong of 0
-        this.counters = Collections.nCopies(windowSlots, 0)
-                .stream()
-                .map(AtomicLong::new)
-                .collect(Collectors.toCollection(ArrayList::new));
+        this.counters = new ArrayList<>();
+        this.counters.add(new AtomicLong());
 
         background = newSingleThreadScheduledExecutor(groupedThreads("SlidingWindowCounter", "bg-%d"));
-        background.scheduleWithFixedDelay(this::advanceHead, 0,
+        background.scheduleWithFixedDelay(this::advanceHead, 1,
                                           SLIDE_WINDOW_PERIOD_SECONDS, TimeUnit.SECONDS);
     }
 
@@ -93,6 +93,28 @@
 
     private void incrementCount(int slot, long value) {
         counters.get(slot).addAndGet(value);
+        totalCount.addAndGet(value);
+    }
+
+    /**
+     * Gets the total count for the last N window slots.
+     *
+     * @param slots number of slots to include in the count
+     * @return total count for last N slots
+     * @deprecated since 1.12
+     */
+    @Deprecated
+    public long get(int slots) {
+        return getWindowCount(slots);
+    }
+
+    /**
+     * Gets the total count for all slots.
+     *
+     * @return total count for all slots
+     */
+    public long getWindowCount() {
+        return getWindowCount(windowSlots);
     }
 
     /**
@@ -101,12 +123,13 @@
      * @param slots number of slots to include in the count
      * @return total count for last N slots
      */
-    public long get(int slots) {
+    public long getWindowCount(int slots) {
         checkArgument(slots <= windowSlots,
                       "Requested window must be less than the total window slots");
 
         long sum = 0;
 
+        slots = min(slots, counters.size());
         for (int i = 0; i < slots; i++) {
             int currentIndex = headSlot - i;
             if (currentIndex < 0) {
@@ -118,9 +141,62 @@
         return sum;
     }
 
+    /**
+     * Returns the average rate over the window.
+     *
+     * @return the average rate over the window
+     */
+    public double getWindowRate() {
+        return getWindowRate(windowSlots);
+    }
+
+    /**
+     * Returns the average rate over the given window.
+     *
+     * @param slots the number of slots to include in the window
+     * @return the average rate over the given window
+     */
+    public double getWindowRate(int slots) {
+        return getWindowCount(slots) / (double) min(slots, counters.size());
+    }
+
+    /**
+     * Returns the overall number of increments.
+     *
+     * @return the overall number of increments
+     */
+    public long getOverallCount() {
+        return totalCount.get();
+    }
+
+    /**
+     * Returns the overall rate.
+     *
+     * @return the overall rate
+     */
+    public double getOverallRate() {
+        return totalCount.get() / (double) totalSlots.get();
+    }
+
+    /**
+     * Clears the counter.
+     */
+    public void clear() {
+        counters.clear();
+        counters.add(new AtomicLong());
+        totalCount.set(0);
+        totalSlots.set(1);
+        headSlot = 0;
+    }
+
     void advanceHead() {
-        counters.get(slotAfter(headSlot)).set(0);
-        headSlot = slotAfter(headSlot);
+        if (counters.size() - 1 < slotAfter(headSlot)) {
+            counters.add(0, new AtomicLong(0));
+        } else {
+            counters.get(slotAfter(headSlot)).set(0);
+            headSlot = slotAfter(headSlot);
+        }
+        totalSlots.incrementAndGet();
     }
 
     private int slotAfter(int slot) {
diff --git a/utils/misc/src/test/java/org/onlab/util/SlidingWindowCounterTest.java b/utils/misc/src/test/java/org/onlab/util/SlidingWindowCounterTest.java
index 445584e..039b1ff 100644
--- a/utils/misc/src/test/java/org/onlab/util/SlidingWindowCounterTest.java
+++ b/utils/misc/src/test/java/org/onlab/util/SlidingWindowCounterTest.java
@@ -17,7 +17,6 @@
 
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import static junit.framework.TestCase.fail;
@@ -27,8 +26,6 @@
 /**
  * Unit tests for the sliding window counter.
  */
-
-@Ignore("Disable these for now because of intermittent load related failures on Jenkins runs.")
 public class SlidingWindowCounterTest {
 
     private SlidingWindowCounter counter;
@@ -80,6 +77,31 @@
     }
 
     @Test
+    public void testRates() {
+        assertEquals(0, counter.getWindowRate(), 0.01);
+        assertEquals(0, counter.getOverallRate(), 0.01);
+        assertEquals(0, counter.getOverallCount());
+        counter.incrementCount();
+        assertEquals(1, counter.getWindowRate(), 0.01);
+        assertEquals(1, counter.getOverallRate(), 0.01);
+        assertEquals(1, counter.getOverallCount());
+        counter.advanceHead();
+        counter.incrementCount();
+        counter.incrementCount();
+        assertEquals(1.5, counter.getWindowRate(), 0.01);
+        assertEquals(2, counter.getWindowRate(1), 0.01);
+        assertEquals(1.5, counter.getOverallRate(), 0.01);
+        assertEquals(3, counter.getOverallCount());
+        counter.advanceHead();
+        counter.incrementCount();
+        counter.incrementCount();
+        counter.incrementCount();
+        assertEquals(2.5, counter.getWindowRate(), 0.01);
+        assertEquals(2, counter.getOverallRate(), 0.01);
+        assertEquals(6, counter.getOverallCount());
+    }
+
+    @Test
     public void testCornerCases() {
         try {
             counter.get(3);