Modify OpenFlow provider to avoid stats collection when switch is under high load
* Track load via sliding window
* Pause stats collection when load is significantly above average
* Resume stats collection when load returns to normal
* Skip stats requests while the reply to a previous request is still outstanding

Change-Id: I3159b4f806a6405ca6be494534497348716cc921
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
index 1aabf24..c8bf707 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
@@ -16,6 +16,8 @@
 package org.onosproject.provider.of.flow.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.onlab.util.SlidingWindowCounter;
 import org.onosproject.openflow.controller.OpenFlowSwitch;
 import org.onosproject.openflow.controller.RoleState;
 import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
@@ -25,7 +27,10 @@
 
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import static java.lang.Math.max;
+import static java.lang.Math.min;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -35,11 +40,31 @@
 
     private final Logger log = getLogger(getClass());
 
-    public static final long SECONDS = 1000L;
+    private static final long SECONDS = 1000L; // long, so pollInterval * SECONDS is not computed in int
+
+    // Number of ticks which defines the pause window.
+    private static final int PAUSE_WINDOW = 2;
+    // Number of ticks which defines the high load window
+    private static final int HIGH_WINDOW = 60;
+    // Number of ticks which defines the low load window
+    private static final int LOW_WINDOW = 15;
+    // Multiplier factor of the load
+    private static final int LOAD_FACTOR = 2;
+    // Event/s defining the min load rate
+    private static final int MIN_LOAD_RATE = 50;
+    // Event/s defining the max load rate
+    private static final int MAX_LOAD_RATE = 500;
 
     private final OpenFlowSwitch sw;
     private Timer timer;
-    private TimerTask task;
+    private TimerTask pauseTask;
+    private TimerTask pollTask;
+
+    private SlidingWindowCounter loadCounter;
+    // Whether stats collection is currently paused because of high load
+    private final AtomicBoolean paused = new AtomicBoolean();
+    // Whether the collector is still waiting for the reply to a previous stats request
+    private final AtomicBoolean waiting = new AtomicBoolean();
 
     private int pollInterval;
 
@@ -63,15 +88,104 @@
      */
     synchronized void adjustPollInterval(int pollInterval) {
         this.pollInterval = pollInterval;
-        task.cancel();
-        task = new InternalTimerTask();
-        timer.scheduleAtFixedRate(task, pollInterval * SECONDS, pollInterval * 1000L);
+        if (pollTask != null) {
+            pollTask.cancel();
+        }
+        // If we went through start - let's schedule it
+        if (loadCounter != null) {
+            pollTask = new PollTimerTask();
+            timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
+        }
     }
 
-    private class InternalTimerTask extends TimerTask {
+    /**
+     * Resets the collector's event count.
+     */
+    @Override
+    public synchronized void resetEvents() {
+        loadCounter.clear();
+        if (paused.compareAndSet(true, false)) {
+            resume();
+        }
+        // Also reset the waiting flag: the pending reply may be discarded or lost
+        // during a change of mastership
+        waiting.set(false);
+    }
+
+    /**
+     * Records a number of flow events that have occurred.
+     *
+     * @param events the number of events that occurred
+     */
+    @Override
+    public void recordEvents(int events) {
+        SlidingWindowCounter loadCounter = this.loadCounter;
+        if (loadCounter != null) {
+            loadCounter.incrementCount(events);
+        }
+    }
+
+    /**
+     * Returns a boolean indicating whether the switch is under high load.
+     * <p>
+     * The switch is considered under high load if the rate over PAUSE_WINDOW exceeds LOAD_FACTOR times the
+     * rate over HIGH_WINDOW, with that threshold clamped to the range [MIN_LOAD_RATE, MAX_LOAD_RATE].
+     *
+     * @return indicates whether the switch is under high load
+     */
+    private boolean isHighLoad() {
+        return loadCounter.getWindowRate(PAUSE_WINDOW)
+            > max(min(loadCounter.getWindowRate(HIGH_WINDOW) * LOAD_FACTOR, MAX_LOAD_RATE), MIN_LOAD_RATE);
+    }
+
+    /**
+     * Returns a boolean indicating whether the switch is under low load.
+     * <p>
+     * The switch is considered under low load if the average rate over the last 15 seconds is
+     * less than the overall rate.
+     *
+     * @return indicates whether the switch is under low load
+     */
+    private boolean isLowLoad() {
+        return loadCounter.getWindowRate(LOW_WINDOW) < loadCounter.getWindowRate(HIGH_WINDOW);
+    }
+
+    private class PauseTimerTask extends TimerTask {
         @Override
         public void run() {
-            if (sw.getRole() == RoleState.MASTER) {
+            if (isHighLoad()) {
+                if (paused.compareAndSet(false, true)) {
+                    pause();
+                }
+            } else if (isLowLoad()) {
+                if (paused.compareAndSet(true, false)) {
+                    resume();
+                }
+            }
+        }
+    }
+
+    private class PollTimerTask extends TimerTask {
+        @Override
+        public void run() {
+            // Only poll as master, and only if no reply to a previous request is still pending
+            if (sw.getRole() == RoleState.MASTER && !waiting.get()) {
+                // Check whether the switch is under high load from this master. This is done here in case a large
+                // batch was pushed immediately prior to this task running.
+                if (isHighLoad()) {
+                    log.debug("Skipping stats collection for {} due to high load; rate: {}; overall: {}",
+                              sw.getStringId(),
+                              loadCounter.getWindowRate(PAUSE_WINDOW),
+                              loadCounter.getWindowRate(HIGH_WINDOW));
+                    return;
+                } else {
+                    log.debug(
+                        "Permitting stats collection for {}; rate: {}; overall: {}",
+                        sw.getStringId(),
+                        loadCounter.getWindowRate(PAUSE_WINDOW),
+                        loadCounter.getWindowRate(HIGH_WINDOW));
+                }
+
                 log.trace("Collecting stats for {}", sw.getStringId());
                 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
                         .setMatch(sw.factory().matchWildcardAll())
@@ -79,6 +193,9 @@
                         .setOutPort(OFPort.NO_MASK)
                         .build();
                 sw.sendMsg(request);
+                // No further flow stats will be requested
+                // until the reply to this request has been seen
+                waiting.set(true);
             }
         }
     }
@@ -86,17 +203,70 @@
     public synchronized void start() {
         // Initially start polling quickly. Then drop down to configured value
         log.debug("Starting Stats collection thread for {}", sw.getStringId());
-        task = new InternalTimerTask();
-        timer.scheduleAtFixedRate(task, 1 * SECONDS,
-                                  pollInterval * SECONDS);
+        loadCounter = new SlidingWindowCounter(HIGH_WINDOW);
+        pauseTask = new PauseTimerTask();
+        timer.scheduleAtFixedRate(pauseTask, 1 * SECONDS, 1 * SECONDS);
+        pollTask = new PollTimerTask();
+        timer.scheduleAtFixedRate(pollTask, 1 * SECONDS, pollInterval * SECONDS);
+    }
+
+    /**
+     * Cancels the poll task, if one is currently scheduled.
+     */
+    private synchronized void pause() {
+        if (pollTask != null) {
+            log.debug("Pausing stats collection for {}; rate: {}; overall: {}",
+                      sw.getStringId(),
+                      loadCounter.getWindowRate(PAUSE_WINDOW),
+                      loadCounter.getWindowRate(HIGH_WINDOW));
+            pollTask.cancel();
+            pollTask = null;
+        }
+    }
+
+    /**
+     * Schedules a fresh poll task at the configured interval, replacing any existing one.
+     */
+    private synchronized void resume() {
+        // stop() may have completed while the caller was in flight; do not reschedule after that,
+        // and do not dereference the destroyed counter (an exception here would kill the Timer thread).
+        if (loadCounter == null) {
+            return;
+        }
+        log.debug("Resuming stats collection for {}; rate: {}; overall: {}",
+                  sw.getStringId(),
+                  loadCounter.getWindowRate(PAUSE_WINDOW),
+                  loadCounter.getWindowRate(HIGH_WINDOW));
+        // adjustPollInterval() can schedule a poll task while paused; cancel it so it is not orphaned
+        if (pollTask != null) {
+            pollTask.cancel();
+        }
+        pollTask = new PollTimerTask();
+        timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
     }
 
     public synchronized void stop() {
-        if (task != null) {
-            log.debug("Stopping Stats collection thread for {}", sw.getStringId());
-            task.cancel();
-            task = null;
+        if (pauseTask != null) {
+            pauseTask.cancel();
+            pauseTask = null;
         }
+        if (pollTask != null) {
+            log.debug("Stopping Stats collection thread for {}", sw.getStringId());
+            pollTask.cancel();
+            pollTask = null;
+        }
+        if (loadCounter != null) {
+            loadCounter.destroy();
+            loadCounter = null;
+        }
+    }
+
+    /**
+     * Signals that a flow stats reply has been received, clearing the waiting flag.
+     */
+    @Override
+    public void received() {
+        waiting.set(false);
     }
 
 }
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index 8730378..5f2713b5 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -293,6 +293,34 @@
         tableStatsCollectors.values().forEach(tsc -> tsc.adjustPollInterval(flowPollFrequency));
     }
 
+    private void resetEvents(Dpid dpid) {
+        SwitchDataCollector collector;
+        if (adaptiveFlowSampling) {
+            collector = afsCollectors.get(dpid);
+        } else {
+            collector = simpleCollectors.get(dpid);
+        }
+        if (collector != null) {
+            collector.resetEvents();
+        }
+    }
+
+    private void recordEvent(Dpid dpid) {
+        recordEvents(dpid, 1);
+    }
+
+    private void recordEvents(Dpid dpid, int events) {
+        SwitchDataCollector collector;
+        if (adaptiveFlowSampling) {
+            collector = afsCollectors.get(dpid);
+        } else {
+            collector = simpleCollectors.get(dpid);
+        }
+        if (collector != null) {
+            collector.recordEvents(events);
+        }
+    }
+
     @Override
     public void applyFlowRule(FlowRule... flowRules) {
         for (FlowRule flowRule : flowRules) {
@@ -310,6 +338,8 @@
 
         sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
                 Optional.empty(), Optional.of(driverService)).buildFlowAdd());
+
+        recordEvent(dpid);
     }
 
     @Override
@@ -329,6 +359,8 @@
 
         sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
                                           Optional.empty(), Optional.of(driverService)).buildFlowDel());
+
+        recordEvent(dpid);
     }
 
     @Override
@@ -371,6 +403,8 @@
         OFBarrierRequest.Builder builder = sw.factory().buildBarrierRequest()
                 .setXid(batch.id());
         sw.sendMsg(builder.build());
+
+        recordEvents(dpid, batch.getOperations().size());
     }
 
     private class InternalFlowProvider
@@ -416,6 +450,16 @@
                     break;
                 case STATS_REPLY:
                     if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
+                        // Unblock the collector first
+                        SwitchDataCollector collector;
+                        if (adaptiveFlowSampling) {
+                            collector = afsCollectors.get(dpid);
+                        } else {
+                            collector = simpleCollectors.get(dpid);
+                        }
+                        if (collector != null) {
+                            collector.received();
+                        }
                         pushFlowMetrics(dpid, (OFFlowStatsReply) msg, getDriver(deviceId));
                     } else if (((OFStatsReply) msg).getStatsType() == OFStatsType.TABLE) {
                         pushTableStatistics(dpid, (OFTableStatsReply) msg);
@@ -584,7 +628,9 @@
         @Override
         public void receivedRoleReply(Dpid dpid, RoleState requested,
                                       RoleState response) {
-            // Do nothing here for now.
+            if (response == RoleState.MASTER) {
+                resetEvents(dpid);
+            }
         }
 
         private DriverHandler getDriver(DeviceId devId) {
@@ -757,4 +803,4 @@
         }
     }
 
-}
\ No newline at end of file
+}
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/SwitchDataCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/SwitchDataCollector.java
index ae1664b..2c42d2b 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/SwitchDataCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/SwitchDataCollector.java
@@ -30,4 +30,27 @@
      * Stops the collector.
      */
     void stop();
+
+    /**
+     * Signals reply has been received.
+     */
+    default void received() {
+
+    }
+
+    /**
+     * Records the number of events seen.
+     *
+     * @param events number of events
+     */
+    default void recordEvents(int events) {
+
+    }
+
+    /**
+     * Resets the number of events seen.
+     */
+    default void resetEvents() {
+
+    }
 }
\ No newline at end of file