Updated accumulator documentation and refactored names to remove the event heritage.
Change-Id: I2238ab1215281702e670a406fb901ba8a4ef85ce
diff --git a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
index f417562..00a5a2c 100644
--- a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
@@ -27,64 +27,63 @@
import static com.google.common.base.Preconditions.checkNotNull;
/**
- * Base implementation of an event accumulator. It allows triggering based on
- * event inter-arrival time threshold, maximum batch life threshold and maximum
+ * Base implementation of an item accumulator. It allows triggering based on
+ * item inter-arrival time threshold, maximum batch life threshold and maximum
* batch size.
*/
-// FIXME refactor the names here
public abstract class AbstractAccumulator<T> implements Accumulator<T> {
private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class);
private final Timer timer;
- private final int maxEvents;
+ private final int maxItems;
private final int maxBatchMillis;
private final int maxIdleMillis;
private TimerTask idleTask = new ProcessorTask();
private TimerTask maxTask = new ProcessorTask();
- private List<T> events = Lists.newArrayList();
+ private List<T> items = Lists.newArrayList();
/**
- * Creates an event accumulator capable of triggering on the specified
+ * Creates an item accumulator capable of triggering on the specified
* thresholds.
*
- * @param timer timer to use for scheduling check-points
- * @param maxEvents maximum number of events to accumulate before
- * processing is triggered
- * @param maxBatchMillis maximum number of millis allowed since the first
- * event before processing is triggered
- * @param maxIdleMillis maximum number millis between events before
- * processing is triggered
+ * @param timer timer to use for scheduling check-points
+ * @param maxItems maximum number of items to accumulate before
+ * processing is triggered
+ * @param maxBatchMillis maximum number of millis allowed since the first
+ * item before processing is triggered
+ * @param maxIdleMillis maximum number millis between items before
+ * processing is triggered
*/
- protected AbstractAccumulator(Timer timer, int maxEvents,
+ protected AbstractAccumulator(Timer timer, int maxItems,
int maxBatchMillis, int maxIdleMillis) {
this.timer = checkNotNull(timer, "Timer cannot be null");
- checkArgument(maxEvents > 1, "Maximum number of events must be > 1");
+ checkArgument(maxItems > 1, "Maximum number of items must be > 1");
checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
- this.maxEvents = maxEvents;
+ this.maxItems = maxItems;
this.maxBatchMillis = maxBatchMillis;
this.maxIdleMillis = maxIdleMillis;
}
@Override
- public synchronized void add(T event) {
+ public synchronized void add(T item) {
idleTask = cancelIfActive(idleTask);
- events.add(checkNotNull(event, "Event cannot be null"));
+ items.add(checkNotNull(item, "Item cannot be null"));
- // Did we hit the max event threshold?
- if (events.size() == maxEvents) {
+ // Did we hit the max item threshold?
+ if (items.size() == maxItems) {
maxTask = cancelIfActive(maxTask);
schedule(1);
} else {
- // Otherwise, schedule idle task and if this is a first event
+ // Otherwise, schedule idle task and if this is a first item
// also schedule the max batch age task.
idleTask = schedule(maxIdleMillis);
- if (events.size() == 1) {
+ if (items.size() == 1) {
maxTask = schedule(maxBatchMillis);
}
}
@@ -105,24 +104,24 @@
return task;
}
- // Task for triggering processing of accumulated events
+ // Task for triggering processing of accumulated items
private class ProcessorTask extends TimerTask {
@Override
public void run() {
try {
idleTask = cancelIfActive(idleTask);
maxTask = cancelIfActive(maxTask);
- processEvents(finalizeCurrentBatch());
+ processItems(finalizeCurrentBatch());
} catch (Exception e) {
log.warn("Unable to process batch due to {}", e);
}
}
}
- // Demotes and returns the current batch of events and promotes a new one.
+ // Demotes and returns the current batch of items and promotes a new one.
private synchronized List<T> finalizeCurrentBatch() {
- List<T> toBeProcessed = events;
- events = Lists.newArrayList();
+ List<T> toBeProcessed = items;
+ items = Lists.newArrayList();
return toBeProcessed;
}
@@ -136,18 +135,18 @@
}
/**
- * Returns the maximum number of events allowed to accumulate before
+ * Returns the maximum number of items allowed to accumulate before
* processing is triggered.
*
- * @return max number of events
+ * @return max number of items
*/
- public int maxEvents() {
- return maxEvents;
+ public int maxItems() {
+ return maxItems;
}
/**
* Returns the maximum number of millis allowed to expire since the first
- * event before processing is triggered.
+ * item before processing is triggered.
*
* @return max number of millis a batch is allowed to last
*/
@@ -157,9 +156,9 @@
/**
* Returns the maximum number of millis allowed to expire since the last
- * event arrival before processing is triggered.
+ * item arrival before processing is triggered.
*
- * @return max number of millis since the last event
+ * @return max number of millis since the last item
*/
public int maxIdleMillis() {
return maxIdleMillis;
diff --git a/utils/misc/src/main/java/org/onlab/util/Accumulator.java b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
index 897bddf..568e38c 100644
--- a/utils/misc/src/main/java/org/onlab/util/Accumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
@@ -18,25 +18,28 @@
import java.util.List;
/**
- * Abstraction of an accumulator capable of collecting events and at some
- * point in time triggers processing of all previously accumulated events.
+ * Abstraction of an accumulator capable of collecting items and at some
+ * point in time triggers processing of all previously accumulated items.
+ *
+ * @param <T> item type
*/
public interface Accumulator<T> {
/**
- * Adds an event to the current batch. This operation may, or may not
- * trigger processing of the current batch of events.
+ * Adds an item to the current batch. This operation may, or may not
+ * trigger processing of the current batch of items.
*
- * @param event event to be added to the current batch
+ * @param item item to be added to the current batch
*/
- void add(T event);
+ void add(T item);
/**
- * Processes the specified list of accumulated events.
+ * Processes the specified list of accumulated items.
*
- * @param events list of accumulated events
+ * @param items list of accumulated items
*/
- void processEvents(List<T> events);
+ void processItems(List<T> items);
//TODO consider a blocking version that required consumer participation
+
}