ONOS-5691 ONOS-5742 Fixing intent framework difficulties
- Refactoring AbstractAccumulator to use less blocking synchronization
- Fixing bug in AbstractAccumulator that could leave some items
without firing
- Updated IntentStore for resubmitting pending operations
Change-Id: Iaf240da65e11ceb7d7d745cf4e25bfb8c26ed1eb
diff --git a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
index fb3e4f0..273d409 100644
--- a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
@@ -15,6 +15,7 @@
*/
package org.onlab.util;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,6 +23,7 @@
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -40,10 +42,10 @@
private final int maxBatchMillis;
private final int maxIdleMillis;
- private volatile TimerTask idleTask = new ProcessorTask();
- private volatile TimerTask maxTask = new ProcessorTask();
+ private final AtomicReference<TimerTask> idleTask = new AtomicReference<>();
+ private final AtomicReference<TimerTask> maxTask = new AtomicReference<>();
- private List<T> items = Lists.newArrayList();
+ private final List<T> items;
/**
* Creates an item accumulator capable of triggering on the specified
@@ -52,6 +54,11 @@
* @param timer timer to use for scheduling check-points
* @param maxItems maximum number of items to accumulate before
* processing is triggered
+ * <p>
+ * NB: It is possible that processItems will contain
+ * more than maxItems under high load or if isReady()
+ * can return false.
+ * </p>
* @param maxBatchMillis maximum number of millis allowed since the first
* item before processing is triggered
* @param maxIdleMillis maximum number millis between items before
@@ -68,103 +75,118 @@
this.maxItems = maxItems;
this.maxBatchMillis = maxBatchMillis;
this.maxIdleMillis = maxIdleMillis;
+
+ items = Lists.newArrayListWithExpectedSize(maxItems);
}
@Override
- public synchronized void add(T item) {
- idleTask = cancelIfActive(idleTask);
- items.add(checkNotNull(item, "Item cannot be null"));
+ public void add(T item) {
+ final int sizeAtTimeOfAdd;
+ synchronized (items) {
+ items.add(item);
+ sizeAtTimeOfAdd = items.size();
+ }
+
+ /*
+ WARNING: It is possible that the item that was just added to the list
+ has been processed by an existing idle task at this point.
+
+ By rescheduling the following timers, it is possible that a
+ superfluous maxTask is generated now OR that the idle task and max
+ task are scheduled at their specified delays. This could result in
+ calls to processItems sooner than expected.
+ */
// Did we hit the max item threshold?
- if (items.size() >= maxItems) {
- maxTask = cancelIfActive(maxTask);
- scheduleNow();
+ if (sizeAtTimeOfAdd >= maxItems) {
+ if (maxIdleMillis < maxBatchMillis) {
+ cancelTask(idleTask);
+ }
+ rescheduleTask(maxTask, 0 /* now! */);
} else {
// Otherwise, schedule idle task and if this is a first item
// also schedule the max batch age task.
- idleTask = schedule(maxIdleMillis);
- if (items.size() == 1) {
- maxTask = schedule(maxBatchMillis);
+ if (maxIdleMillis < maxBatchMillis) {
+ rescheduleTask(idleTask, maxIdleMillis);
+ }
+ if (sizeAtTimeOfAdd == 1) {
+ rescheduleTask(maxTask, maxBatchMillis);
}
}
}
/**
- * Finalizes the current batch, if ready, and schedules a new processor
- * in the immediate future.
+ * Reschedules the specified task, cancelling existing one if applicable.
+ *
+ * @param taskRef task reference
+ * @param millis delay in milliseconds
*/
- private void scheduleNow() {
- if (isReady()) {
- TimerTask task = new ProcessorTask(finalizeCurrentBatch());
- timer.schedule(task, 1);
- }
+ private void rescheduleTask(AtomicReference<TimerTask> taskRef, long millis) {
+ ProcessorTask newTask = new ProcessorTask();
+ timer.schedule(newTask, millis);
+ swapAndCancelTask(taskRef, newTask);
}
/**
- * Schedules a new processor task given number of millis in the future.
- * Batch finalization is deferred to time of execution.
+ * Cancels the specified task if it has not run or is not running.
+ *
+ * @param taskRef task reference
*/
- private TimerTask schedule(int millis) {
- TimerTask task = new ProcessorTask();
- timer.schedule(task, millis);
- return task;
+ private void cancelTask(AtomicReference<TimerTask> taskRef) {
+ swapAndCancelTask(taskRef, null);
}
/**
- * Cancels the specified task if it is active.
+ * Sets the new task and attempts to cancelTask the old one.
+ *
+ * @param taskRef task reference
+ * @param newTask new task
*/
- private TimerTask cancelIfActive(TimerTask task) {
- if (task != null) {
- task.cancel();
+ private void swapAndCancelTask(AtomicReference<TimerTask> taskRef,
+ TimerTask newTask) {
+ TimerTask oldTask = taskRef.getAndSet(newTask);
+ if (oldTask != null) {
+ oldTask.cancel();
}
- return task;
}
// Task for triggering processing of accumulated items
private class ProcessorTask extends TimerTask {
-
- private final List<T> items;
-
- // Creates a new processor task with deferred batch finalization.
- ProcessorTask() {
- this.items = null;
- }
-
- // Creates a new processor task with pre-emptive batch finalization.
- ProcessorTask(List<T> items) {
- this.items = items;
- }
-
@Override
public void run() {
- synchronized (AbstractAccumulator.this) {
- idleTask = cancelIfActive(idleTask);
- }
- if (isReady()) {
- try {
- synchronized (AbstractAccumulator.this) {
- maxTask = cancelIfActive(maxTask);
- }
- List<T> batch = items != null ? items : finalizeCurrentBatch();
+ try {
+ if (isReady()) {
+
+ List<T> batch = finalizeCurrentBatch();
if (!batch.isEmpty()) {
processItems(batch);
}
- } catch (Exception e) {
- log.warn("Unable to process batch due to", e);
+ } else {
+ rescheduleTask(idleTask, maxIdleMillis);
}
- } else {
- synchronized (AbstractAccumulator.this) {
- idleTask = schedule(maxIdleMillis);
- }
+ } catch (Exception e) {
+ log.warn("Unable to process batch due to", e);
}
}
}
- // Demotes and returns the current batch of items and promotes a new one.
- private synchronized List<T> finalizeCurrentBatch() {
- List<T> toBeProcessed = items;
- items = Lists.newArrayList();
- return toBeProcessed;
+ /**
+ * Returns an immutable copy of the existing items and clear the list.
+ *
+ * @return list of existing items
+ */
+ private List<T> finalizeCurrentBatch() {
+ List<T> finalizedList;
+ synchronized (items) {
+ finalizedList = ImmutableList.copyOf(items);
+ items.clear();
+ /*
+ * To avoid reprocessing being triggered on an empty list.
+ */
+ cancelTask(maxTask);
+ cancelTask(idleTask);
+ }
+ return finalizedList;
}
@Override
diff --git a/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
index adb0e95..d935cd8 100644
--- a/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
+++ b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
@@ -20,9 +20,7 @@
import java.util.List;
import java.util.stream.IntStream;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import static org.onlab.junit.TestTools.assertAfter;
/**
@@ -142,7 +140,8 @@
IntStream.range(0, 1000).forEach(i -> accumulator.add(new TestItem("#" + i)));
timer.advanceTimeMillis(1);
assertAfter(100, () -> assertEquals("wrong item count", 1000, accumulator.itemCount));
- assertEquals("wrong batch count", 200, accumulator.batchCount);
+ //TODO this assertion could fail under heavy load
+ assertTrue("batch count not near 200", Math.abs(200 - accumulator.batchCount) < 10);
}
private class TestItem {