Enhancing accumulator to allow subclasses to indicate whether they are ready for the batch to be processed. Default behaviour returns true.

Change-Id: I53a3ffc3ecd75ed2607f155a61971e05a6009a66
diff --git a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
index 00a5a2c..a7b50cb 100644
--- a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
@@ -76,7 +76,7 @@
         items.add(checkNotNull(item, "Item cannot be null"));
 
         // Did we hit the max item threshold?
-        if (items.size() == maxItems) {
+        if (items.size() >= maxItems) {
             maxTask = cancelIfActive(maxTask);
             schedule(1);
         } else {
@@ -108,12 +108,16 @@
     private class ProcessorTask extends TimerTask {
         @Override
         public void run() {
-            try {
-                idleTask = cancelIfActive(idleTask);
-                maxTask = cancelIfActive(maxTask);
-                processItems(finalizeCurrentBatch());
-            } catch (Exception e) {
-                log.warn("Unable to process batch due to {}", e);
+            idleTask = cancelIfActive(idleTask);
+            if (isReady()) {
+                try {
+                    maxTask = cancelIfActive(maxTask);
+                    processItems(finalizeCurrentBatch());
+                } catch (Exception e) {
+                    log.warn("Unable to process batch due to {}", e);
+                }
+            } else {
+                idleTask = schedule(maxIdleMillis);
             }
         }
     }
@@ -125,6 +129,11 @@
         return toBeProcessed;
     }
 
+    @Override
+    public boolean isReady() {
+        return true;
+    }
+
     /**
      * Returns the backing timer.
      *
@@ -163,4 +172,5 @@
     public int maxIdleMillis() {
         return maxIdleMillis;
     }
+
 }
diff --git a/utils/misc/src/main/java/org/onlab/util/Accumulator.java b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
index 568e38c..20b7a48 100644
--- a/utils/misc/src/main/java/org/onlab/util/Accumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
@@ -40,6 +40,10 @@
      */
     void processItems(List<T> items);
 
-    //TODO consider a blocking version that required consumer participation
-
+    /**
+     * Indicates whether the accumulator is ready to process items.
+     *
+     * @return true if ready to process
+     */
+    boolean isReady();
 }
diff --git a/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
index b2b0b1f..e3d87e2 100644
--- a/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
+++ b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
@@ -37,7 +37,7 @@
         assertEquals("incorrect timer", timer, accumulator.timer());
         assertEquals("incorrect max events", 5, accumulator.maxItems());
         assertEquals("incorrect max ms", 100, accumulator.maxBatchMillis());
-        assertEquals("incorrect idle ms", 50, accumulator.maxIdleMillis());
+        assertEquals("incorrect idle ms", 70, accumulator.maxIdleMillis());
     }
 
     @Test
@@ -68,7 +68,7 @@
         delay(30);
         assertTrue("should not have fired yet", accumulator.batch.isEmpty());
         accumulator.add(new TestItem("d"));
-        delay(30);
+        delay(60);
         assertFalse("should have fired", accumulator.batch.isEmpty());
         assertEquals("incorrect batch", "abcd", accumulator.batch);
     }
@@ -84,6 +84,54 @@
         assertEquals("incorrect batch", "ab", accumulator.batch);
     }
 
+    @Test
+    public void readyIdleTrigger() {
+        TestAccumulator accumulator = new TestAccumulator();
+        accumulator.ready = false;
+        accumulator.add(new TestItem("a"));
+        assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+        accumulator.add(new TestItem("b"));
+        delay(80);
+        assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+        accumulator.ready = true;
+        delay(80);
+        assertFalse("should have fired", accumulator.batch.isEmpty());
+        assertEquals("incorrect batch", "ab", accumulator.batch);
+    }
+
+    @Test
+    public void readyLongTrigger() {
+        TestAccumulator accumulator = new TestAccumulator();
+        accumulator.ready = false;
+        delay(120);
+        assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+        accumulator.add(new TestItem("a"));
+        assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+        accumulator.ready = true;
+        delay(80);
+        assertFalse("should have fired", accumulator.batch.isEmpty());
+        assertEquals("incorrect batch", "a", accumulator.batch);
+    }
+
+    @Test
+    public void readyMaxTrigger() {
+        TestAccumulator accumulator = new TestAccumulator();
+        accumulator.ready = false;
+        accumulator.add(new TestItem("a"));
+        accumulator.add(new TestItem("b"));
+        accumulator.add(new TestItem("c"));
+        accumulator.add(new TestItem("d"));
+        accumulator.add(new TestItem("e"));
+        accumulator.add(new TestItem("f"));
+        assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+        accumulator.ready = true;
+        accumulator.add(new TestItem("g"));
+        delay(5);
+        assertFalse("should have fired", accumulator.batch.isEmpty());
+        assertEquals("incorrect batch", "abcdefg", accumulator.batch);
+    }
+
+
     private class TestItem {
         private final String s;
 
@@ -95,9 +143,10 @@
     private class TestAccumulator extends AbstractAccumulator<TestItem> {
 
         String batch = "";
+        boolean ready = true;
 
         protected TestAccumulator() {
-            super(timer, 5, 100, 50);
+            super(timer, 5, 100, 70);
         }
 
         @Override
@@ -106,6 +155,11 @@
                 batch += item.s;
             }
         }
+
+        @Override
+        public boolean isReady() {
+            return ready;
+        }
     }
 
 }