Modify OpenFlow provider to avoid stats collection when switch is under high load
* Track load via sliding window
* Pause stats collection when load is significantly above average
* Resume stats collection when load returns to normal
* Pause stats collection when reply is in-flight

Change-Id: I3159b4f806a6405ca6be494534497348716cc921
diff --git a/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java b/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java
index 33dfec1..cc4f4fe 100644
--- a/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java
+++ b/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java
@@ -16,14 +16,13 @@
 package org.onlab.util;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.Math.min;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 
@@ -43,6 +42,9 @@
 
     private final ScheduledExecutorService background;
 
+    private final AtomicLong totalCount = new AtomicLong();
+    private final AtomicLong totalSlots = new AtomicLong(1);
+
     private static final int SLIDE_WINDOW_PERIOD_SECONDS = 1;
 
     /**
@@ -58,13 +60,11 @@
         this.headSlot = 0;
 
         // Initialize each item in the list to an AtomicLong of 0
-        this.counters = Collections.nCopies(windowSlots, 0)
-                .stream()
-                .map(AtomicLong::new)
-                .collect(Collectors.toCollection(ArrayList::new));
+        this.counters = new ArrayList<>();
+        this.counters.add(new AtomicLong());
 
         background = newSingleThreadScheduledExecutor(groupedThreads("SlidingWindowCounter", "bg-%d"));
-        background.scheduleWithFixedDelay(this::advanceHead, 0,
+        background.scheduleWithFixedDelay(this::advanceHead, 1,
                                           SLIDE_WINDOW_PERIOD_SECONDS, TimeUnit.SECONDS);
     }
 
@@ -93,6 +93,28 @@
 
     private void incrementCount(int slot, long value) {
         counters.get(slot).addAndGet(value);
+        totalCount.addAndGet(value);
+    }
+
+    /**
+     * Gets the total count for the last N window slots.
+     *
+     * @param slots number of slots to include in the count
+     * @return total count for last N slots
+     * @deprecated since 1.12
+     */
+    @Deprecated
+    public long get(int slots) {
+        return getWindowCount(slots);
+    }
+
+    /**
+     * Gets the total count for all slots.
+     *
+     * @return total count for all slots
+     */
+    public long getWindowCount() {
+        return getWindowCount(windowSlots);
     }
 
     /**
@@ -101,12 +123,13 @@
      * @param slots number of slots to include in the count
      * @return total count for last N slots
      */
-    public long get(int slots) {
+    public long getWindowCount(int slots) {
         checkArgument(slots <= windowSlots,
                       "Requested window must be less than the total window slots");
 
         long sum = 0;
 
+        slots = min(slots, counters.size());
         for (int i = 0; i < slots; i++) {
             int currentIndex = headSlot - i;
             if (currentIndex < 0) {
@@ -118,9 +141,62 @@
         return sum;
     }
 
+    /**
+     * Returns the average rate over the window.
+     *
+     * @return the average rate over the window
+     */
+    public double getWindowRate() {
+        return getWindowRate(windowSlots);
+    }
+
+    /**
+     * Returns the average rate over the given window.
+     *
+     * @param slots the number of slots to include in the window
+     * @return the average rate over the given window
+     */
+    public double getWindowRate(int slots) {
+        return getWindowCount(slots) / (double) min(slots, counters.size());
+    }
+
+    /**
+     * Returns the overall number of increments.
+     *
+     * @return the overall number of increments
+     */
+    public long getOverallCount() {
+        return totalCount.get();
+    }
+
+    /**
+     * Returns the overall rate.
+     *
+     * @return the overall rate
+     */
+    public double getOverallRate() {
+        return totalCount.get() / (double) totalSlots.get();
+    }
+
+    /**
+     * Clears the counter.
+     */
+    public void clear() {
+        counters.clear();
+        counters.add(new AtomicLong());
+        totalCount.set(0);
+        totalSlots.set(1);
+        headSlot = 0;
+    }
+
     void advanceHead() {
-        counters.get(slotAfter(headSlot)).set(0);
-        headSlot = slotAfter(headSlot);
+        if (counters.size() - 1 < slotAfter(headSlot)) {
+            counters.add(0, new AtomicLong(0));
+        } else {
+            counters.get(slotAfter(headSlot)).set(0);
+            headSlot = slotAfter(headSlot);
+        }
+        totalSlots.incrementAndGet();
     }
 
     private int slotAfter(int slot) {