Updated accumulator documentation and refactored names to remove the event heritage.
Change-Id: I2238ab1215281702e670a406fb901ba8a4ef85ce
diff --git a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
index f417562..00a5a2c 100644
--- a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
@@ -27,64 +27,63 @@
import static com.google.common.base.Preconditions.checkNotNull;
/**
- * Base implementation of an event accumulator. It allows triggering based on
- * event inter-arrival time threshold, maximum batch life threshold and maximum
+ * Base implementation of an item accumulator. It allows triggering based on
+ * item inter-arrival time threshold, maximum batch life threshold and maximum
* batch size.
*/
-// FIXME refactor the names here
public abstract class AbstractAccumulator<T> implements Accumulator<T> {
private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class);
private final Timer timer;
- private final int maxEvents;
+ private final int maxItems;
private final int maxBatchMillis;
private final int maxIdleMillis;
private TimerTask idleTask = new ProcessorTask();
private TimerTask maxTask = new ProcessorTask();
- private List<T> events = Lists.newArrayList();
+ private List<T> items = Lists.newArrayList();
/**
- * Creates an event accumulator capable of triggering on the specified
+ * Creates an item accumulator capable of triggering on the specified
* thresholds.
*
- * @param timer timer to use for scheduling check-points
- * @param maxEvents maximum number of events to accumulate before
- * processing is triggered
- * @param maxBatchMillis maximum number of millis allowed since the first
- * event before processing is triggered
- * @param maxIdleMillis maximum number millis between events before
- * processing is triggered
+ * @param timer timer to use for scheduling check-points
+ * @param maxItems maximum number of items to accumulate before
+ * processing is triggered
+ * @param maxBatchMillis maximum number of millis allowed since the first
+ * item before processing is triggered
+ * @param maxIdleMillis maximum number of millis between items before
+ * processing is triggered
*/
- protected AbstractAccumulator(Timer timer, int maxEvents,
+ protected AbstractAccumulator(Timer timer, int maxItems,
int maxBatchMillis, int maxIdleMillis) {
this.timer = checkNotNull(timer, "Timer cannot be null");
- checkArgument(maxEvents > 1, "Maximum number of events must be > 1");
+ checkArgument(maxItems > 1, "Maximum number of items must be > 1");
checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
- this.maxEvents = maxEvents;
+ this.maxItems = maxItems;
this.maxBatchMillis = maxBatchMillis;
this.maxIdleMillis = maxIdleMillis;
}
@Override
- public synchronized void add(T event) {
+ public synchronized void add(T item) {
idleTask = cancelIfActive(idleTask);
- events.add(checkNotNull(event, "Event cannot be null"));
+ items.add(checkNotNull(item, "Item cannot be null"));
- // Did we hit the max event threshold?
- if (events.size() == maxEvents) {
+ // Did we hit the max item threshold?
+ if (items.size() == maxItems) {
maxTask = cancelIfActive(maxTask);
schedule(1);
} else {
- // Otherwise, schedule idle task and if this is a first event
+ // Otherwise, schedule idle task and if this is a first item
// also schedule the max batch age task.
idleTask = schedule(maxIdleMillis);
- if (events.size() == 1) {
+ if (items.size() == 1) {
maxTask = schedule(maxBatchMillis);
}
}
@@ -105,24 +104,24 @@
return task;
}
- // Task for triggering processing of accumulated events
+ // Task for triggering processing of accumulated items
private class ProcessorTask extends TimerTask {
@Override
public void run() {
try {
idleTask = cancelIfActive(idleTask);
maxTask = cancelIfActive(maxTask);
- processEvents(finalizeCurrentBatch());
+ processItems(finalizeCurrentBatch());
} catch (Exception e) {
log.warn("Unable to process batch due to {}", e);
}
}
}
- // Demotes and returns the current batch of events and promotes a new one.
+ // Demotes and returns the current batch of items and promotes a new one.
private synchronized List<T> finalizeCurrentBatch() {
- List<T> toBeProcessed = events;
- events = Lists.newArrayList();
+ List<T> toBeProcessed = items;
+ items = Lists.newArrayList();
return toBeProcessed;
}
@@ -136,18 +135,18 @@
}
/**
- * Returns the maximum number of events allowed to accumulate before
+ * Returns the maximum number of items allowed to accumulate before
* processing is triggered.
*
- * @return max number of events
+ * @return max number of items
*/
- public int maxEvents() {
- return maxEvents;
+ public int maxItems() {
+ return maxItems;
}
/**
* Returns the maximum number of millis allowed to expire since the first
- * event before processing is triggered.
+ * item before processing is triggered.
*
* @return max number of millis a batch is allowed to last
*/
@@ -157,9 +156,9 @@
/**
* Returns the maximum number of millis allowed to expire since the last
- * event arrival before processing is triggered.
+ * item arrival before processing is triggered.
*
- * @return max number of millis since the last event
+ * @return max number of millis since the last item
*/
public int maxIdleMillis() {
return maxIdleMillis;
diff --git a/utils/misc/src/main/java/org/onlab/util/Accumulator.java b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
index 897bddf..568e38c 100644
--- a/utils/misc/src/main/java/org/onlab/util/Accumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
@@ -18,25 +18,28 @@
import java.util.List;
/**
- * Abstraction of an accumulator capable of collecting events and at some
- * point in time triggers processing of all previously accumulated events.
+ * Abstraction of an accumulator capable of collecting items and, at some
+ * point in time, triggering processing of all previously accumulated items.
+ *
+ * @param <T> item type
*/
public interface Accumulator<T> {
/**
- * Adds an event to the current batch. This operation may, or may not
- * trigger processing of the current batch of events.
+ * Adds an item to the current batch. This operation may or may not
+ * trigger processing of the current batch of items.
*
- * @param event event to be added to the current batch
+ * @param item item to be added to the current batch
*/
- void add(T event);
+ void add(T item);
/**
- * Processes the specified list of accumulated events.
+ * Processes the specified list of accumulated items.
*
- * @param events list of accumulated events
+ * @param items list of accumulated items
*/
- void processEvents(List<T> events);
+ void processItems(List<T> items);
//TODO consider a blocking version that required consumer participation
+
}
diff --git a/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
new file mode 100644
index 0000000..b2b0b1f
--- /dev/null
+++ b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Timer;
+
+import static org.junit.Assert.*;
+import static org.onlab.junit.TestTools.delay;
+
+/**
+ * Tests the operation of the accumulator.
+ */
+public class AbstractAccumulatorTest {
+
+ private final Timer timer = new Timer();
+
+ @Test
+ public void basics() throws Exception {
+ TestAccumulator accumulator = new TestAccumulator();
+ assertEquals("incorrect timer", timer, accumulator.timer());
+ assertEquals("incorrect max events", 5, accumulator.maxItems());
+ assertEquals("incorrect max ms", 100, accumulator.maxBatchMillis());
+ assertEquals("incorrect idle ms", 50, accumulator.maxIdleMillis());
+ }
+
+ @Test
+ public void eventTrigger() {
+ TestAccumulator accumulator = new TestAccumulator();
+ accumulator.add(new TestItem("a"));
+ accumulator.add(new TestItem("b"));
+ accumulator.add(new TestItem("c"));
+ accumulator.add(new TestItem("d"));
+ assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+ accumulator.add(new TestItem("e"));
+ delay(20);
+ assertFalse("should have fired", accumulator.batch.isEmpty());
+ assertEquals("incorrect batch", "abcde", accumulator.batch);
+ }
+
+ @Ignore("FIXME: timing sensitive test failing randomly.")
+ @Test
+ public void timeTrigger() {
+ TestAccumulator accumulator = new TestAccumulator();
+ accumulator.add(new TestItem("a"));
+ delay(30);
+ assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+ accumulator.add(new TestItem("b"));
+ delay(30);
+ assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+ accumulator.add(new TestItem("c"));
+ delay(30);
+ assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+ accumulator.add(new TestItem("d"));
+ delay(30);
+ assertFalse("should have fired", accumulator.batch.isEmpty());
+ assertEquals("incorrect batch", "abcd", accumulator.batch);
+ }
+
+ @Test
+ public void idleTrigger() {
+ TestAccumulator accumulator = new TestAccumulator();
+ accumulator.add(new TestItem("a"));
+ assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+ accumulator.add(new TestItem("b"));
+ delay(80);
+ assertFalse("should have fired", accumulator.batch.isEmpty());
+ assertEquals("incorrect batch", "ab", accumulator.batch);
+ }
+
+ private class TestItem {
+ private final String s;
+
+ public TestItem(String s) {
+ this.s = s;
+ }
+ }
+
+ private class TestAccumulator extends AbstractAccumulator<TestItem> {
+
+ String batch = "";
+
+ protected TestAccumulator() {
+ super(timer, 5, 100, 50);
+ }
+
+ @Override
+ public void processItems(List<TestItem> items) {
+ for (TestItem item : items) {
+ batch += item.s;
+ }
+ }
+ }
+
+}