ONOS-2572 Fix to abstract accumulator to proactively finalize the batches when full and to avoid repeat misfires.
Change-Id: Ibc9904b36f9cf8c9aed36e828152600a2d7a6192
diff --git a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
index e9e3cc8..3aa4509 100644
--- a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
@@ -49,13 +49,13 @@
* Creates an item accumulator capable of triggering on the specified
* thresholds.
*
- * @param timer timer to use for scheduling check-points
- * @param maxItems maximum number of items to accumulate before
- * processing is triggered
- * @param maxBatchMillis maximum number of millis allowed since the first
- * item before processing is triggered
- * @param maxIdleMillis maximum number millis between items before
- * processing is triggered
+ * @param timer timer to use for scheduling check-points
+ * @param maxItems maximum number of items to accumulate before
+ * processing is triggered
+ * @param maxBatchMillis maximum number of millis allowed since the first
+ * item before processing is triggered
+ * @param maxIdleMillis maximum number millis between items before
+ * processing is triggered
*/
protected AbstractAccumulator(Timer timer, int maxItems,
int maxBatchMillis, int maxIdleMillis) {
@@ -78,7 +78,7 @@
// Did we hit the max item threshold?
if (items.size() >= maxItems) {
maxTask = cancelIfActive(maxTask);
- schedule(1);
+ scheduleNow();
} else {
// Otherwise, schedule idle task and if this is a first item
// also schedule the max batch age task.
@@ -89,14 +89,30 @@
}
}
- // Schedules a new processor task given number of millis in the future.
+ /**
+ * Finalizes the current batch, if ready, and schedules a new processor
+ * in the immediate future.
+ */
+ private void scheduleNow() {
+ if (isReady()) {
+ TimerTask task = new ProcessorTask(finalizeCurrentBatch());
+ timer.schedule(task, 1);
+ }
+ }
+
+ /**
+ * Schedules a new processor task given number of millis in the future.
+ * Batch finalization is deferred to time of execution.
+ */
private TimerTask schedule(int millis) {
TimerTask task = new ProcessorTask();
timer.schedule(task, millis);
return task;
}
- // Cancels the specified task if it is active.
+ /**
+ * Cancels the specified task if it is active.
+ */
private TimerTask cancelIfActive(TimerTask task) {
if (task != null) {
task.cancel();
@@ -106,6 +122,19 @@
// Task for triggering processing of accumulated items
private class ProcessorTask extends TimerTask {
+
+ private final List<T> items;
+
+ // Creates a new processor task with deferred batch finalization.
+ ProcessorTask() {
+ this.items = null;
+ }
+
+ // Creates a new processor task with pre-emptive batch finalization.
+ ProcessorTask(List<T> items) {
+ this.items = items;
+ }
+
@Override
public void run() {
synchronized (AbstractAccumulator.this) {
@@ -116,9 +145,9 @@
synchronized (AbstractAccumulator.this) {
maxTask = cancelIfActive(maxTask);
}
- List<T> items = finalizeCurrentBatch();
- if (!items.isEmpty()) {
- processItems(items);
+ List<T> batch = items != null ? items : finalizeCurrentBatch();
+ if (!batch.isEmpty()) {
+ processItems(batch);
}
} catch (Exception e) {
log.warn("Unable to process batch due to {}", e);
diff --git a/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
index 421fa7a..02f0deb 100644
--- a/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
+++ b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
@@ -20,8 +20,10 @@
import java.util.List;
import java.util.Timer;
+import java.util.stream.IntStream;
import static org.junit.Assert.*;
+import static org.onlab.junit.TestTools.assertAfter;
import static org.onlab.junit.TestTools.delay;
/**
@@ -136,6 +138,14 @@
assertEquals("incorrect batch", "abcdefg", accumulator.batch);
}
+ @Ignore("FIXME: timing sensitive test failing randomly.")
+ @Test
+ public void stormTest() {
+ TestAccumulator accumulator = new TestAccumulator();
+ IntStream.range(0, 1000).forEach(i -> accumulator.add(new TestItem("#" + i)));
+ assertAfter(100, () -> assertEquals("wrong item count", 1000, accumulator.itemCount));
+ assertEquals("wrong batch count", 200, accumulator.batchCount);
+ }
private class TestItem {
private final String s;
@@ -149,6 +159,8 @@
String batch = "";
boolean ready = true;
+ int batchCount = 0;
+ int itemCount = 0;
protected TestAccumulator() {
super(timer, 5, 100, 70);
@@ -156,6 +168,8 @@
@Override
public void processItems(List<TestItem> items) {
+ batchCount++;
+ itemCount += items.size();
for (TestItem item : items) {
batch += item.s;
}