Pre-allocate sliding window counter slots to avoid thread safety issues.
Change-Id: I331457ee36416d11e52a2e8092662ade3f2e8575
(cherry picked from commit d884d8acf99c10b5c5885a3e8328187ae2550106)
diff --git a/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java b/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java
index f3b10ec..e1b0557 100644
--- a/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java
+++ b/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java
@@ -15,11 +15,14 @@
*/
package org.onlab.util;
+import com.google.common.collect.ImmutableList;
+
import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.min;
@@ -60,8 +63,9 @@
this.headSlot = 0;
// Initialize each item in the list to an AtomicLong of 0
- this.counters = new CopyOnWriteArrayList<>();
- this.counters.add(new AtomicLong());
+ this.counters = ImmutableList.copyOf(IntStream.range(0, windowSlots)
+ .mapToObj(i -> new AtomicLong())
+ .collect(Collectors.toList()));
background = newSingleThreadScheduledExecutor(groupedThreads("SlidingWindowCounter", "bg-%d"));
background.scheduleWithFixedDelay(this::advanceHead, 1,
@@ -91,6 +95,12 @@
incrementCount(headSlot, value);
}
+ /**
+ * Increments the count of the given window slot by the given value.
+ *
+ * @param slot the slot to increment
+ * @param value the value by which to increment the slot
+ */
private void incrementCount(int slot, long value) {
counters.get(slot).addAndGet(value);
totalCount.addAndGet(value);
@@ -129,7 +139,7 @@
long sum = 0;
- slots = min(slots, counters.size());
+ slots = getMinSlots(slots);
for (int i = 0; i < slots; i++) {
int currentIndex = headSlot - i;
if (currentIndex < 0) {
@@ -157,7 +167,14 @@
* @return the average rate over the given window
*/
public double getWindowRate(int slots) {
- return getWindowCount(slots) / (double) min(slots, counters.size());
+ // Compute the minimum slots to before computing the window count to ensure
+ // the window count and number of slots are for the same window.
+ slots = getMinSlots(slots);
+ return getWindowCount(slots) / (double) slots;
+ }
+
+ private int getMinSlots(int slots) {
+ return min(slots, (int) min(totalSlots.get(), Integer.MAX_VALUE));
}
/**
@@ -182,19 +199,14 @@
* Clears the counter.
*/
public void clear() {
- counters.clear();
- counters.add(new AtomicLong());
+ counters.forEach(value -> value.set(0));
totalCount.set(0);
totalSlots.set(1);
headSlot = 0;
}
void advanceHead() {
- if (counters.size() < windowSlots) {
- counters.add(new AtomicLong(0));
- } else {
- counters.get(slotAfter(headSlot)).set(0);
- }
+ counters.get(slotAfter(headSlot)).set(0);
headSlot = slotAfter(headSlot);
totalSlots.incrementAndGet();
}