Modify OpenFlow provider to avoid stats collection when switch is under high load
* Track load via sliding window
* Pause stats collection when load is significantly above average
* Resume stats collection when load returns to normal
* Skip stats requests while the reply to a previous request is still in flight

Change-Id: I3159b4f806a6405ca6be494534497348716cc921
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
index 1aabf24..c8bf707 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
@@ -16,6 +16,8 @@
 package org.onosproject.provider.of.flow.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.onlab.util.SlidingWindowCounter;
 import org.onosproject.openflow.controller.OpenFlowSwitch;
 import org.onosproject.openflow.controller.RoleState;
 import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
@@ -25,7 +27,10 @@
 
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import static java.lang.Math.max;
+import static java.lang.Math.min;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -35,11 +40,31 @@
 
     private final Logger log = getLogger(getClass());
 
-    public static final long SECONDS = 1000L;
+    private static final int SECONDS = 1000;
+
+    // Number of ticks which defines the pause window.
+    private static final int PAUSE_WINDOW = 2;
+    // Number of ticks which defines the high load window
+    private static final int HIGH_WINDOW = 60;
+    // Number of ticks which defines the low load window
+    private static final int LOW_WINDOW = 15;
+    // Multiplier applied to the high-window rate to obtain the high-load threshold
+    private static final int LOAD_FACTOR = 2;
+    // Lower bound (events/s) of the high-load threshold
+    private static final int MIN_LOAD_RATE = 50;
+    // Upper bound (events/s) of the high-load threshold
+    private static final int MAX_LOAD_RATE = 500;
 
     private final OpenFlowSwitch sw;
     private Timer timer;
-    private TimerTask task;
+    private TimerTask pauseTask;
+    private TimerTask pollTask;
+
+    private SlidingWindowCounter loadCounter;
+    // True while stats collection is paused because the switch is under high load
+    private final AtomicBoolean paused = new AtomicBoolean();
+    // True while the reply to a previously sent stats request is still outstanding
+    private final AtomicBoolean waiting = new AtomicBoolean();
 
     private int pollInterval;
 
@@ -63,15 +88,104 @@
      */
     synchronized void adjustPollInterval(int pollInterval) {
         this.pollInterval = pollInterval;
-        task.cancel();
-        task = new InternalTimerTask();
-        timer.scheduleAtFixedRate(task, pollInterval * SECONDS, pollInterval * 1000L);
+        if (pollTask != null) {
+            pollTask.cancel();
+        }
+        // Only reschedule if start() has run; loadCounter is set there and cleared by stop()
+        if (loadCounter != null) {
+            pollTask = new PollTimerTask();
+            timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
+        }
     }
 
-    private class InternalTimerTask extends TimerTask {
+    /**
+     * Resets the collector's event count.
+     */
+    @Override
+    public synchronized void resetEvents() {
+        loadCounter.clear();
+        if (paused.compareAndSet(true, false)) {
+            resume();
+        }
+        // Also reset the waiting flag: the pending reply can be discarded or lost
+        // during a change of mastership
+        waiting.set(false);
+    }
+
+    /**
+     * Records a number of flow events that have occurred.
+     *
+     * @param events the number of events that occurred
+     */
+    @Override
+    public void recordEvents(int events) {
+        SlidingWindowCounter loadCounter = this.loadCounter;
+        if (loadCounter != null) {
+            loadCounter.incrementCount(events);
+        }
+    }
+
+    /**
+     * Returns a boolean indicating whether the switch is under high load.
+     * <p>
+     * The switch is considered under high load if the average rate over the last two seconds is
+     * greater than twice the overall rate, with that threshold clamped between 50 and 500 events/sec.
+     *
+     * @return indicates whether the switch is under high load
+     */
+    private boolean isHighLoad() {
+        return loadCounter.getWindowRate(PAUSE_WINDOW)
+            > max(min(loadCounter.getWindowRate(HIGH_WINDOW) * LOAD_FACTOR, MAX_LOAD_RATE), MIN_LOAD_RATE);
+    }
+
+    /**
+     * Returns a boolean indicating whether the switch is under low load.
+     * <p>
+     * The switch is considered under low load if the average rate over the last 15 seconds is
+     * less than the overall rate.
+     *
+     * @return indicates whether the switch is under low load
+     */
+    private boolean isLowLoad() {
+        return loadCounter.getWindowRate(LOW_WINDOW) < loadCounter.getWindowRate(HIGH_WINDOW);
+    }
+
+    private class PauseTimerTask extends TimerTask {
         @Override
         public void run() {
-            if (sw.getRole() == RoleState.MASTER) {
+            if (isHighLoad()) {
+                if (paused.compareAndSet(false, true)) {
+                    pause();
+                }
+            } else if (isLowLoad()) {
+                if (paused.compareAndSet(true, false)) {
+                    resume();
+                }
+            }
+        }
+    }
+
+    private class PollTimerTask extends TimerTask {
+        @Override
+        public void run() {
+            // Only poll as master, and only if no previous reply is still outstanding
+            if (sw.getRole() == RoleState.MASTER && !waiting.get()) {
+                // Check whether the switch is under high load from this master. This is done here in case a large
+                // batch was pushed immediately prior to this task running.
+                if (isHighLoad()) {
+                    log.debug("Skipping stats collection for {} due to high load; rate: {}; overall: {}",
+                              sw.getStringId(),
+                              loadCounter.getWindowRate(PAUSE_WINDOW),
+                              loadCounter.getWindowRate(HIGH_WINDOW));
+                    return;
+                } else {
+                    log.debug(
+                        "Permitting stats collection for {}; rate: {}; overall: {}",
+                        sw.getStringId(),
+                        loadCounter.getWindowRate(PAUSE_WINDOW),
+                        loadCounter.getWindowRate(HIGH_WINDOW));
+                }
+
                 log.trace("Collecting stats for {}", sw.getStringId());
                 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
                         .setMatch(sw.factory().matchWildcardAll())
@@ -79,6 +193,9 @@
                         .setOutPort(OFPort.NO_MASK)
                         .build();
                 sw.sendMsg(request);
+                // No further flow stats are requested until the reply to this request is seen.
+                // NOTE(review): set after sendMsg(); confirm a fast reply cannot clear it first
+                waiting.set(true);
             }
         }
     }
@@ -86,17 +203,51 @@
     public synchronized void start() {
         // Initially start polling quickly. Then drop down to configured value
         log.debug("Starting Stats collection thread for {}", sw.getStringId());
-        task = new InternalTimerTask();
-        timer.scheduleAtFixedRate(task, 1 * SECONDS,
-                                  pollInterval * SECONDS);
+        loadCounter = new SlidingWindowCounter(HIGH_WINDOW);
+        pauseTask = new PauseTimerTask();
+        timer.scheduleAtFixedRate(pauseTask, 1 * SECONDS, 1 * SECONDS);
+        pollTask = new PollTimerTask();
+        timer.scheduleAtFixedRate(pollTask, 1 * SECONDS, pollInterval * SECONDS);
+    }
+
+    private synchronized void pause() {
+        if (pollTask != null) {
+            log.debug("Pausing stats collection for {}; rate: {}; overall: {}",
+                      sw.getStringId(),
+                      loadCounter.getWindowRate(PAUSE_WINDOW),
+                      loadCounter.getWindowRate(HIGH_WINDOW));
+            pollTask.cancel();
+            pollTask = null;
+        }
+    }
+
+    private synchronized void resume() {
+        log.debug("Resuming stats collection for {}; rate: {}; overall: {}",
+                  sw.getStringId(),
+                  loadCounter.getWindowRate(PAUSE_WINDOW),
+                  loadCounter.getWindowRate(HIGH_WINDOW));
+        pollTask = new PollTimerTask();
+        timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
     }
 
     public synchronized void stop() {
-        if (task != null) {
-            log.debug("Stopping Stats collection thread for {}", sw.getStringId());
-            task.cancel();
-            task = null;
+        if (pauseTask != null) {
+            pauseTask.cancel();
+            pauseTask = null;
         }
+        if (pollTask != null) {
+            log.debug("Stopping Stats collection thread for {}", sw.getStringId());
+            pollTask.cancel();
+            pollTask = null;
+        }
+        if (loadCounter != null) {
+            loadCounter.destroy();
+            loadCounter = null;
+        }
+    }
+
+    public void received() {
+        waiting.set(false);
     }
 
 }