Updating Intent Manager to deal with failures.
Added ids to Flow batch futures.
Adding some basic unit tests for IntentManger
Adding failedIds to the completedOperation in FlowRuleManager
Change-Id: I7645cead193299f70d319d254cd1e82d96909e7b
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
index b988744..a7bffe7 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
@@ -15,6 +15,9 @@
*/
package org.onlab.onos.net.flow;
+
+import java.util.Collections;
+
import java.util.Set;
import com.google.common.collect.ImmutableSet;
@@ -26,6 +29,21 @@
private final boolean success;
private final Set<FlowRule> failures;
+ private final Set<Long> failedIds;
+
+ /**
+ * Creates a new batch completion result.
+ *
+ * @param success indicates whether the completion is successful.
+ * @param failures set of any failures encountered
+ * @param failedIds (optional) set of failed operation ids
+ */
+ public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures,
+ Set<Long> failedIds) {
+ this.success = success;
+ this.failures = ImmutableSet.copyOf(failures);
+ this.failedIds = ImmutableSet.copyOf(failedIds);
+ }
/**
* Creates a new batch completion result.
@@ -36,8 +54,11 @@
public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures) {
this.success = success;
this.failures = ImmutableSet.copyOf(failures);
+ this.failedIds = Collections.emptySet();
}
+
+
@Override
public boolean isSuccess() {
return success;
@@ -48,4 +69,8 @@
return failures;
}
+ public Set<Long> failedIds() {
+ return failedIds;
+ }
+
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java
index d84e51b..7a635e7 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java
@@ -21,8 +21,20 @@
public class FlowRuleBatchEntry
extends BatchOperationEntry<FlowRuleOperation, FlowRule> {
+ private final Long id; // FIXME: consider using Optional<Long>
+
public FlowRuleBatchEntry(FlowRuleOperation operator, FlowRule target) {
super(operator, target);
+ this.id = null;
+ }
+
+ public FlowRuleBatchEntry(FlowRuleOperation operator, FlowRule target, Long id) {
+ super(operator, target);
+ this.id = id;
+ }
+
+ public Long id() {
+ return id;
}
public enum FlowRuleOperation {
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
index f75c663..09698a4 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
@@ -18,38 +18,52 @@
import java.util.Collections;
import java.util.List;
-import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+
+
import com.google.common.collect.Lists;
public class FlowRuleBatchRequest {
private final int batchId;
- private final List<FlowRule> toAdd;
- private final List<FlowRule> toRemove;
+ private final List<FlowRuleBatchEntry> toAdd;
+ private final List<FlowRuleBatchEntry> toRemove;
- public FlowRuleBatchRequest(int batchId, List<? extends FlowRule> toAdd, List<? extends FlowRule> toRemove) {
+ public FlowRuleBatchRequest(int batchId, List<FlowRuleBatchEntry> toAdd,
+ List<FlowRuleBatchEntry> toRemove) {
this.batchId = batchId;
this.toAdd = Collections.unmodifiableList(toAdd);
this.toRemove = Collections.unmodifiableList(toRemove);
}
public List<FlowRule> toAdd() {
- return toAdd;
+ return FluentIterable.from(toAdd).transform(
+ new Function<FlowRuleBatchEntry, FlowRule>() {
+
+ @Override
+ public FlowRule apply(FlowRuleBatchEntry input) {
+ return input.getTarget();
+ }
+ }).toList();
}
public List<FlowRule> toRemove() {
- return toRemove;
+ return FluentIterable.from(toRemove).transform(
+ new Function<FlowRuleBatchEntry, FlowRule>() {
+
+ @Override
+ public FlowRule apply(FlowRuleBatchEntry input) {
+ return input.getTarget();
+ }
+ }).toList();
}
public FlowRuleBatchOperation asBatchOperation() {
List<FlowRuleBatchEntry> entries = Lists.newArrayList();
- for (FlowRule e : toAdd) {
- entries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, e));
- }
- for (FlowRule e : toRemove) {
- entries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, e));
- }
+ entries.addAll(toAdd);
+ entries.addAll(toRemove);
return new FlowRuleBatchOperation(entries);
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchService.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchService.java
index 37a1d4a..0364ad7 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchService.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchService.java
@@ -37,10 +37,16 @@
void removeIntentOperations(IntentOperations operations);
/**
- * Returns the set of intent batches currently being tracked.
+ * Returns the set of intent batches that are pending.
* @return set of batches
*/
- Set<IntentOperations> getIntentOperations();
+ Set<IntentOperations> getPendingOperations();
+
+ /**
+ * Returns the set of intent batches currently being processed.
+ * @return set of batches
+ */
+ Set<IntentOperations> getCurrentOperations();
/**
* Sets the batch service delegate.
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentState.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentState.java
index 3fbe82d..0e9211d 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/IntentState.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentState.java
@@ -67,6 +67,18 @@
RECOMPILING,
/**
+ * TODO: Indicated that an intent will soon be recompiled.
+ */
+ //UPDATE,
+
+ /**
+ * TODO.
+ * Indicates that an application has requested that an intent be withdrawn.
+ * It will start withdrawing short, but not necessarily on this instance.
+ */
+ //WITHDRAW_REQ,
+
+ /**
* Indicates that the intent is being withdrawn. This is a transitional
* state, triggered by invocation of the
* {@link IntentService#withdraw(Intent)} but one with only one outcome,
diff --git a/core/api/src/test/java/org/onlab/onos/net/flow/FlowRuleBatchRequestTest.java b/core/api/src/test/java/org/onlab/onos/net/flow/FlowRuleBatchRequestTest.java
index 7541692..6710f18 100644
--- a/core/api/src/test/java/org/onlab/onos/net/flow/FlowRuleBatchRequestTest.java
+++ b/core/api/src/test/java/org/onlab/onos/net/flow/FlowRuleBatchRequestTest.java
@@ -21,6 +21,8 @@
import org.junit.Test;
import org.onlab.onos.net.intent.IntentTestsMocks;
+import static org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation.*;
+
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@@ -38,10 +40,10 @@
public void testConstruction() {
final FlowRule rule1 = new IntentTestsMocks.MockFlowRule(1);
final FlowRule rule2 = new IntentTestsMocks.MockFlowRule(2);
- final List<FlowRule> toAdd = new LinkedList<>();
- toAdd.add(rule1);
- final List<FlowRule> toRemove = new LinkedList<>();
- toRemove.add(rule2);
+ final List<FlowRuleBatchEntry> toAdd = new LinkedList<>();
+ toAdd.add(new FlowRuleBatchEntry(ADD, rule1));
+ final List<FlowRuleBatchEntry> toRemove = new LinkedList<>();
+ toRemove.add(new FlowRuleBatchEntry(REMOVE, rule2));
final FlowRuleBatchRequest request =
diff --git a/core/api/src/test/java/org/onlab/onos/net/intent/IntentTestsMocks.java b/core/api/src/test/java/org/onlab/onos/net/intent/IntentTestsMocks.java
index cc2ef75..c5483c9 100644
--- a/core/api/src/test/java/org/onlab/onos/net/intent/IntentTestsMocks.java
+++ b/core/api/src/test/java/org/onlab/onos/net/intent/IntentTestsMocks.java
@@ -26,6 +26,7 @@
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import org.onlab.onos.net.DeviceId;
@@ -331,7 +332,22 @@
return false;
}
+ @Override
+ public int hashCode() {
+ return Objects.hash(priority);
+ }
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final MockFlowRule other = (MockFlowRule) obj;
+ return Objects.equals(this.priority, other.priority);
+ }
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index d356d9a..803abb9 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -488,13 +488,14 @@
boolean success = true;
Set<FlowRule> failed = Sets.newHashSet();
+ Set<Long> failedIds = Sets.newHashSet();
CompletedBatchOperation completed;
for (Future<CompletedBatchOperation> future : futures) {
completed = future.get();
- success = validateBatchOperation(failed, completed);
+ success = validateBatchOperation(failed, failedIds, completed);
}
- return finalizeBatchOperation(success, failed);
+ return finalizeBatchOperation(success, failed, failedIds);
}
@@ -508,22 +509,25 @@
}
boolean success = true;
Set<FlowRule> failed = Sets.newHashSet();
+ Set<Long> failedIds = Sets.newHashSet();
CompletedBatchOperation completed;
for (Future<CompletedBatchOperation> future : futures) {
completed = future.get(timeout, unit);
- success = validateBatchOperation(failed, completed);
+ success = validateBatchOperation(failed, failedIds, completed);
}
- return finalizeBatchOperation(success, failed);
+ return finalizeBatchOperation(success, failed, failedIds);
}
private boolean validateBatchOperation(Set<FlowRule> failed,
- CompletedBatchOperation completed) {
+ Set<Long> failedIds,
+ CompletedBatchOperation completed) {
if (isCancelled()) {
throw new CancellationException();
}
if (!completed.isSuccess()) {
failed.addAll(completed.failedItems());
+ failedIds.addAll(completed.failedIds());
cleanUpBatch();
cancelAllSubBatches();
return false;
@@ -538,7 +542,8 @@
}
private CompletedBatchOperation finalizeBatchOperation(boolean success,
- Set<FlowRule> failed) {
+ Set<FlowRule> failed,
+ Set<Long> failedIds) {
synchronized (this) {
if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
if (state.get() == BatchState.FINISHED) {
@@ -546,7 +551,7 @@
}
throw new CancellationException();
}
- overall = new CompletedBatchOperation(success, failed);
+ overall = new CompletedBatchOperation(success, failed, failedIds);
return overall;
}
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
index 70878c5..361c2ea 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
@@ -49,9 +49,9 @@
import org.slf4j.Logger;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -330,12 +330,12 @@
batches.addAll(getInstaller(installable).install(installable));
} catch (IntentException e) {
log.warn("Unable to install intent {} due to:", update.newIntent().id(), e);
+ trackerService.removeTrackedResources(update.newIntent().id(),
+ installable.resources());
//FIXME we failed... intent should be recompiled
- // TODO: remove resources
- // recompile!!!
}
}
- update.setBatches(batches);
+ update.addBatches(batches);
}
/**
@@ -348,80 +348,32 @@
if (!update.oldIntent().equals(update.newIntent())) {
update.setState(update.oldIntent(), WITHDRAWING);
} // else newIntent is FAILED
- uninstallIntent(update);
-
- // If all went well, disassociate the top-level intent with its
- // installable derivatives and mark it as withdrawn.
- // FIXME need to clean up
- //store.removeInstalledIntents(intent.id());
+ update.addBatches(uninstallIntent(update.oldIntent(), update.oldInstallables()));
}
/**
* Uninstalls all installable intents associated with the given intent.
*
- * @param update intent update
+ * @param intent intent
+ * @param installables installable intents
+ * @return list of batches to uninstall intent
*/
- //FIXME: need to handle next state properly
- private void uninstallIntent(IntentUpdate update) {
- if (update.oldInstallables == null) {
- return;
+ private List<FlowRuleBatchOperation> uninstallIntent(Intent intent, List<Intent> installables) {
+ if (installables == null) {
+ return Collections.emptyList();
}
List<FlowRuleBatchOperation> batches = Lists.newArrayList();
- for (Intent installable : update.oldInstallables()) {
- trackerService.removeTrackedResources(update.oldIntent().id(),
+ for (Intent installable : installables) {
+ trackerService.removeTrackedResources(intent.id(),
installable.resources());
try {
batches.addAll(getInstaller(installable).uninstall(installable));
} catch (IntentException e) {
- log.warn("Unable to uninstall intent {} due to:", update.oldIntent().id(), e);
+ log.warn("Unable to uninstall intent {} due to:", intent.id(), e);
// TODO: this should never happen. but what if it does?
}
}
- update.setBatches(batches);
- // FIXME: next state for old is WITHDRAWN or FAILED
- }
-
- /**
- * Recompiles the specified intent.
- *
- * @param update intent update
- */
- // FIXME: update this to work
- private void executeRecompilingPhase(IntentUpdate update) {
- Intent intent = update.newIntent();
- // Indicate that the intent is entering the recompiling phase.
- store.setState(intent, RECOMPILING);
-
- List<FlowRuleBatchOperation> batches = Lists.newArrayList();
- try {
- // Compile the intent into installable derivatives.
- List<Intent> installable = compileIntent(intent, update);
-
- // If all went well, compare the existing list of installable
- // intents with the newly compiled list. If they are the same,
- // bail, out since the previous approach was determined not to
- // be viable.
- // FIXME do we need this?
- List<Intent> originalInstallable = store.getInstallableIntents(intent.id());
-
- //FIXME let's be smarter about how we perform the update
- //batches.addAll(uninstallIntent(intent, null));
-
- if (Objects.equals(originalInstallable, installable)) {
- eventDispatcher.post(store.setState(intent, FAILED));
- } else {
- // Otherwise, re-associate the newly compiled installable intents
- // with the top-level intent and kick off installing phase.
- store.setInstallableIntents(intent.id(), installable);
- // FIXME commented out for now
- //batches.addAll(executeInstallingPhase(update));
- }
- } catch (Exception e) {
- log.warn("Unable to recompile intent {} due to:", intent.id(), e);
-
- // If compilation failed, mark the intent as failed.
- eventDispatcher.post(store.setState(intent, FAILED));
- }
+ return batches;
}
/**
@@ -442,9 +394,10 @@
for (int i = 0; i < update.oldInstallables().size(); i++) {
Intent oldInstallable = update.oldInstallables().get(i);
Intent newInstallable = update.newInstallables().get(i);
- if (oldInstallable.equals(newInstallable)) {
- continue;
- }
+ //FIXME revisit this
+// if (oldInstallable.equals(newInstallable)) {
+// continue;
+// }
checkArgument(oldInstallable.getClass().equals(newInstallable.getClass()),
"Installable Intent type mismatch.");
trackerService.removeTrackedResources(update.oldIntent().id(), oldInstallable.resources());
@@ -454,9 +407,12 @@
} catch (IntentException e) {
log.warn("Unable to update intent {} due to:", update.oldIntent().id(), e);
//FIXME... we failed. need to uninstall (if same) or revert (if different)
+ trackerService.removeTrackedResources(update.newIntent().id(), newInstallable.resources());
+ update.setState(update.newIntent(), FAILED);
+ batches = uninstallIntent(update.oldIntent(), update.oldInstallables());
}
}
- update.setBatches(batches);
+ update.addBatches(batches);
}
/**
@@ -541,13 +497,12 @@
}
/**
- * TODO.
- * @param op intent operation
- * @return intent update
+ * TODO. rename this...
+ * @param update intent update
*/
- private IntentUpdate processIntentOperation(IntentOperation op) {
- IntentUpdate update = new IntentUpdate(op);
+ private void processIntentUpdate(IntentUpdate update) {
+ // check to see if the intent needs to be compiled or recompiled
if (update.newIntent() != null) {
executeCompilingPhase(update);
}
@@ -559,32 +514,29 @@
} else if (update.oldInstallables() != null) {
executeWithdrawingPhase(update);
} else {
- if (update.oldIntent() != null) {
- // TODO this shouldn't happen
- return update; //FIXME
+ if (update.oldIntent() != null &&
+ !update.oldIntent().equals(update.newIntent())) {
+ // removing failed intent
+ update.setState(update.oldIntent(), WITHDRAWING);
}
- if (update.newIntent() != null) {
- // TODO assert that next state is failed
- return update; //FIXME
- }
+// if (update.newIntent() != null) {
+// // TODO assert that next state is failed
+// }
}
-
- return update;
}
// TODO comments...
private class IntentUpdate {
- private final IntentOperation op;
private final Intent oldIntent;
private final Intent newIntent;
private final Map<Intent, IntentState> stateMap = Maps.newHashMap();
private final List<Intent> oldInstallables;
private List<Intent> newInstallables;
- private List<FlowRuleBatchOperation> batches;
+ private final List<FlowRuleBatchOperation> batches = Lists.newLinkedList();
+ private int currentBatch = 0; // TODO: maybe replace with an iterator
IntentUpdate(IntentOperation op) {
- this.op = op;
switch (op.type()) {
case SUBMIT:
newIntent = op.intent();
@@ -600,7 +552,7 @@
break;
case UPDATE:
oldIntent = store.getIntent(op.intentId());
- newIntent = oldIntent; //InnerAssignment: Inner assignments should be avoided.
+ newIntent = oldIntent;
break;
default:
oldIntent = null;
@@ -617,7 +569,6 @@
// fetch the old intent's installables from the store
if (oldIntent != null) {
oldInstallables = store.getInstallableIntents(oldIntent.id());
- // TODO: remove intent from store after uninstall
} else {
oldInstallables = null;
}
@@ -644,12 +595,72 @@
store.setInstallableIntents(newIntent.id(), installables);
}
+ boolean isComplete() {
+ return currentBatch >= batches.size();
+ }
+
+ FlowRuleBatchOperation currentBatch() {
+ return !isComplete() ? batches.get(currentBatch) : null;
+ }
+
+ void incrementBatch(boolean success) {
+ if (success) { // actually increment
+ if (++currentBatch == batches.size()) {
+ finalizeStates();
+ }
+ } else { // the current batch has failed, so recompile
+ // remove the current batch and all remaining
+ for (int i = currentBatch; i < batches.size(); i++) {
+ batches.remove(i);
+ }
+ if (oldIntent != null) {
+ executeWithdrawingPhase(this); // remove the old intent
+ }
+ if (newIntent != null) {
+ setState(newIntent, FAILED);
+ batches.addAll(uninstallIntent(newIntent, newInstallables()));
+ }
+
+ // FIXME: should we try to recompile?
+ }
+ }
+
+ // FIXME make sure this is called!!!
+ private void finalizeStates() {
+ for (Intent intent : stateMap.keySet()) {
+ switch (getState(intent)) {
+ case INSTALLING:
+ setState(intent, INSTALLED);
+ break;
+ case WITHDRAWING:
+ setState(intent, WITHDRAWN);
+ store.removeInstalledIntents(intent.id());
+ //store.removeIntent(intent.id()); // FIXME we die a horrible death here
+ break;
+ case FAILED:
+ store.removeInstalledIntents(intent.id());
+ break;
+
+ // FALLTHROUGH to default from here
+ case SUBMITTED:
+ case COMPILING:
+ case RECOMPILING:
+ case WITHDRAWN:
+ case INSTALLED:
+ default:
+ //FIXME clean this up (we shouldn't ever get here)
+ log.warn("Bad state: {} for {}", getState(intent), intent);
+ break;
+ }
+ }
+ }
+
List<FlowRuleBatchOperation> batches() {
return batches;
}
- void setBatches(List<FlowRuleBatchOperation> batches) {
- this.batches = batches;
+ void addBatches(List<FlowRuleBatchOperation> batches) {
+ this.batches.addAll(batches);
}
IntentState getState(Intent intent) {
@@ -659,7 +670,7 @@
void setState(Intent intent, IntentState newState) {
// TODO: clean this up, or set to debug
IntentState oldState = stateMap.get(intent);
- log.info("intent id: {}, old state: {}, new state: {}",
+ log.debug("intent id: {}, old state: {}, new state: {}",
intent.id(), oldState, newState);
stateMap.put(intent, newState);
@@ -674,143 +685,72 @@
}
}
- private static List<FlowRuleBatchOperation> mergeBatches(Map<IntentOperation,
- IntentUpdate> intentUpdates) {
- //TODO test this.
- List<FlowRuleBatchOperation> batches = Lists.newArrayList();
- for (IntentUpdate update : intentUpdates.values()) {
- if (update.batches() == null) {
- continue;
- }
- int i = 0;
- for (FlowRuleBatchOperation batch : update.batches()) {
- if (i == batches.size()) {
- batches.add(batch);
- } else {
- FlowRuleBatchOperation existing = batches.get(i);
- existing.addAll(batch);
- }
- i++;
- }
- }
- return batches;
- }
-
- // Auxiliary runnable to perform asynchronous tasks.
- private class IntentTask implements Runnable {
- private final IntentOperations operations;
-
- public IntentTask(IntentOperations operations) {
- this.operations = operations;
- }
-
- @Override
- public void run() {
- Map<IntentOperation, IntentUpdate> intentUpdates = Maps.newHashMap();
- for (IntentOperation op : operations.operations()) {
- intentUpdates.put(op, processIntentOperation(op));
- }
- List<FlowRuleBatchOperation> batches = mergeBatches(intentUpdates);
- monitorExecutor.execute(new IntentInstallMonitor(operations, intentUpdates, batches));
- }
- }
-
private class IntentInstallMonitor implements Runnable {
private static final long TIMEOUT = 5000; // ms
- private final IntentOperations ops;
- private final Map<IntentOperation, IntentUpdate> intentUpdateMap;
- private final List<FlowRuleBatchOperation> work;
- private Future<CompletedBatchOperation> future;
- private final long startTime = System.currentTimeMillis();
- private final long endTime = startTime + TIMEOUT;
+ private static final int MAX_ATTEMPTS = 3;
- public IntentInstallMonitor(IntentOperations ops,
- Map<IntentOperation, IntentUpdate> intentUpdateMap,
- List<FlowRuleBatchOperation> work) {
+ private final IntentOperations ops;
+ private final List<IntentUpdate> intentUpdates = Lists.newArrayList();
+
+ private Future<CompletedBatchOperation> future;
+ private long startTime = System.currentTimeMillis();
+ private long endTime = startTime + TIMEOUT;
+ private int installAttempt;
+
+ public IntentInstallMonitor(IntentOperations ops) {
this.ops = ops;
- this.intentUpdateMap = intentUpdateMap;
- this.work = work;
+ }
+
+ private void buildIntentUpdates() {
+ for (IntentOperation op : ops.operations()) {
+ IntentUpdate update = new IntentUpdate(op);
+ intentUpdates.add(update);
+ processIntentUpdate(update);
+ }
future = applyNextBatch();
}
/**
- * Applies the next batch, and returns the future.
+ * Builds and applies the next batch, and returns the future.
*
* @return Future for next batch
*/
private Future<CompletedBatchOperation> applyNextBatch() {
- if (work.isEmpty()) {
- return null;
+ //TODO test this. (also, maybe save this batch)
+ FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList());
+ for (IntentUpdate update : intentUpdates) {
+ if (!update.isComplete()) {
+ batch.addAll(update.currentBatch());
+ }
}
- FlowRuleBatchOperation batch = work.remove(0);
- return flowRuleService.applyBatch(batch);
+ return (batch.size() > 0) ? flowRuleService.applyBatch(batch) : null;
}
- /**
- * Update the intent store with the next status for this intent.
- */
- private void updateIntents() {
- // FIXME we assume everything passes for now.
- for (IntentUpdate update : intentUpdateMap.values()) {
- for (Intent intent : update.stateMap().keySet()) {
- switch (update.getState(intent)) {
- case INSTALLING:
- update.setState(intent, INSTALLED);
- break;
- case WITHDRAWING:
- update.setState(intent, WITHDRAWN);
- // Fall-through
- case FAILED:
- store.removeInstalledIntents(intent.id());
- break;
-
- case SUBMITTED:
- case COMPILING:
- case RECOMPILING:
- case WITHDRAWN:
- case INSTALLED:
- default:
- //FIXME clean this up (we shouldn't ever get here)
- log.warn("Bad state: {} for {}", update.getState(intent), intent);
- break;
- }
+ private void updateBatches(CompletedBatchOperation completed) {
+ if (completed.isSuccess()) {
+ for (IntentUpdate update : intentUpdates) {
+ update.incrementBatch(true);
}
- }
- /*
- for (IntentOperation op : ops.operations()) {
- switch (op.type()) {
- case SUBMIT:
- store.setState(op.intent(), INSTALLED);
- break;
- case WITHDRAW:
- Intent intent = store.getIntent(op.intentId());
- store.setState(intent, WITHDRAWN);
- break;
- case REPLACE:
- store.setState(op.intent(), INSTALLED);
- intent = store.getIntent(op.intentId());
- store.setState(intent, WITHDRAWN);
- break;
- case UPDATE:
- intent = store.getIntent(op.intentId());
- store.setState(intent, INSTALLED);
- break;
- default:
- break;
- }
- }
- */
- /*
- if (nextState == RECOMPILING) {
- eventDispatcher.post(store.setState(intent, FAILED));
- // FIXME try to recompile
-// executor.execute(new IntentTask(nextState, intent));
- } else if (nextState == INSTALLED || nextState == WITHDRAWN) {
- eventDispatcher.post(store.setState(intent, nextState));
} else {
- log.warn("Invalid next intent state {} for intent {}", nextState, intent);
- }*/
+ // entire batch has been reverted...
+ log.warn("Failed items: {}", completed.failedItems());
+
+ for (Long id : completed.failedIds()) {
+ IntentId targetId = IntentId.valueOf(id);
+ for (IntentUpdate update : intentUpdates) {
+ List<Intent> installables = Lists.newArrayList(update.newInstallables());
+ installables.addAll(update.oldInstallables());
+ for (Intent intent : installables) {
+ if (intent.id().equals(targetId)) {
+ update.incrementBatch(false);
+ break;
+ }
+ }
+ }
+ // don't increment the non-failed items, as they have been reverted.
+ }
+ }
}
/**
@@ -822,34 +762,60 @@
}
try {
CompletedBatchOperation completed = future.get(100, TimeUnit.NANOSECONDS);
- if (completed.isSuccess()) {
- future = applyNextBatch();
- } else {
- // TODO check if future succeeded and if not report fail items
- log.warn("Failed items: {}", completed.failedItems());
- // FIXME revert.... by submitting a new batch
- //uninstallIntent(intent, RECOMPILING);
- }
+ updateBatches(completed);
+ future = applyNextBatch();
} catch (TimeoutException | InterruptedException | ExecutionException te) {
//TODO look into error message
- log.debug("Intallations of intent {} is still pending", ops);
+ log.debug("Installation of intents are still pending: {}", ops);
}
}
+ private void retry() {
+ if (future.cancel(true)) { // cancel success; batch is reverted
+ // reset the timer
+ endTime = System.currentTimeMillis() + TIMEOUT;
+ if (installAttempt++ >= MAX_ATTEMPTS) {
+ log.warn("Install request timed out: {}", ops);
+ for (IntentUpdate update : intentUpdates) {
+ update.incrementBatch(false);
+ }
+ } // else just resubmit the work
+ future = applyNextBatch();
+ monitorExecutor.submit(this);
+ } else {
+ // FIXME
+ // cancel failed... batch is broken; shouldn't happen!
+ // we could manually reverse everything
+ // ... or just core dump and send email to Ali
+ batchService.removeIntentOperations(ops);
+ }
+ }
+
+ boolean isComplete() {
+ // TODO: actually check with the intent update?
+ return future == null;
+ }
+
@Override
public void run() {
- processFutures();
- if (future == null) {
- // woohoo! we are done!
- updateIntents();
- batchService.removeIntentOperations(ops);
- } else if (endTime < System.currentTimeMillis()) {
- log.warn("Install request timed out");
-// future.cancel(true);
- // TODO retry and/or report the failure
- } else {
- // resubmit ourselves if we are not done yet
- monitorExecutor.submit(this);
+ try {
+ if (intentUpdates.isEmpty()) {
+ // this should only be called on the first iteration
+ // note: this a "expensive", so it is not done in the constructor
+ buildIntentUpdates();
+ }
+ processFutures();
+ if (isComplete()) {
+ // there are no outstanding batches; we are done
+ batchService.removeIntentOperations(ops);
+ } else if (endTime < System.currentTimeMillis()) {
+ retry();
+ } else {
+ // we are not done yet, yield the thread by resubmitting ourselves
+ monitorExecutor.submit(this);
+ }
+ } catch (Exception e) {
+ log.error("Error submitting batches:", e);
}
}
}
@@ -859,7 +825,7 @@
public void execute(IntentOperations operations) {
log.info("Execute operations: {}", operations);
//FIXME: perhaps we want to track this task so that we can cancel it.
- executor.execute(new IntentTask(operations));
+ monitorExecutor.execute(new IntentInstallMonitor(operations));
}
@Override
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
index a29cd0f..0bfd6c3 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
@@ -101,7 +101,8 @@
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment, 123, //FIXME 123
appId, (short) (intent.id().fingerprint() & 0xffff), 0, true);
- rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule));
+ rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule,
+ intent.id().fingerprint()));
prev = link.dst();
}
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
@@ -127,7 +128,8 @@
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment,
123, appId, (short) (intent.id().fingerprint() & 0xffff), 0, true);
- rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule));
+ rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule,
+ intent.id().fingerprint()));
prev = link.dst();
}
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
diff --git a/core/net/src/test/java/org/onlab/onos/net/intent/impl/IntentManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/intent/impl/IntentManagerTest.java
new file mode 100644
index 0000000..b3ebe70
--- /dev/null
+++ b/core/net/src/test/java/org/onlab/onos/net/intent/impl/IntentManagerTest.java
@@ -0,0 +1,348 @@
+package org.onlab.onos.net.intent.impl;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.onos.TestApplicationId;
+import org.onlab.onos.core.ApplicationId;
+import org.onlab.onos.event.impl.TestEventDispatcher;
+import org.onlab.onos.net.NetworkResource;
+import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
+import org.onlab.onos.net.intent.Intent;
+import org.onlab.onos.net.intent.IntentCompiler;
+import org.onlab.onos.net.intent.IntentEvent;
+import org.onlab.onos.net.intent.IntentEvent.Type;
+import org.onlab.onos.net.intent.IntentExtensionService;
+import org.onlab.onos.net.intent.IntentId;
+import org.onlab.onos.net.intent.IntentInstaller;
+import org.onlab.onos.net.intent.IntentListener;
+import org.onlab.onos.net.intent.IntentService;
+import org.onlab.onos.net.intent.IntentState;
+import org.onlab.onos.net.intent.IntentTestsMocks;
+import org.onlab.onos.net.resource.LinkResourceAllocations;
+import org.onlab.onos.store.trivial.impl.SimpleIntentBatchQueue;
+import org.onlab.onos.store.trivial.impl.SimpleIntentStore;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.onlab.onos.net.intent.IntentState.*;
+import static org.onlab.util.Tools.delay;
+
+/**
+ * Test intent manager and transitions.
+ *
+ * TODO implement the following tests:
+ * - {submit, withdraw, update, replace} intent
+ * - {submit, update, recomiling} intent with failed compilation
+ * - failed reservation
+ * - push timeout recovery
+ * - failed items recovery
+ *
+ * in general, verify intents store, flow store, and work queue
+ */
+public class IntentManagerTest {
+
+ private static final ApplicationId APPID = new TestApplicationId("manager-test");
+
+ private IntentManager manager;
+ private MockFlowRuleService flowRuleService;
+
+ protected IntentService service;
+ protected IntentExtensionService extensionService;
+ protected TestListener listener = new TestListener();
+ protected TestIntentCompiler compiler = new TestIntentCompiler();
+ protected TestIntentInstaller installer = new TestIntentInstaller();
+
+ @Before
+ public void setUp() {
+ manager = new IntentManager();
+ flowRuleService = new MockFlowRuleService();
+ manager.store = new SimpleIntentStore();
+ manager.batchService = new SimpleIntentBatchQueue();
+ manager.eventDispatcher = new TestEventDispatcher();
+ manager.trackerService = new TestIntentTracker();
+ manager.flowRuleService = flowRuleService;
+ service = manager;
+ extensionService = manager;
+
+ manager.activate();
+ service.addListener(listener);
+ extensionService.registerCompiler(MockIntent.class, compiler);
+ extensionService.registerInstaller(MockInstallableIntent.class, installer);
+
+ assertTrue("store should be empty",
+ Sets.newHashSet(service.getIntents()).isEmpty());
+ assertEquals(0L, flowRuleService.getFlowRuleCount());
+ }
+
+ @After
+ public void tearDown() {
+ // verify that all intents are parked and the batch operation is unblocked
+ Set<IntentState> parked = Sets.newHashSet(INSTALLED, WITHDRAWN, FAILED);
+ for (Intent i : service.getIntents()) {
+ IntentState state = service.getIntentState(i.id());
+ assertTrue("Intent " + i.id() + " is in invalid state " + state,
+ parked.contains(state));
+ }
+ //the batch has not yet been removed when we receive the last event
+ // FIXME: this doesn't guarantee to avoid the race
+ for (int tries = 0; tries < 10; tries++) {
+ if (manager.batchService.getPendingOperations().isEmpty() &&
+ manager.batchService.getCurrentOperations().isEmpty()) {
+ break;
+ }
+ delay(10);
+ }
+ assertTrue("There are still pending batch operations.",
+ manager.batchService.getPendingOperations().isEmpty());
+ assertTrue("There are still outstanding batch operations.",
+ manager.batchService.getCurrentOperations().isEmpty());
+
+ extensionService.unregisterCompiler(MockIntent.class);
+ extensionService.unregisterInstaller(MockInstallableIntent.class);
+ service.removeListener(listener);
+ manager.deactivate();
+ // TODO null the other refs?
+ }
+
+ @Test
+ public void submitIntent() {
+ flowRuleService.setFuture(true);
+
+ listener.setLatch(1, Type.SUBMITTED);
+ listener.setLatch(1, Type.INSTALLED);
+ Intent intent = new MockIntent(MockIntent.nextId());
+ service.submit(intent);
+ listener.await(Type.SUBMITTED);
+ listener.await(Type.INSTALLED);
+ assertEquals(1L, service.getIntentCount());
+ assertEquals(1L, flowRuleService.getFlowRuleCount());
+ }
+
+ @Test
+ public void withdrawIntent() {
+ flowRuleService.setFuture(true);
+
+ listener.setLatch(1, Type.INSTALLED);
+ Intent intent = new MockIntent(MockIntent.nextId());
+ service.submit(intent);
+ listener.await(Type.INSTALLED);
+ assertEquals(1L, service.getIntentCount());
+ assertEquals(1L, flowRuleService.getFlowRuleCount());
+
+ listener.setLatch(1, Type.WITHDRAWN);
+ service.withdraw(intent);
+ listener.await(Type.WITHDRAWN);
+ assertEquals(1L, service.getIntentCount());
+ assertEquals(0L, flowRuleService.getFlowRuleCount());
+ }
+
+ @Test
+ public void stressSubmitWithdraw() {
+ flowRuleService.setFuture(true);
+
+ int count = 500;
+
+ listener.setLatch(count, Type.INSTALLED);
+ listener.setLatch(count, Type.WITHDRAWN);
+
+ Intent intent = new MockIntent(MockIntent.nextId());
+ for (int i = 0; i < count; i++) {
+ service.submit(intent);
+ service.withdraw(intent);
+ }
+
+ listener.await(Type.INSTALLED);
+ listener.await(Type.WITHDRAWN);
+ assertEquals(1L, service.getIntentCount());
+ assertEquals(0L, flowRuleService.getFlowRuleCount());
+ }
+
+ @Test
+ public void replaceIntent() {
+ flowRuleService.setFuture(true);
+
+ MockIntent intent = new MockIntent(MockIntent.nextId());
+ listener.setLatch(1, Type.INSTALLED);
+ service.submit(intent);
+ listener.await(Type.INSTALLED);
+ assertEquals(1L, service.getIntentCount());
+ assertEquals(1L, manager.flowRuleService.getFlowRuleCount());
+
+ MockIntent intent2 = new MockIntent(MockIntent.nextId());
+ listener.setLatch(1, Type.WITHDRAWN);
+ listener.setLatch(1, Type.SUBMITTED);
+ listener.setLatch(1, Type.INSTALLED);
+ service.replace(intent.id(), intent2);
+ listener.await(Type.WITHDRAWN);
+ listener.await(Type.INSTALLED);
+ assertEquals(2L, service.getIntentCount());
+ assertEquals(1L, manager.flowRuleService.getFlowRuleCount());
+ assertEquals(intent2.number().intValue(),
+ flowRuleService.flows.iterator().next().priority());
+ }
+
+ private static class TestListener implements IntentListener {
+ final Multimap<IntentEvent.Type, IntentEvent> events = HashMultimap.create();
+ Map<IntentEvent.Type, CountDownLatch> latchMap = Maps.newHashMap();
+
+ @Override
+ public void event(IntentEvent event) {
+ events.put(event.type(), event);
+ if (latchMap.containsKey(event.type())) {
+ latchMap.get(event.type()).countDown();
+ }
+ }
+
+ public int getCounts(IntentEvent.Type type) {
+ return events.get(type).size();
+ }
+
+ public void setLatch(int count, IntentEvent.Type type) {
+ latchMap.put(type, new CountDownLatch(count));
+ }
+
+ public void await(IntentEvent.Type type) {
+ try {
+ latchMap.get(type).await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private static class TestIntentTracker implements ObjectiveTrackerService {
+ private TopologyChangeDelegate delegate;
+ @Override
+ public void setDelegate(TopologyChangeDelegate delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void unsetDelegate(TopologyChangeDelegate delegate) {
+ if (delegate.equals(this.delegate)) {
+ this.delegate = null;
+ }
+ }
+
+ @Override
+ public void addTrackedResources(IntentId intentId, Collection<NetworkResource> resources) {
+ //TODO
+ }
+
+ @Override
+ public void removeTrackedResources(IntentId intentId, Collection<NetworkResource> resources) {
+ //TODO
+ }
+ }
+
+ private static class MockIntent extends Intent {
+ private static AtomicLong counter = new AtomicLong(0);
+
+ private final Long number;
+ // Nothing new here
+ public MockIntent(Long number) {
+ super(id(MockIntent.class, number), APPID, null);
+ this.number = number;
+ }
+
+ public Long number() {
+ return number;
+ }
+
+ public static Long nextId() {
+ return counter.getAndIncrement();
+ }
+ }
+
+ private static class MockInstallableIntent extends MockIntent {
+ public MockInstallableIntent(Long number) {
+ super(number);
+ }
+
+ @Override
+ public boolean isInstallable() {
+ return true;
+ }
+ }
+
+ private static class TestIntentCompiler implements IntentCompiler<MockIntent> {
+ @Override
+ public List<Intent> compile(MockIntent intent, List<Intent> installable,
+ Set<LinkResourceAllocations> resources) {
+ return Lists.newArrayList(new MockInstallableIntent(intent.number()));
+ }
+ }
+
+ private static class TestIntentInstaller implements IntentInstaller<MockInstallableIntent> {
+ @Override
+ public List<FlowRuleBatchOperation> install(MockInstallableIntent intent) {
+ FlowRule fr = new IntentTestsMocks.MockFlowRule(intent.number().intValue());
+ List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
+ rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, fr));
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+ }
+
+ @Override
+ public List<FlowRuleBatchOperation> uninstall(MockInstallableIntent intent) {
+ FlowRule fr = new IntentTestsMocks.MockFlowRule(intent.number().intValue());
+ List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
+ rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, fr));
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+ }
+
+ @Override
+ public List<FlowRuleBatchOperation> replace(MockInstallableIntent oldIntent, MockInstallableIntent newIntent) {
+ FlowRule fr = new IntentTestsMocks.MockFlowRule(oldIntent.number().intValue());
+ FlowRule fr2 = new IntentTestsMocks.MockFlowRule(newIntent.number().intValue());
+ List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
+ rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, fr));
+ rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, fr2));
+ return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+ }
+ }
+
+ /**
+ * Hamcrest matcher to check that a conllection of Intents contains an
+ * Intent with the specified Intent Id.
+ */
+ public static class EntryForIntentMatcher extends TypeSafeMatcher<Collection<Intent>> {
+ private final String id;
+
+ public EntryForIntentMatcher(String idValue) {
+ id = idValue;
+ }
+
+ @Override
+ public boolean matchesSafely(Collection<Intent> intents) {
+ return hasItem(Matchers.<Intent>hasProperty("id", equalTo(id))).matches(intents);
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("an intent with id \" ").
+ appendText(id).
+ appendText("\"");
+ }
+ }
+}
diff --git a/core/net/src/test/java/org/onlab/onos/net/intent/impl/MockFlowRuleService.java b/core/net/src/test/java/org/onlab/onos/net/intent/impl/MockFlowRuleService.java
new file mode 100644
index 0000000..53097a0
--- /dev/null
+++ b/core/net/src/test/java/org/onlab/onos/net/intent/impl/MockFlowRuleService.java
@@ -0,0 +1,91 @@
+package org.onlab.onos.net.intent.impl;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import org.onlab.onos.core.ApplicationId;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
+import org.onlab.onos.net.flow.FlowEntry;
+import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
+import org.onlab.onos.net.flow.FlowRuleListener;
+import org.onlab.onos.net.flow.FlowRuleService;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+
+public class MockFlowRuleService implements FlowRuleService {
+
+ private Future<CompletedBatchOperation> future;
+ final Set<FlowRule> flows = Sets.newHashSet();
+
+ public void setFuture(boolean success) {
+ future = Futures.immediateFuture(new CompletedBatchOperation(true, Collections.emptySet()));
+ }
+
+ @Override
+ public Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch) {
+ for (FlowRuleBatchEntry fbe : batch.getOperations()) {
+ FlowRule fr = fbe.getTarget();
+ switch (fbe.getOperator()) {
+ case ADD:
+ flows.add(fr);
+ break;
+ case REMOVE:
+ flows.remove(fr);
+ break;
+ case MODIFY:
+ break;
+ default:
+ break;
+ }
+ }
+ return future;
+ }
+
+ @Override
+ public int getFlowRuleCount() {
+ return flows.size();
+ }
+
+ @Override
+ public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+ return null;
+ }
+
+ @Override
+ public void applyFlowRules(FlowRule... flowRules) {
+ }
+
+ @Override
+ public void removeFlowRules(FlowRule... flowRules) {
+ }
+
+ @Override
+ public void removeFlowRulesById(ApplicationId appId) {
+ }
+
+ @Override
+ public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
+ return null;
+ }
+
+ @Override
+ public Iterable<FlowRule> getFlowRulesByGroupId(ApplicationId appId, short groupId) {
+ return null;
+ }
+
+ @Override
+ public void addListener(FlowRuleListener listener) {
+
+ }
+
+ @Override
+ public void removeListener(FlowRuleListener listener) {
+
+ }
+}
+
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index d6d06d0..6ab5e12 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -340,7 +340,8 @@
public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
if (operation.getOperations().isEmpty()) {
- return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
+ return Futures.immediateFuture(new CompletedBatchOperation(true,
+ Collections.<FlowRule>emptySet()));
}
DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
@@ -379,8 +380,8 @@
private ListenableFuture<CompletedBatchOperation>
storeBatchInternal(FlowRuleBatchOperation operation) {
- final List<StoredFlowEntry> toRemove = new ArrayList<>();
- final List<StoredFlowEntry> toAdd = new ArrayList<>();
+ final List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
+ final List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
DeviceId did = null;
@@ -396,14 +397,14 @@
StoredFlowEntry entry = getFlowEntryInternal(flowRule);
if (entry != null) {
entry.setState(FlowEntryState.PENDING_REMOVE);
- toRemove.add(entry);
+ toRemove.add(batchEntry);
}
} else if (op.equals(FlowRuleOperation.ADD)) {
StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
DeviceId deviceId = flowRule.deviceId();
if (!flowEntries.containsEntry(deviceId, flowEntry)) {
flowEntries.put(deviceId, flowEntry);
- toAdd.add(flowEntry);
+ toAdd.add(batchEntry);
}
}
}
@@ -427,8 +428,8 @@
}
private void updateBackup(final DeviceId deviceId,
- final List<StoredFlowEntry> toAdd,
- final List<? extends FlowRule> list) {
+ final List<FlowRuleBatchEntry> toAdd,
+ final List<FlowRuleBatchEntry> list) {
Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
@@ -442,8 +443,9 @@
}
}
- private void updateBackup(DeviceId deviceId, List<StoredFlowEntry> toAdd) {
- updateBackup(deviceId, toAdd, Collections.<FlowEntry>emptyList());
+ private void updateBackup(DeviceId deviceId, List<FlowRuleBatchEntry> toAdd) {
+
+ updateBackup(deviceId, toAdd, Collections.<FlowRuleBatchEntry>emptyList());
}
@Override
@@ -477,8 +479,9 @@
stored.setPackets(rule.packets());
if (stored.state() == FlowEntryState.PENDING_ADD) {
stored.setState(FlowEntryState.ADDED);
- // update backup.
- updateBackup(did, Arrays.asList(stored));
+ FlowRuleBatchEntry entry =
+ new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
+ updateBackup(did, Arrays.asList(entry));
return new FlowRuleEvent(Type.RULE_ADDED, rule);
}
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
@@ -515,7 +518,9 @@
try {
// This is where one could mark a rule as removed and still keep it in the store.
final boolean removed = flowEntries.remove(deviceId, rule);
- updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
+ FlowRuleBatchEntry entry =
+ new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
+ updateBackup(deviceId, Collections.<FlowRuleBatchEntry>emptyList(), Arrays.asList(entry));
if (removed) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
@@ -687,15 +692,17 @@
}
// Task to update FlowEntries in backup HZ store
+ // TODO: Should be refactored to contain only one list and not
+ // toAdd and toRemove
private final class UpdateBackup implements Runnable {
private final DeviceId deviceId;
- private final List<StoredFlowEntry> toAdd;
- private final List<? extends FlowRule> toRemove;
+ private final List<FlowRuleBatchEntry> toAdd;
+ private final List<FlowRuleBatchEntry> toRemove;
public UpdateBackup(DeviceId deviceId,
- List<StoredFlowEntry> toAdd,
- List<? extends FlowRule> list) {
+ List<FlowRuleBatchEntry> toAdd,
+ List<FlowRuleBatchEntry> list) {
this.deviceId = checkNotNull(deviceId);
this.toAdd = checkNotNull(toAdd);
this.toRemove = checkNotNull(list);
@@ -707,7 +714,8 @@
log.trace("update backup {} +{} -{}", deviceId, toAdd, toRemove);
final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
// Following should be rewritten using async APIs
- for (StoredFlowEntry entry : toAdd) {
+ for (FlowRuleBatchEntry bEntry : toAdd) {
+ final FlowRule entry = bEntry.getTarget();
final FlowId id = entry.id();
ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
List<StoredFlowEntry> list = new ArrayList<>();
@@ -715,8 +723,8 @@
list.addAll(original);
}
- list.remove(entry);
- list.add(entry);
+ list.remove(bEntry.getTarget());
+ list.add((StoredFlowEntry) entry);
ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
boolean success;
@@ -730,7 +738,8 @@
log.error("Updating backup failed.");
}
}
- for (FlowRule entry : toRemove) {
+ for (FlowRuleBatchEntry bEntry : toRemove) {
+ final FlowRule entry = bEntry.getTarget();
final FlowId id = entry.id();
ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
List<StoredFlowEntry> list = new ArrayList<>();
@@ -738,7 +747,7 @@
list.addAll(original);
}
- list.remove(entry);
+ list.remove(bEntry.getTarget());
ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
boolean success;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java
index 4c146d1..68cda00 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java
@@ -25,7 +25,6 @@
import org.onlab.onos.net.intent.IntentOperations;
import org.slf4j.Logger;
-import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
@@ -82,10 +81,17 @@
}
@Override
- public Set<IntentOperations> getIntentOperations() {
- Set<IntentOperations> set = Sets.newHashSet(currentBatches);
- set.addAll((Collection) pendingBatches);
- return set;
+ public Set<IntentOperations> getPendingOperations() {
+ synchronized (this) {
+ return Sets.newHashSet(pendingBatches);
+ }
+ }
+
+ @Override
+ public Set<IntentOperations> getCurrentOperations() {
+ synchronized (this) {
+ return Sets.newHashSet(currentBatches);
+ }
}
@Override
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index 622e2de..4cd31d5 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -263,19 +263,19 @@
@Override
public Future<CompletedBatchOperation> storeBatch(
FlowRuleBatchOperation batchOperation) {
- List<FlowRule> toAdd = new ArrayList<>();
- List<FlowRule> toRemove = new ArrayList<>();
+ List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
+ List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
final FlowRule flowRule = entry.getTarget();
if (entry.getOperator().equals(FlowRuleOperation.ADD)) {
if (!getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
storeFlowRule(flowRule);
- toAdd.add(flowRule);
+ toAdd.add(entry);
}
} else if (entry.getOperator().equals(FlowRuleOperation.REMOVE)) {
if (getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
deleteFlowRule(flowRule);
- toRemove.add(flowRule);
+ toRemove.add(entry);
}
} else {
throw new UnsupportedOperationException("Unsupported operation type");
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java
index b8d22bf..f92eca0 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java
@@ -15,7 +15,7 @@
*/
package org.onlab.onos.store.trivial.impl;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -25,7 +25,8 @@
import org.onlab.onos.net.intent.IntentOperations;
import org.slf4j.Logger;
-import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -37,7 +38,8 @@
public class SimpleIntentBatchQueue implements IntentBatchService {
private final Logger log = getLogger(getClass());
- private final Set<IntentOperations> pendingBatches = new HashSet<>();
+ private final Queue<IntentOperations> pendingBatches = new LinkedList<>();
+ private final Set<IntentOperations> currentBatches = Sets.newHashSet();
private IntentBatchDelegate delegate;
@Activate
@@ -53,18 +55,42 @@
@Override
public void addIntentOperations(IntentOperations operations) {
checkState(delegate != null, "No delegate set");
- pendingBatches.add(operations);
- delegate.execute(operations);
+ synchronized (this) {
+ pendingBatches.add(operations);
+ if (currentBatches.isEmpty()) {
+ IntentOperations work = pendingBatches.poll();
+ currentBatches.add(work);
+ delegate.execute(work);
+ }
+ }
}
@Override
public void removeIntentOperations(IntentOperations operations) {
- pendingBatches.remove(operations);
+ // we allow at most one outstanding batch at a time
+ synchronized (this) {
+ checkState(currentBatches.remove(operations), "Operations not found in current ops.");
+ checkState(currentBatches.isEmpty(), "More than one outstanding batch.");
+ IntentOperations work = pendingBatches.poll();
+ if (work != null) {
+ currentBatches.add(work);
+ delegate.execute(work);
+ }
+ }
}
@Override
- public Set<IntentOperations> getIntentOperations() {
- return ImmutableSet.copyOf(pendingBatches);
+ public Set<IntentOperations> getPendingOperations() {
+ synchronized (this) {
+ return Sets.newHashSet(pendingBatches);
+ }
+ }
+
+ @Override
+ public Set<IntentOperations> getCurrentOperations() {
+ synchronized (this) {
+ return Sets.newHashSet(currentBatches);
+ }
}
@Override
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
index cc33f20..793c9f3 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
@@ -53,6 +53,8 @@
import org.projectfloodlight.openflow.types.VlanVid;
import org.slf4j.Logger;
+import java.util.Optional;
+
/**
* Builder for OpenFlow flow mods based on FlowRules.
*/
@@ -63,6 +65,7 @@
private final OFFactory factory;
private final FlowRule flowRule;
private final TrafficSelector selector;
+ protected final Long xid;
/**
* Creates a new flow mod builder.
@@ -71,12 +74,13 @@
* @param factory the OpenFlow factory to use to build the flow mod
* @return the new flow mod builder
*/
- public static FlowModBuilder builder(FlowRule flowRule, OFFactory factory) {
+ public static FlowModBuilder builder(FlowRule flowRule,
+ OFFactory factory, Optional<Long> xid) {
switch (factory.getVersion()) {
case OF_10:
- return new FlowModBuilderVer10(flowRule, factory);
+ return new FlowModBuilderVer10(flowRule, factory, xid);
case OF_13:
- return new FlowModBuilderVer13(flowRule, factory);
+ return new FlowModBuilderVer13(flowRule, factory, xid);
default:
throw new UnsupportedOperationException(
"No flow mod builder for protocol version " + factory.getVersion());
@@ -89,10 +93,12 @@
* @param flowRule the flow rule to transform into a flow mod
* @param factory the OpenFlow factory to use to build the flow mod
*/
- protected FlowModBuilder(FlowRule flowRule, OFFactory factory) {
+ protected FlowModBuilder(FlowRule flowRule, OFFactory factory, Optional<Long> xid) {
this.factory = factory;
this.flowRule = flowRule;
this.selector = flowRule.selector();
+ this.xid = xid.orElse((long) 0);
+
}
/**
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilderVer10.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilderVer10.java
index ec315f5..341b718 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilderVer10.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilderVer10.java
@@ -18,6 +18,7 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.TrafficTreatment;
@@ -62,8 +63,9 @@
* @param flowRule the flow rule to transform into a flow mod
* @param factory the OpenFlow factory to use to build the flow mod
*/
- protected FlowModBuilderVer10(FlowRule flowRule, OFFactory factory) {
- super(flowRule, factory);
+ protected FlowModBuilderVer10(FlowRule flowRule,
+ OFFactory factory, Optional<Long> xid) {
+ super(flowRule, factory, xid);
this.treatment = flowRule.treatment();
}
@@ -77,7 +79,7 @@
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowAdd fm = factory().buildFlowAdd()
- .setXid(cookie)
+ .setXid(xid)
.setCookie(U64.of(cookie))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
@@ -98,7 +100,7 @@
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowMod fm = factory().buildFlowModify()
- .setXid(cookie)
+ .setXid(xid)
.setCookie(U64.of(cookie))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
@@ -118,7 +120,7 @@
long cookie = flowRule().id().value();
OFFlowDelete fm = factory().buildFlowDelete()
- .setXid(cookie)
+ .setXid(xid)
.setCookie(U64.of(cookie))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilderVer13.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilderVer13.java
index 6f595d0..f2ad959 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilderVer13.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilderVer13.java
@@ -18,6 +18,7 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.TrafficTreatment;
@@ -70,8 +71,8 @@
* @param flowRule the flow rule to transform into a flow mod
* @param factory the OpenFlow factory to use to build the flow mod
*/
- protected FlowModBuilderVer13(FlowRule flowRule, OFFactory factory) {
- super(flowRule, factory);
+ protected FlowModBuilderVer13(FlowRule flowRule, OFFactory factory, Optional<Long> xid) {
+ super(flowRule, factory, xid);
this.treatment = flowRule.treatment();
}
@@ -93,7 +94,7 @@
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowAdd fm = factory().buildFlowAdd()
- .setXid(cookie)
+ .setXid(xid)
.setCookie(U64.of(cookie))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
@@ -117,7 +118,7 @@
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowMod fm = factory().buildFlowModify()
- .setXid(cookie)
+ .setXid(xid)
.setCookie(U64.of(cookie))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
@@ -140,7 +141,7 @@
long cookie = flowRule().id().value();
OFFlowDelete fm = factory().buildFlowDelete()
- .setXid(cookie)
+ .setXid(xid)
.setCookie(U64.of(cookie))
.setBufferId(OFBufferId.NO_BUFFER)
//.setActions(actions) //FIXME do we want to send actions in flowdel?
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index 214bde3..f09748e 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -19,7 +19,6 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ExecutionList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -76,6 +75,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -122,7 +122,7 @@
private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
- private final AtomicLong xidCounter = new AtomicLong(0);
+ private final AtomicLong xidCounter = new AtomicLong(1);
/**
* Creates an OpenFlow host provider.
@@ -164,7 +164,8 @@
private void applyRule(FlowRule flowRule) {
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
- sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory()).buildFlowAdd());
+ sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
+ Optional.empty()).buildFlowAdd());
}
@@ -178,7 +179,8 @@
private void removeRule(FlowRule flowRule) {
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
- sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory()).buildFlowDel());
+ sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
+ Optional.empty()).buildFlowDel());
}
@Override
@@ -211,7 +213,10 @@
return failed;
}
sws.add(new Dpid(sw.getId()));
- FlowModBuilder builder = FlowModBuilder.builder(flowRule, sw.factory());
+ Long flowModXid = xidCounter.getAndIncrement();
+ FlowModBuilder builder =
+ FlowModBuilder.builder(flowRule, sw.factory(),
+ Optional.of(flowModXid));
OFFlowMod mod = null;
switch (fbe.getOperator()) {
case ADD:
@@ -228,7 +233,7 @@
}
if (mod != null) {
mods.put(mod, sw);
- fmXids.put(xidCounter.getAndIncrement(), fbe);
+ fmXids.put(flowModXid, fbe);
} else {
log.error("Conversion of flowrule {} failed.", flowRule);
}
@@ -237,6 +242,7 @@
for (Long xid : fmXids.keySet()) {
pendingFMs.put(xid, installation);
}
+
pendingFutures.put(installation.xid(), installation);
for (Map.Entry<OFFlowMod, OpenFlowSwitch> entry : mods.entrySet()) {
OpenFlowSwitch sw = entry.getValue();
@@ -368,13 +374,13 @@
private final AtomicBoolean ok = new AtomicBoolean(true);
private final Map<Long, FlowRuleBatchEntry> fms;
+
private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
+ private Long failedId;
private final CountDownLatch countDownLatch;
private BatchState state;
- private final ExecutionList executionList = new ExecutionList();
-
public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
this.xid = xidCounter.getAndIncrement();
this.state = BatchState.STARTED;
@@ -393,6 +399,7 @@
removeRequirement(dpid);
FlowEntry fe = null;
FlowRuleBatchEntry fbe = fms.get(msg.getXid());
+ failedId = fbe.id();
FlowRule offending = fbe.getTarget();
//TODO handle specific error msgs
switch (msg.getErrType()) {
@@ -492,8 +499,11 @@
public CompletedBatchOperation get() throws InterruptedException, ExecutionException {
countDownLatch.await();
this.state = BatchState.FINISHED;
- CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
- //FIXME do cleanup here
+ Set<Long> failedIds = (failedId != null) ? Sets.newHashSet(failedId) : Collections.emptySet();
+ CompletedBatchOperation result =
+ new CompletedBatchOperation(ok.get(), offendingFlowMods, failedIds);
+ //FIXME do cleanup here (moved by BOC)
+ cleanUp();
return result;
}
@@ -503,8 +513,11 @@
TimeoutException {
if (countDownLatch.await(timeout, unit)) {
this.state = BatchState.FINISHED;
- CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
- // FIXME do cleanup here
+ Set<Long> failedIds = (failedId != null) ? Sets.newHashSet(failedId) : Collections.emptySet();
+ CompletedBatchOperation result =
+ new CompletedBatchOperation(ok.get(), offendingFlowMods, failedIds);
+ // FIXME do cleanup here (moved by BOC)
+ cleanUp();
return result;
}
throw new TimeoutException();
@@ -522,8 +535,8 @@
private void removeRequirement(Dpid dpid) {
countDownLatch.countDown();
sws.remove(dpid);
- //FIXME don't do cleanup here
- cleanUp();
+ //FIXME don't do cleanup here (moved by BOC)
+ //cleanUp();
}
}