Serializing batch execution on per-instance basis for intents
Change-Id: Idda3f4a65e78567302d91ba0070e78d435eea8fd
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
index e64fe82..70878c5 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
@@ -155,6 +155,9 @@
@Override
public void execute(IntentOperations operations) {
+ if (operations.operations().isEmpty()) {
+ return;
+ }
batchService.addIntentOperations(operations);
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java
index 0806d8e..e09a934 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java
@@ -15,7 +15,7 @@
*/
package org.onlab.onos.store.intent.impl;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -25,7 +25,9 @@
import org.onlab.onos.net.intent.IntentOperations;
import org.slf4j.Logger;
-import java.util.HashSet;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Queue;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -37,7 +39,8 @@
public class DistributedIntentBatchQueue implements IntentBatchService {
private final Logger log = getLogger(getClass());
- private final Set<IntentOperations> pendingBatches = new HashSet<>();
+ private final Queue<IntentOperations> pendingBatches = new LinkedList<>();
+ private final Set<IntentOperations> currentBatches = Sets.newHashSet();
private IntentBatchDelegate delegate;
@Activate
@@ -53,18 +56,35 @@
@Override
public void addIntentOperations(IntentOperations operations) {
checkState(delegate != null, "No delegate set");
- pendingBatches.add(operations);
- delegate.execute(operations);
+ synchronized (this) {
+ pendingBatches.add(operations);
+ if (currentBatches.isEmpty()) {
+ IntentOperations work = pendingBatches.poll();
+ currentBatches.add(work);
+ delegate.execute(work);
+ }
+ }
}
@Override
public void removeIntentOperations(IntentOperations operations) {
- pendingBatches.remove(operations);
+ // we allow at most one outstanding batch at a time
+ synchronized (this) {
+ checkState(currentBatches.remove(operations), "Operations not found in current ops.");
+ checkState(currentBatches.isEmpty(), "More than one outstanding batch.");
+ IntentOperations work = pendingBatches.poll();
+ if (work != null) {
+ currentBatches.add(work);
+ delegate.execute(work);
+ }
+ }
}
@Override
public Set<IntentOperations> getIntentOperations() {
- return ImmutableSet.copyOf(pendingBatches);
+ Set<IntentOperations> set = Sets.newHashSet(currentBatches);
+ set.addAll((Collection) pendingBatches);
+ return set;
}
@Override