Enhancing accumulator to allow subclasses to indicate whether they are ready for the batch to be processed. Default behaviour returns true.
Change-Id: I53a3ffc3ecd75ed2607f155a61971e05a6009a66
diff --git a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
index 00a5a2c..a7b50cb 100644
--- a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
@@ -76,7 +76,7 @@
items.add(checkNotNull(item, "Item cannot be null"));
// Did we hit the max item threshold?
- if (items.size() == maxItems) {
+ if (items.size() >= maxItems) {
maxTask = cancelIfActive(maxTask);
schedule(1);
} else {
@@ -108,12 +108,16 @@
private class ProcessorTask extends TimerTask {
@Override
public void run() {
- try {
- idleTask = cancelIfActive(idleTask);
- maxTask = cancelIfActive(maxTask);
- processItems(finalizeCurrentBatch());
- } catch (Exception e) {
- log.warn("Unable to process batch due to {}", e);
+ idleTask = cancelIfActive(idleTask);
+ if (isReady()) {
+ try {
+ maxTask = cancelIfActive(maxTask);
+ processItems(finalizeCurrentBatch());
+ } catch (Exception e) {
+ log.warn("Unable to process batch due to {}", e);
+ }
+ } else {
+ idleTask = schedule(maxIdleMillis);
}
}
}
@@ -125,6 +129,11 @@
return toBeProcessed;
}
+ @Override
+ public boolean isReady() {
+ return true;
+ }
+
/**
* Returns the backing timer.
*
@@ -163,4 +172,5 @@
public int maxIdleMillis() {
return maxIdleMillis;
}
+
}
diff --git a/utils/misc/src/main/java/org/onlab/util/Accumulator.java b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
index 568e38c..20b7a48 100644
--- a/utils/misc/src/main/java/org/onlab/util/Accumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
@@ -40,6 +40,10 @@
*/
void processItems(List<T> items);
- //TODO consider a blocking version that required consumer participation
-
+ /**
+ * Indicates whether the accumulator is ready to process items.
+ *
+ * @return true if ready to process
+ */
+ boolean isReady();
}
diff --git a/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
index b2b0b1f..e3d87e2 100644
--- a/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
+++ b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
@@ -37,7 +37,7 @@
assertEquals("incorrect timer", timer, accumulator.timer());
assertEquals("incorrect max events", 5, accumulator.maxItems());
assertEquals("incorrect max ms", 100, accumulator.maxBatchMillis());
- assertEquals("incorrect idle ms", 50, accumulator.maxIdleMillis());
+ assertEquals("incorrect idle ms", 70, accumulator.maxIdleMillis());
}
@Test
@@ -68,7 +68,7 @@
delay(30);
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
accumulator.add(new TestItem("d"));
- delay(30);
+ delay(60);
assertFalse("should have fired", accumulator.batch.isEmpty());
assertEquals("incorrect batch", "abcd", accumulator.batch);
}
@@ -84,6 +84,54 @@
assertEquals("incorrect batch", "ab", accumulator.batch);
}
+ @Test
+ public void readyIdleTrigger() {
+ TestAccumulator accumulator = new TestAccumulator();
+ accumulator.ready = false;
+ accumulator.add(new TestItem("a"));
+ assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+ accumulator.add(new TestItem("b"));
+ delay(80);
+ assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+ accumulator.ready = true;
+ delay(80);
+ assertFalse("should have fired", accumulator.batch.isEmpty());
+ assertEquals("incorrect batch", "ab", accumulator.batch);
+ }
+
+ @Test
+ public void readyLongTrigger() {
+ TestAccumulator accumulator = new TestAccumulator();
+ accumulator.ready = false;
+ delay(120);
+ assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+ accumulator.add(new TestItem("a"));
+ assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+ accumulator.ready = true;
+ delay(80);
+ assertFalse("should have fired", accumulator.batch.isEmpty());
+ assertEquals("incorrect batch", "a", accumulator.batch);
+ }
+
+ @Test
+ public void readyMaxTrigger() {
+ TestAccumulator accumulator = new TestAccumulator();
+ accumulator.ready = false;
+ accumulator.add(new TestItem("a"));
+ accumulator.add(new TestItem("b"));
+ accumulator.add(new TestItem("c"));
+ accumulator.add(new TestItem("d"));
+ accumulator.add(new TestItem("e"));
+ accumulator.add(new TestItem("f"));
+ assertTrue("should not have fired yet", accumulator.batch.isEmpty());
+ accumulator.ready = true;
+ accumulator.add(new TestItem("g"));
+ delay(5);
+ assertFalse("should have fired", accumulator.batch.isEmpty());
+ assertEquals("incorrect batch", "abcdefg", accumulator.batch);
+ }
+
+
private class TestItem {
private final String s;
@@ -95,9 +143,10 @@
private class TestAccumulator extends AbstractAccumulator<TestItem> {
String batch = "";
+ boolean ready = true;
protected TestAccumulator() {
- super(timer, 5, 100, 50);
+ super(timer, 5, 100, 70);
}
@Override
@@ -106,6 +155,11 @@
batch += item.s;
}
}
+
+ @Override
+ public boolean isReady() {
+ return ready;
+ }
}
}