ONOS-2572 Fix to abstract accumulator to proactively finalize the batches when full and to avoid repeat misfires.

Change-Id: Ibc9904b36f9cf8c9aed36e828152600a2d7a6192
diff --git a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
index e9e3cc8..3aa4509 100644
--- a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
@@ -49,13 +49,13 @@
      * Creates an item accumulator capable of triggering on the specified
      * thresholds.
      *
-     * @param timer               timer to use for scheduling check-points
-     * @param maxItems            maximum number of items to accumulate before
-     *                            processing is triggered
-     * @param maxBatchMillis      maximum number of millis allowed since the first
-     *                            item before processing is triggered
-     * @param maxIdleMillis       maximum number millis between items before
-     *                            processing is triggered
+     * @param timer          timer to use for scheduling check-points
+     * @param maxItems       maximum number of items to accumulate before
+     *                       processing is triggered
+     * @param maxBatchMillis maximum number of millis allowed since the first
+     *                       item before processing is triggered
+     * @param maxIdleMillis  maximum number millis between items before
+     *                       processing is triggered
      */
     protected AbstractAccumulator(Timer timer, int maxItems,
                                   int maxBatchMillis, int maxIdleMillis) {
@@ -78,7 +78,7 @@
         // Did we hit the max item threshold?
         if (items.size() >= maxItems) {
             maxTask = cancelIfActive(maxTask);
-            schedule(1);
+            scheduleNow();
         } else {
             // Otherwise, schedule idle task and if this is a first item
             // also schedule the max batch age task.
@@ -89,14 +89,30 @@
         }
     }
 
-    // Schedules a new processor task given number of millis in the future.
+    /**
+     * Finalizes the current batch, if ready, and schedules a new processor
+     * in the immediate future.
+     */
+    private void scheduleNow() {
+        if (isReady()) {
+            TimerTask task = new ProcessorTask(finalizeCurrentBatch());
+            timer.schedule(task, 1);
+        }
+    }
+
+    /**
+     * Schedules a new processor task given number of millis in the future.
+     * Batch finalization is deferred to time of execution.
+     */
     private TimerTask schedule(int millis) {
         TimerTask task = new ProcessorTask();
         timer.schedule(task, millis);
         return task;
     }
 
-    // Cancels the specified task if it is active.
+    /**
+     * Cancels the specified task if it is active.
+     */
     private TimerTask cancelIfActive(TimerTask task) {
         if (task != null) {
             task.cancel();
@@ -106,6 +122,19 @@
 
     // Task for triggering processing of accumulated items
     private class ProcessorTask extends TimerTask {
+
+        private final List<T> items;
+
+        // Creates a new processor task with deferred batch finalization.
+        ProcessorTask() {
+            this.items = null;
+        }
+
+        // Creates a new processor task with pre-emptive batch finalization.
+        ProcessorTask(List<T> items) {
+            this.items = items;
+        }
+
         @Override
         public void run() {
             synchronized (AbstractAccumulator.this) {
@@ -116,9 +145,9 @@
                     synchronized (AbstractAccumulator.this) {
                         maxTask = cancelIfActive(maxTask);
                     }
-                    List<T> items = finalizeCurrentBatch();
-                    if (!items.isEmpty()) {
-                        processItems(items);
+                    List<T> batch = items != null ? items : finalizeCurrentBatch();
+                    if (!batch.isEmpty()) {
+                        processItems(batch);
                     }
                 } catch (Exception e) {
                     log.warn("Unable to process batch due to {}", e);
diff --git a/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
index 421fa7a..02f0deb 100644
--- a/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
+++ b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
@@ -20,8 +20,10 @@
 
 import java.util.List;
 import java.util.Timer;
+import java.util.stream.IntStream;
 
 import static org.junit.Assert.*;
+import static org.onlab.junit.TestTools.assertAfter;
 import static org.onlab.junit.TestTools.delay;
 
 /**
@@ -136,6 +138,14 @@
         assertEquals("incorrect batch", "abcdefg", accumulator.batch);
     }
 
+    @Ignore("FIXME: timing sensitive test failing randomly.")
+    @Test
+    public void stormTest() {
+        TestAccumulator accumulator = new TestAccumulator();
+        IntStream.range(0, 1000).forEach(i -> accumulator.add(new TestItem("#" + i)));
+        assertAfter(100, () -> assertEquals("wrong item count", 1000, accumulator.itemCount));
+        assertEquals("wrong batch count", 200, accumulator.batchCount);
+    }
 
     private class TestItem {
         private final String s;
@@ -149,6 +159,8 @@
 
         String batch = "";
         boolean ready = true;
+        int batchCount = 0;
+        int itemCount = 0;
 
         protected TestAccumulator() {
             super(timer, 5, 100, 70);
@@ -156,6 +168,8 @@
 
         @Override
         public void processItems(List<TestItem> items) {
+            batchCount++;
+            itemCount += items.size();
             for (TestItem item : items) {
                 batch += item.s;
             }