Modify OpenFlow provider to avoid stats collection when switch is under high load
* Track load via sliding window
* Pause stats collection when load is significantly above average
* Resume stats collection when load returns to normal
* Pause stats collection while a reply is in flight
Change-Id: I3159b4f806a6405ca6be494534497348716cc921
diff --git a/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java b/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java
index 33dfec1..cc4f4fe 100644
--- a/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java
+++ b/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java
@@ -16,14 +16,13 @@
package org.onlab.util;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.Math.min;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
@@ -43,6 +42,9 @@
private final ScheduledExecutorService background;
+ private final AtomicLong totalCount = new AtomicLong();
+ private final AtomicLong totalSlots = new AtomicLong(1);
+
private static final int SLIDE_WINDOW_PERIOD_SECONDS = 1;
/**
@@ -58,13 +60,11 @@
this.headSlot = 0;
// Initialize each item in the list to an AtomicLong of 0
- this.counters = Collections.nCopies(windowSlots, 0)
- .stream()
- .map(AtomicLong::new)
- .collect(Collectors.toCollection(ArrayList::new));
+ this.counters = new ArrayList<>();
+ this.counters.add(new AtomicLong());
background = newSingleThreadScheduledExecutor(groupedThreads("SlidingWindowCounter", "bg-%d"));
- background.scheduleWithFixedDelay(this::advanceHead, 0,
+ background.scheduleWithFixedDelay(this::advanceHead, 1,
SLIDE_WINDOW_PERIOD_SECONDS, TimeUnit.SECONDS);
}
@@ -93,6 +93,28 @@
private void incrementCount(int slot, long value) {
counters.get(slot).addAndGet(value);
+ totalCount.addAndGet(value);
+ }
+
+ /**
+ * Gets the total count for the last N window slots.
+ *
+ * @param slots number of slots to include in the count
+ * @return total count for last N slots
+ * @deprecated since 1.12; use {@link #getWindowCount(int)} instead
+ */
+ @Deprecated
+ public long get(int slots) {
+ return getWindowCount(slots);
+ }
+
+ /**
+ * Gets the total count for all slots.
+ *
+ * @return total count for all slots
+ */
+ public long getWindowCount() {
+ return getWindowCount(windowSlots);
}
/**
@@ -101,12 +123,13 @@
* @param slots number of slots to include in the count
* @return total count for last N slots
*/
- public long get(int slots) {
+ public long getWindowCount(int slots) {
checkArgument(slots <= windowSlots,
"Requested window must be less than the total window slots");
long sum = 0;
+ slots = min(slots, counters.size());
for (int i = 0; i < slots; i++) {
int currentIndex = headSlot - i;
if (currentIndex < 0) {
@@ -118,9 +141,62 @@
return sum;
}
+ /**
+ * Returns the average rate over the window.
+ *
+ * @return the average rate over the window
+ */
+ public double getWindowRate() {
+ return getWindowRate(windowSlots);
+ }
+
+ /**
+ * Returns the average rate over the given window.
+ *
+ * @param slots the number of slots to include in the window
+ * @return the average rate over the given window
+ */
+ public double getWindowRate(int slots) {
+ return getWindowCount(slots) / (double) min(slots, counters.size());
+ }
+
+ /**
+ * Returns the overall count, i.e. the sum of all increment values since creation or the last clear.
+ *
+ * @return the sum of all increment values
+ */
+ public long getOverallCount() {
+ return totalCount.get();
+ }
+
+ /**
+ * Returns the overall rate, i.e. the overall count divided by the total number of slots counted so far.
+ *
+ * @return the overall count per slot
+ */
+ public double getOverallRate() {
+ return totalCount.get() / (double) totalSlots.get();
+ }
+
+ /**
+ * Clears the counter, resetting the window slots, the overall count, and the slot total.
+ */
+ public void clear() {
+ counters.clear();
+ counters.add(new AtomicLong());
+ totalCount.set(0);
+ totalSlots.set(1);
+ headSlot = 0;
+ }
+
void advanceHead() {
- counters.get(slotAfter(headSlot)).set(0);
- headSlot = slotAfter(headSlot);
+ if (counters.size() - 1 < slotAfter(headSlot)) {
+ counters.add(0, new AtomicLong(0));
+ } else {
+ counters.get(slotAfter(headSlot)).set(0);
+ headSlot = slotAfter(headSlot);
+ }
+ totalSlots.incrementAndGet();
}
private int slotAfter(int slot) {