Enhance the accumulator so that subclasses can indicate, via isReady(), whether they are ready for the current batch to be processed. The default implementation in AbstractAccumulator returns true.
Change-Id: I53a3ffc3ecd75ed2607f155a61971e05a6009a66
diff --git a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
index 00a5a2c..a7b50cb 100644
--- a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
@@ -76,7 +76,7 @@
items.add(checkNotNull(item, "Item cannot be null"));
// Did we hit the max item threshold?
- if (items.size() == maxItems) {
+ if (items.size() >= maxItems) {
maxTask = cancelIfActive(maxTask);
schedule(1);
} else {
@@ -108,12 +108,16 @@
private class ProcessorTask extends TimerTask {
@Override
public void run() {
- try {
- idleTask = cancelIfActive(idleTask);
- maxTask = cancelIfActive(maxTask);
- processItems(finalizeCurrentBatch());
- } catch (Exception e) {
- log.warn("Unable to process batch due to {}", e);
+ idleTask = cancelIfActive(idleTask);
+ if (isReady()) {
+ try {
+ maxTask = cancelIfActive(maxTask);
+ processItems(finalizeCurrentBatch());
+ } catch (Exception e) {
+ log.warn("Unable to process batch due to {}", e);
+ }
+ } else {
+ idleTask = schedule(maxIdleMillis);
}
}
}
@@ -125,6 +129,11 @@
return toBeProcessed;
}
+ @Override
+ public boolean isReady() {
+ return true;
+ }
+
/**
* Returns the backing timer.
*
@@ -163,4 +172,5 @@
public int maxIdleMillis() {
return maxIdleMillis;
}
+
}
diff --git a/utils/misc/src/main/java/org/onlab/util/Accumulator.java b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
index 568e38c..20b7a48 100644
--- a/utils/misc/src/main/java/org/onlab/util/Accumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
@@ -40,6 +40,10 @@
*/
void processItems(List<T> items);
- //TODO consider a blocking version that required consumer participation
-
+ /**
+ * Indicates whether the accumulator is ready to process items.
+ *
+ * @return true if ready to process
+ */
+ boolean isReady();
}