Enhancing accumulator to allow subclasses to indicate whether they are ready for the batch to be processed. Default behaviour returns true.

Change-Id: I53a3ffc3ecd75ed2607f155a61971e05a6009a66
diff --git a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
index 00a5a2c..a7b50cb 100644
--- a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
@@ -76,7 +76,7 @@
         items.add(checkNotNull(item, "Item cannot be null"));
 
         // Did we hit the max item threshold?
-        if (items.size() == maxItems) {
+        if (items.size() >= maxItems) {
             maxTask = cancelIfActive(maxTask);
             schedule(1);
         } else {
@@ -108,12 +108,16 @@
     private class ProcessorTask extends TimerTask {
         @Override
         public void run() {
-            try {
-                idleTask = cancelIfActive(idleTask);
-                maxTask = cancelIfActive(maxTask);
-                processItems(finalizeCurrentBatch());
-            } catch (Exception e) {
-                log.warn("Unable to process batch due to {}", e);
+            idleTask = cancelIfActive(idleTask);
+            if (isReady()) {
+                try {
+                    maxTask = cancelIfActive(maxTask);
+                    processItems(finalizeCurrentBatch());
+                } catch (Exception e) {
+                    log.warn("Unable to process batch due to {}", e);
+                }
+            } else {
+                idleTask = schedule(maxIdleMillis);
             }
         }
     }
@@ -125,6 +129,11 @@
         return toBeProcessed;
     }
 
+    @Override
+    public boolean isReady() {
+        return true;
+    }
+
     /**
      * Returns the backing timer.
      *
@@ -163,4 +172,5 @@
     public int maxIdleMillis() {
         return maxIdleMillis;
     }
+
 }
diff --git a/utils/misc/src/main/java/org/onlab/util/Accumulator.java b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
index 568e38c..20b7a48 100644
--- a/utils/misc/src/main/java/org/onlab/util/Accumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
@@ -40,6 +40,10 @@
      */
     void processItems(List<T> items);
 
-    //TODO consider a blocking version that required consumer participation
-
+    /**
+     * Indicates whether the accumulator is ready to process items.
+     *
+     * @return true if ready to process
+     */
+    boolean isReady();
 }