Updating Intent Manager to deal with failures.
Added ids to Flow batch futures.
Adding some basic unit tests for IntentManger
Adding failedIds to the completedOperation in FlowRuleManager
Change-Id: I7645cead193299f70d319d254cd1e82d96909e7b
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index d6d06d0..6ab5e12 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -340,7 +340,8 @@
public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
if (operation.getOperations().isEmpty()) {
- return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
+ return Futures.immediateFuture(new CompletedBatchOperation(true,
+ Collections.<FlowRule>emptySet()));
}
DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
@@ -379,8 +380,8 @@
private ListenableFuture<CompletedBatchOperation>
storeBatchInternal(FlowRuleBatchOperation operation) {
- final List<StoredFlowEntry> toRemove = new ArrayList<>();
- final List<StoredFlowEntry> toAdd = new ArrayList<>();
+ final List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
+ final List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
DeviceId did = null;
@@ -396,14 +397,14 @@
StoredFlowEntry entry = getFlowEntryInternal(flowRule);
if (entry != null) {
entry.setState(FlowEntryState.PENDING_REMOVE);
- toRemove.add(entry);
+ toRemove.add(batchEntry);
}
} else if (op.equals(FlowRuleOperation.ADD)) {
StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
DeviceId deviceId = flowRule.deviceId();
if (!flowEntries.containsEntry(deviceId, flowEntry)) {
flowEntries.put(deviceId, flowEntry);
- toAdd.add(flowEntry);
+ toAdd.add(batchEntry);
}
}
}
@@ -427,8 +428,8 @@
}
private void updateBackup(final DeviceId deviceId,
- final List<StoredFlowEntry> toAdd,
- final List<? extends FlowRule> list) {
+ final List<FlowRuleBatchEntry> toAdd,
+ final List<FlowRuleBatchEntry> list) {
Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
@@ -442,8 +443,9 @@
}
}
- private void updateBackup(DeviceId deviceId, List<StoredFlowEntry> toAdd) {
- updateBackup(deviceId, toAdd, Collections.<FlowEntry>emptyList());
+ private void updateBackup(DeviceId deviceId, List<FlowRuleBatchEntry> toAdd) {
+
+ updateBackup(deviceId, toAdd, Collections.<FlowRuleBatchEntry>emptyList());
}
@Override
@@ -477,8 +479,9 @@
stored.setPackets(rule.packets());
if (stored.state() == FlowEntryState.PENDING_ADD) {
stored.setState(FlowEntryState.ADDED);
- // update backup.
- updateBackup(did, Arrays.asList(stored));
+ FlowRuleBatchEntry entry =
+ new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
+ updateBackup(did, Arrays.asList(entry));
return new FlowRuleEvent(Type.RULE_ADDED, rule);
}
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
@@ -515,7 +518,9 @@
try {
// This is where one could mark a rule as removed and still keep it in the store.
final boolean removed = flowEntries.remove(deviceId, rule);
- updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
+ FlowRuleBatchEntry entry =
+ new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
+ updateBackup(deviceId, Collections.<FlowRuleBatchEntry>emptyList(), Arrays.asList(entry));
if (removed) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
@@ -687,15 +692,17 @@
}
// Task to update FlowEntries in backup HZ store
+ // TODO: Should be refactored to contain only one list and not
+ // toAdd and toRemove
private final class UpdateBackup implements Runnable {
private final DeviceId deviceId;
- private final List<StoredFlowEntry> toAdd;
- private final List<? extends FlowRule> toRemove;
+ private final List<FlowRuleBatchEntry> toAdd;
+ private final List<FlowRuleBatchEntry> toRemove;
public UpdateBackup(DeviceId deviceId,
- List<StoredFlowEntry> toAdd,
- List<? extends FlowRule> list) {
+ List<FlowRuleBatchEntry> toAdd,
+ List<FlowRuleBatchEntry> list) {
this.deviceId = checkNotNull(deviceId);
this.toAdd = checkNotNull(toAdd);
this.toRemove = checkNotNull(list);
@@ -707,7 +714,8 @@
log.trace("update backup {} +{} -{}", deviceId, toAdd, toRemove);
final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
// Following should be rewritten using async APIs
- for (StoredFlowEntry entry : toAdd) {
+ for (FlowRuleBatchEntry bEntry : toAdd) {
+ final FlowRule entry = bEntry.getTarget();
final FlowId id = entry.id();
ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
List<StoredFlowEntry> list = new ArrayList<>();
@@ -715,8 +723,8 @@
list.addAll(original);
}
- list.remove(entry);
- list.add(entry);
+ list.remove(bEntry.getTarget());
+ list.add((StoredFlowEntry) entry);
ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
boolean success;
@@ -730,7 +738,8 @@
log.error("Updating backup failed.");
}
}
- for (FlowRule entry : toRemove) {
+ for (FlowRuleBatchEntry bEntry : toRemove) {
+ final FlowRule entry = bEntry.getTarget();
final FlowId id = entry.id();
ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
List<StoredFlowEntry> list = new ArrayList<>();
@@ -738,7 +747,7 @@
list.addAll(original);
}
- list.remove(entry);
+ list.remove(bEntry.getTarget());
ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
boolean success;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java
index 4c146d1..68cda00 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java
@@ -25,7 +25,6 @@
import org.onlab.onos.net.intent.IntentOperations;
import org.slf4j.Logger;
-import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
@@ -82,10 +81,17 @@
}
@Override
- public Set<IntentOperations> getIntentOperations() {
- Set<IntentOperations> set = Sets.newHashSet(currentBatches);
- set.addAll((Collection) pendingBatches);
- return set;
+ public Set<IntentOperations> getPendingOperations() {
+ synchronized (this) {
+ return Sets.newHashSet(pendingBatches);
+ }
+ }
+
+ @Override
+ public Set<IntentOperations> getCurrentOperations() {
+ synchronized (this) {
+ return Sets.newHashSet(currentBatches);
+ }
}
@Override
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index 622e2de..4cd31d5 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -263,19 +263,19 @@
@Override
public Future<CompletedBatchOperation> storeBatch(
FlowRuleBatchOperation batchOperation) {
- List<FlowRule> toAdd = new ArrayList<>();
- List<FlowRule> toRemove = new ArrayList<>();
+ List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
+ List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
final FlowRule flowRule = entry.getTarget();
if (entry.getOperator().equals(FlowRuleOperation.ADD)) {
if (!getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
storeFlowRule(flowRule);
- toAdd.add(flowRule);
+ toAdd.add(entry);
}
} else if (entry.getOperator().equals(FlowRuleOperation.REMOVE)) {
if (getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
deleteFlowRule(flowRule);
- toRemove.add(flowRule);
+ toRemove.add(entry);
}
} else {
throw new UnsupportedOperationException("Unsupported operation type");
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java
index b8d22bf..f92eca0 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java
@@ -15,7 +15,7 @@
*/
package org.onlab.onos.store.trivial.impl;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -25,7 +25,8 @@
import org.onlab.onos.net.intent.IntentOperations;
import org.slf4j.Logger;
-import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -37,7 +38,8 @@
public class SimpleIntentBatchQueue implements IntentBatchService {
private final Logger log = getLogger(getClass());
- private final Set<IntentOperations> pendingBatches = new HashSet<>();
+ private final Queue<IntentOperations> pendingBatches = new LinkedList<>();
+ private final Set<IntentOperations> currentBatches = Sets.newHashSet();
private IntentBatchDelegate delegate;
@Activate
@@ -53,18 +55,42 @@
@Override
public void addIntentOperations(IntentOperations operations) {
checkState(delegate != null, "No delegate set");
- pendingBatches.add(operations);
- delegate.execute(operations);
+ synchronized (this) {
+ pendingBatches.add(operations);
+ if (currentBatches.isEmpty()) {
+ IntentOperations work = pendingBatches.poll();
+ currentBatches.add(work);
+ delegate.execute(work);
+ }
+ }
}
@Override
public void removeIntentOperations(IntentOperations operations) {
- pendingBatches.remove(operations);
+ // we allow at most one outstanding batch at a time
+ synchronized (this) {
+ checkState(currentBatches.remove(operations), "Operations not found in current ops.");
+ checkState(currentBatches.isEmpty(), "More than one outstanding batch.");
+ IntentOperations work = pendingBatches.poll();
+ if (work != null) {
+ currentBatches.add(work);
+ delegate.execute(work);
+ }
+ }
}
@Override
- public Set<IntentOperations> getIntentOperations() {
- return ImmutableSet.copyOf(pendingBatches);
+ public Set<IntentOperations> getPendingOperations() {
+ synchronized (this) {
+ return Sets.newHashSet(pendingBatches);
+ }
+ }
+
+ @Override
+ public Set<IntentOperations> getCurrentOperations() {
+ synchronized (this) {
+ return Sets.newHashSet(currentBatches);
+ }
}
@Override