Updated accumulator documentation and refactored names to remove the event heritage.

Change-Id: I2238ab1215281702e670a406fb901ba8a4ef85ce
diff --git a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
index f417562..00a5a2c 100644
--- a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
@@ -27,64 +27,63 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
- * Base implementation of an event accumulator. It allows triggering based on
- * event inter-arrival time threshold, maximum batch life threshold and maximum
+ * Base implementation of an item accumulator. It allows triggering based on
+ * item inter-arrival time threshold, maximum batch life threshold and maximum
  * batch size.
  */
-// FIXME refactor the names here
 public abstract class AbstractAccumulator<T> implements Accumulator<T> {
 
     private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class);
 
     private final Timer timer;
-    private final int maxEvents;
+    private final int maxItems;
     private final int maxBatchMillis;
     private final int maxIdleMillis;
 
     private TimerTask idleTask = new ProcessorTask();
     private TimerTask maxTask = new ProcessorTask();
 
-    private List<T> events = Lists.newArrayList();
+    private List<T> items = Lists.newArrayList();
 
     /**
-     * Creates an event accumulator capable of triggering on the specified
+     * Creates an item accumulator capable of triggering on the specified
      * thresholds.
      *
-     * @param timer          timer to use for scheduling check-points
-     * @param maxEvents      maximum number of events to accumulate before
-     *                       processing is triggered
-     * @param maxBatchMillis maximum number of millis allowed since the first
-     *                       event before processing is triggered
-     * @param maxIdleMillis  maximum number millis between events before
-     *                       processing is triggered
+     * @param timer               timer to use for scheduling check-points
+     * @param maxItems            maximum number of items to accumulate before
+     *                            processing is triggered
+     * @param maxBatchMillis      maximum number of millis allowed since the first
+     *                            item before processing is triggered
+     * @param maxIdleMillis       maximum number of millis between items before
+     *                            processing is triggered
      */
-    protected AbstractAccumulator(Timer timer, int maxEvents,
+    protected AbstractAccumulator(Timer timer, int maxItems,
                                   int maxBatchMillis, int maxIdleMillis) {
         this.timer = checkNotNull(timer, "Timer cannot be null");
 
-        checkArgument(maxEvents > 1, "Maximum number of events must be > 1");
+        checkArgument(maxItems > 1, "Maximum number of items must be > 1");
         checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
         checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
 
-        this.maxEvents = maxEvents;
+        this.maxItems = maxItems;
         this.maxBatchMillis = maxBatchMillis;
         this.maxIdleMillis = maxIdleMillis;
     }
 
     @Override
-    public synchronized void add(T event) {
+    public synchronized void add(T item) {
         idleTask = cancelIfActive(idleTask);
-        events.add(checkNotNull(event, "Event cannot be null"));
+        items.add(checkNotNull(item, "Item cannot be null"));
 
-        // Did we hit the max event threshold?
-        if (events.size() == maxEvents) {
+        // Did we hit the max item threshold?
+        if (items.size() == maxItems) {
             maxTask = cancelIfActive(maxTask);
             schedule(1);
         } else {
-            // Otherwise, schedule idle task and if this is a first event
+            // Otherwise, schedule idle task and if this is a first item
             // also schedule the max batch age task.
             idleTask = schedule(maxIdleMillis);
-            if (events.size() == 1) {
+            if (items.size() == 1) {
                 maxTask = schedule(maxBatchMillis);
             }
         }
@@ -105,24 +104,24 @@
         return task;
     }
 
-    // Task for triggering processing of accumulated events
+    // Task for triggering processing of accumulated items
     private class ProcessorTask extends TimerTask {
         @Override
         public void run() {
             try {
                 idleTask = cancelIfActive(idleTask);
                 maxTask = cancelIfActive(maxTask);
-                processEvents(finalizeCurrentBatch());
+                processItems(finalizeCurrentBatch());
             } catch (Exception e) {
                 log.warn("Unable to process batch due to {}", e);
             }
         }
     }
 
-    // Demotes and returns the current batch of events and promotes a new one.
+    // Demotes and returns the current batch of items and promotes a new one.
     private synchronized List<T> finalizeCurrentBatch() {
-        List<T> toBeProcessed = events;
-        events = Lists.newArrayList();
+        List<T> toBeProcessed = items;
+        items = Lists.newArrayList();
         return toBeProcessed;
     }
 
@@ -136,18 +135,18 @@
     }
 
     /**
-     * Returns the maximum number of events allowed to accumulate before
+     * Returns the maximum number of items allowed to accumulate before
      * processing is triggered.
      *
-     * @return max number of events
+     * @return max number of items
      */
-    public int maxEvents() {
-        return maxEvents;
+    public int maxItems() {
+        return maxItems;
     }
 
     /**
      * Returns the maximum number of millis allowed to expire since the first
-     * event before processing is triggered.
+     * item before processing is triggered.
      *
      * @return max number of millis a batch is allowed to last
      */
@@ -157,9 +156,9 @@
 
     /**
      * Returns the maximum number of millis allowed to expire since the last
-     * event arrival before processing is triggered.
+     * item arrival before processing is triggered.
      *
-     * @return max number of millis since the last event
+     * @return max number of millis since the last item
      */
     public int maxIdleMillis() {
         return maxIdleMillis;
diff --git a/utils/misc/src/main/java/org/onlab/util/Accumulator.java b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
index 897bddf..568e38c 100644
--- a/utils/misc/src/main/java/org/onlab/util/Accumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
@@ -18,25 +18,28 @@
 import java.util.List;
 
 /**
- * Abstraction of an accumulator capable of collecting events and at some
- * point in time triggers processing of all previously accumulated events.
+ * Abstraction of an accumulator capable of collecting items and, at some
+ * point in time, triggering processing of all previously accumulated items.
+ *
+ * @param <T> item type
  */
 public interface Accumulator<T> {
 
     /**
-     * Adds an event to the current batch. This operation may, or may not
-     * trigger processing of the current batch of events.
+     * Adds an item to the current batch. This operation may or may not
+     * trigger processing of the current batch of items.
      *
-     * @param event event to be added to the current batch
+     * @param item item to be added to the current batch
      */
-    void add(T event);
+    void add(T item);
 
     /**
-     * Processes the specified list of accumulated events.
+     * Processes the specified list of accumulated items.
      *
-     * @param events list of accumulated events
+     * @param items list of accumulated items
      */
-    void processEvents(List<T> events);
+    void processItems(List<T> items);
 
     //TODO consider a blocking version that required consumer participation
+
 }
diff --git a/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
new file mode 100644
index 0000000..b2b0b1f
--- /dev/null
+++ b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Timer;
+
+import static org.junit.Assert.*;
+import static org.onlab.junit.TestTools.delay;
+
+/**
+ * Tests the operation of the accumulator.
+ */
+public class AbstractAccumulatorTest {
+
+    private final Timer timer = new Timer();
+
+    @Test
+    public void basics() throws Exception {
+        TestAccumulator accumulator = new TestAccumulator();
+        assertEquals("incorrect timer", timer, accumulator.timer());
+        assertEquals("incorrect max items", 5, accumulator.maxItems());
+        assertEquals("incorrect max ms", 100, accumulator.maxBatchMillis());
+        assertEquals("incorrect idle ms", 50, accumulator.maxIdleMillis());
+    }
+
+    @Test
+    public void eventTrigger() {
+        TestAccumulator accumulator = new TestAccumulator();
+        accumulator.add(new TestItem("a"));
+        accumulator.add(new TestItem("b"));
+        accumulator.add(new TestItem("c"));
+        accumulator.add(new TestItem("d"));
+        assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+        accumulator.add(new TestItem("e"));
+        delay(20);
+        assertFalse("should have fired", accumulator.batch.isEmpty());
+        assertEquals("incorrect batch", "abcde", accumulator.batch);
+    }
+
+    @Ignore("FIXME: timing sensitive test failing randomly.")
+    @Test
+    public void timeTrigger() {
+        TestAccumulator accumulator = new TestAccumulator();
+        accumulator.add(new TestItem("a"));
+        delay(30);
+        assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+        accumulator.add(new TestItem("b"));
+        delay(30);
+        assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+        accumulator.add(new TestItem("c"));
+        delay(30);
+        assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+        accumulator.add(new TestItem("d"));
+        delay(30);
+        assertFalse("should have fired", accumulator.batch.isEmpty());
+        assertEquals("incorrect batch", "abcd", accumulator.batch);
+    }
+
+    @Test
+    public void idleTrigger() {
+        TestAccumulator accumulator = new TestAccumulator();
+        accumulator.add(new TestItem("a"));
+        assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+        accumulator.add(new TestItem("b"));
+        delay(80);
+        assertFalse("should have fired", accumulator.batch.isEmpty());
+        assertEquals("incorrect batch", "ab", accumulator.batch);
+    }
+
+    private class TestItem {
+        private final String s;
+
+        public TestItem(String s) {
+            this.s = s;
+        }
+    }
+
+    private class TestAccumulator extends AbstractAccumulator<TestItem> {
+
+        volatile String batch = ""; // written from the accumulator's TimerTask, read by the test thread
+
+        protected TestAccumulator() {
+            super(timer, 5, 100, 50);
+        }
+
+        @Override
+        public void processItems(List<TestItem> items) {
+            for (TestItem item : items) {
+                batch += item.s;
+            }
+        }
+    }
+
+}