Updating Intent Manager to deal with failures.
Added ids to Flow batch futures.
Adding some basic unit tests for IntentManager
Adding failedIds to the completedOperation in FlowRuleManager
Change-Id: I7645cead193299f70d319d254cd1e82d96909e7b
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
index 70878c5..361c2ea 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
@@ -49,9 +49,9 @@
import org.slf4j.Logger;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -330,12 +330,12 @@
batches.addAll(getInstaller(installable).install(installable));
} catch (IntentException e) {
log.warn("Unable to install intent {} due to:", update.newIntent().id(), e);
+ trackerService.removeTrackedResources(update.newIntent().id(),
+ installable.resources());
//FIXME we failed... intent should be recompiled
- // TODO: remove resources
- // recompile!!!
}
}
- update.setBatches(batches);
+ update.addBatches(batches);
}
/**
@@ -348,80 +348,32 @@
if (!update.oldIntent().equals(update.newIntent())) {
update.setState(update.oldIntent(), WITHDRAWING);
} // else newIntent is FAILED
- uninstallIntent(update);
-
- // If all went well, disassociate the top-level intent with its
- // installable derivatives and mark it as withdrawn.
- // FIXME need to clean up
- //store.removeInstalledIntents(intent.id());
+ update.addBatches(uninstallIntent(update.oldIntent(), update.oldInstallables()));
}
/**
* Uninstalls all installable intents associated with the given intent.
*
- * @param update intent update
+ * @param intent intent
+ * @param installables installable intents
+ * @return list of batches to uninstall intent
*/
- //FIXME: need to handle next state properly
- private void uninstallIntent(IntentUpdate update) {
- if (update.oldInstallables == null) {
- return;
+ private List<FlowRuleBatchOperation> uninstallIntent(Intent intent, List<Intent> installables) {
+ if (installables == null) {
+ return Collections.emptyList();
}
List<FlowRuleBatchOperation> batches = Lists.newArrayList();
- for (Intent installable : update.oldInstallables()) {
- trackerService.removeTrackedResources(update.oldIntent().id(),
+ for (Intent installable : installables) {
+ trackerService.removeTrackedResources(intent.id(),
installable.resources());
try {
batches.addAll(getInstaller(installable).uninstall(installable));
} catch (IntentException e) {
- log.warn("Unable to uninstall intent {} due to:", update.oldIntent().id(), e);
+ log.warn("Unable to uninstall intent {} due to:", intent.id(), e);
// TODO: this should never happen. but what if it does?
}
}
- update.setBatches(batches);
- // FIXME: next state for old is WITHDRAWN or FAILED
- }
-
- /**
- * Recompiles the specified intent.
- *
- * @param update intent update
- */
- // FIXME: update this to work
- private void executeRecompilingPhase(IntentUpdate update) {
- Intent intent = update.newIntent();
- // Indicate that the intent is entering the recompiling phase.
- store.setState(intent, RECOMPILING);
-
- List<FlowRuleBatchOperation> batches = Lists.newArrayList();
- try {
- // Compile the intent into installable derivatives.
- List<Intent> installable = compileIntent(intent, update);
-
- // If all went well, compare the existing list of installable
- // intents with the newly compiled list. If they are the same,
- // bail, out since the previous approach was determined not to
- // be viable.
- // FIXME do we need this?
- List<Intent> originalInstallable = store.getInstallableIntents(intent.id());
-
- //FIXME let's be smarter about how we perform the update
- //batches.addAll(uninstallIntent(intent, null));
-
- if (Objects.equals(originalInstallable, installable)) {
- eventDispatcher.post(store.setState(intent, FAILED));
- } else {
- // Otherwise, re-associate the newly compiled installable intents
- // with the top-level intent and kick off installing phase.
- store.setInstallableIntents(intent.id(), installable);
- // FIXME commented out for now
- //batches.addAll(executeInstallingPhase(update));
- }
- } catch (Exception e) {
- log.warn("Unable to recompile intent {} due to:", intent.id(), e);
-
- // If compilation failed, mark the intent as failed.
- eventDispatcher.post(store.setState(intent, FAILED));
- }
+ return batches;
}
/**
@@ -442,9 +394,10 @@
for (int i = 0; i < update.oldInstallables().size(); i++) {
Intent oldInstallable = update.oldInstallables().get(i);
Intent newInstallable = update.newInstallables().get(i);
- if (oldInstallable.equals(newInstallable)) {
- continue;
- }
+ //FIXME revisit this
+// if (oldInstallable.equals(newInstallable)) {
+// continue;
+// }
checkArgument(oldInstallable.getClass().equals(newInstallable.getClass()),
"Installable Intent type mismatch.");
trackerService.removeTrackedResources(update.oldIntent().id(), oldInstallable.resources());
@@ -454,9 +407,12 @@
} catch (IntentException e) {
log.warn("Unable to update intent {} due to:", update.oldIntent().id(), e);
//FIXME... we failed. need to uninstall (if same) or revert (if different)
+ trackerService.removeTrackedResources(update.newIntent().id(), newInstallable.resources());
+ update.setState(update.newIntent(), FAILED);
+ batches = uninstallIntent(update.oldIntent(), update.oldInstallables());
}
}
- update.setBatches(batches);
+ update.addBatches(batches);
}
/**
@@ -541,13 +497,12 @@
}
/**
- * TODO.
- * @param op intent operation
- * @return intent update
+ * TODO. rename this...
+ * @param update intent update
*/
- private IntentUpdate processIntentOperation(IntentOperation op) {
- IntentUpdate update = new IntentUpdate(op);
+ private void processIntentUpdate(IntentUpdate update) {
+ // check to see if the intent needs to be compiled or recompiled
if (update.newIntent() != null) {
executeCompilingPhase(update);
}
@@ -559,32 +514,29 @@
} else if (update.oldInstallables() != null) {
executeWithdrawingPhase(update);
} else {
- if (update.oldIntent() != null) {
- // TODO this shouldn't happen
- return update; //FIXME
+ if (update.oldIntent() != null &&
+ !update.oldIntent().equals(update.newIntent())) {
+ // removing failed intent
+ update.setState(update.oldIntent(), WITHDRAWING);
}
- if (update.newIntent() != null) {
- // TODO assert that next state is failed
- return update; //FIXME
- }
+// if (update.newIntent() != null) {
+// // TODO assert that next state is failed
+// }
}
-
- return update;
}
// TODO comments...
private class IntentUpdate {
- private final IntentOperation op;
private final Intent oldIntent;
private final Intent newIntent;
private final Map<Intent, IntentState> stateMap = Maps.newHashMap();
private final List<Intent> oldInstallables;
private List<Intent> newInstallables;
- private List<FlowRuleBatchOperation> batches;
+ private final List<FlowRuleBatchOperation> batches = Lists.newLinkedList();
+ private int currentBatch = 0; // TODO: maybe replace with an iterator
IntentUpdate(IntentOperation op) {
- this.op = op;
switch (op.type()) {
case SUBMIT:
newIntent = op.intent();
@@ -600,7 +552,7 @@
break;
case UPDATE:
oldIntent = store.getIntent(op.intentId());
- newIntent = oldIntent; //InnerAssignment: Inner assignments should be avoided.
+ newIntent = oldIntent;
break;
default:
oldIntent = null;
@@ -617,7 +569,6 @@
// fetch the old intent's installables from the store
if (oldIntent != null) {
oldInstallables = store.getInstallableIntents(oldIntent.id());
- // TODO: remove intent from store after uninstall
} else {
oldInstallables = null;
}
@@ -644,12 +595,72 @@
store.setInstallableIntents(newIntent.id(), installables);
}
+        /**
+         * Reports whether the batch cursor has reached or passed the end of
+         * this update's batch list.
+         *
+         * @return true if there is no batch left at the current position
+         */
+        boolean isComplete() {
+            return batches.size() <= currentBatch;
+        }
+
+        /**
+         * Returns the batch at the current cursor position.
+         *
+         * @return the current batch, or null once this update is complete
+         */
+        FlowRuleBatchOperation currentBatch() {
+            if (isComplete()) {
+                return null;
+            }
+            return batches.get(currentBatch);
+        }
+
+        /**
+         * Moves this update past its current batch, or rebuilds its remaining
+         * work after the current batch failed.
+         * <p>
+         * On success the cursor advances and, when it lands exactly on the end
+         * of the batch list, the tracked intent states are finalized. On failure
+         * the current and all later batches are discarded and replaced with the
+         * batches needed to withdraw the old intent and uninstall the new one.
+         *
+         * @param success true if the current batch was applied, false if it failed
+         */
+        void incrementBatch(boolean success) {
+            if (success) { // actually increment
+                if (++currentBatch == batches.size()) {
+                    finalizeStates();
+                }
+            } else { // the current batch has failed, so recompile
+                // Remove the current batch and all remaining ones. Clearing the
+                // tail view drops every element; an index loop that calls
+                // remove(i) while also incrementing i skips every other entry
+                // because the list shifts left after each removal.
+                // The guard matters because the cursor may already be past the
+                // end of the list after repeated successful increments.
+                if (currentBatch < batches.size()) {
+                    batches.subList(currentBatch, batches.size()).clear();
+                }
+                if (oldIntent != null) {
+                    executeWithdrawingPhase(this); // remove the old intent
+                }
+                if (newIntent != null) {
+                    setState(newIntent, FAILED);
+                    batches.addAll(uninstallIntent(newIntent, newInstallables()));
+                }
+
+                // FIXME: should we try to recompile?
+            }
+        }
+
+        // FIXME make sure this is called!!!
+        /**
+         * Moves every intent tracked in the state map from its transitional
+         * state to the matching terminal state, and drops the installed-intent
+         * records of intents that were withdrawn or have failed.
+         */
+        private void finalizeStates() {
+            for (Intent tracked : stateMap.keySet()) {
+                switch (getState(tracked)) {
+                    case INSTALLING:
+                        setState(tracked, INSTALLED);
+                        break;
+
+                    case WITHDRAWING:
+                        setState(tracked, WITHDRAWN);
+                        store.removeInstalledIntents(tracked.id());
+                        //store.removeIntent(intent.id()); // FIXME we die a horrible death here
+                        break;
+
+                    case FAILED:
+                        store.removeInstalledIntents(tracked.id());
+                        break;
+
+                    // SUBMITTED, COMPILING, RECOMPILING, WITHDRAWN and INSTALLED
+                    // are not expected here and are reported like any other state.
+                    default:
+                        //FIXME clean this up (we shouldn't ever get here)
+                        log.warn("Bad state: {} for {}", getState(tracked), tracked);
+                        break;
+                }
+            }
+        }
+
List<FlowRuleBatchOperation> batches() {
return batches;
}
- void setBatches(List<FlowRuleBatchOperation> batches) {
- this.batches = batches;
+ void addBatches(List<FlowRuleBatchOperation> batches) {
+ this.batches.addAll(batches);
}
IntentState getState(Intent intent) {
@@ -659,7 +670,7 @@
void setState(Intent intent, IntentState newState) {
// TODO: clean this up, or set to debug
IntentState oldState = stateMap.get(intent);
- log.info("intent id: {}, old state: {}, new state: {}",
+ log.debug("intent id: {}, old state: {}, new state: {}",
intent.id(), oldState, newState);
stateMap.put(intent, newState);
@@ -674,143 +685,72 @@
}
}
- private static List<FlowRuleBatchOperation> mergeBatches(Map<IntentOperation,
- IntentUpdate> intentUpdates) {
- //TODO test this.
- List<FlowRuleBatchOperation> batches = Lists.newArrayList();
- for (IntentUpdate update : intentUpdates.values()) {
- if (update.batches() == null) {
- continue;
- }
- int i = 0;
- for (FlowRuleBatchOperation batch : update.batches()) {
- if (i == batches.size()) {
- batches.add(batch);
- } else {
- FlowRuleBatchOperation existing = batches.get(i);
- existing.addAll(batch);
- }
- i++;
- }
- }
- return batches;
- }
-
- // Auxiliary runnable to perform asynchronous tasks.
- private class IntentTask implements Runnable {
- private final IntentOperations operations;
-
- public IntentTask(IntentOperations operations) {
- this.operations = operations;
- }
-
- @Override
- public void run() {
- Map<IntentOperation, IntentUpdate> intentUpdates = Maps.newHashMap();
- for (IntentOperation op : operations.operations()) {
- intentUpdates.put(op, processIntentOperation(op));
- }
- List<FlowRuleBatchOperation> batches = mergeBatches(intentUpdates);
- monitorExecutor.execute(new IntentInstallMonitor(operations, intentUpdates, batches));
- }
- }
-
private class IntentInstallMonitor implements Runnable {
private static final long TIMEOUT = 5000; // ms
- private final IntentOperations ops;
- private final Map<IntentOperation, IntentUpdate> intentUpdateMap;
- private final List<FlowRuleBatchOperation> work;
- private Future<CompletedBatchOperation> future;
- private final long startTime = System.currentTimeMillis();
- private final long endTime = startTime + TIMEOUT;
+ private static final int MAX_ATTEMPTS = 3;
- public IntentInstallMonitor(IntentOperations ops,
- Map<IntentOperation, IntentUpdate> intentUpdateMap,
- List<FlowRuleBatchOperation> work) {
+ private final IntentOperations ops;
+ private final List<IntentUpdate> intentUpdates = Lists.newArrayList();
+
+ private Future<CompletedBatchOperation> future;
+ private long startTime = System.currentTimeMillis();
+ private long endTime = startTime + TIMEOUT;
+ private int installAttempt;
+
+ public IntentInstallMonitor(IntentOperations ops) {
this.ops = ops;
- this.intentUpdateMap = intentUpdateMap;
- this.work = work;
+ }
+
+ private void buildIntentUpdates() {
+ for (IntentOperation op : ops.operations()) {
+ IntentUpdate update = new IntentUpdate(op);
+ intentUpdates.add(update);
+ processIntentUpdate(update);
+ }
future = applyNextBatch();
}
/**
- * Applies the next batch, and returns the future.
+ * Builds and applies the next batch, and returns the future.
*
* @return Future for next batch
*/
private Future<CompletedBatchOperation> applyNextBatch() {
- if (work.isEmpty()) {
- return null;
+ //TODO test this. (also, maybe save this batch)
+ FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList());
+ for (IntentUpdate update : intentUpdates) {
+ if (!update.isComplete()) {
+ batch.addAll(update.currentBatch());
+ }
}
- FlowRuleBatchOperation batch = work.remove(0);
- return flowRuleService.applyBatch(batch);
+ return (batch.size() > 0) ? flowRuleService.applyBatch(batch) : null;
}
- /**
- * Update the intent store with the next status for this intent.
- */
- private void updateIntents() {
- // FIXME we assume everything passes for now.
- for (IntentUpdate update : intentUpdateMap.values()) {
- for (Intent intent : update.stateMap().keySet()) {
- switch (update.getState(intent)) {
- case INSTALLING:
- update.setState(intent, INSTALLED);
- break;
- case WITHDRAWING:
- update.setState(intent, WITHDRAWN);
- // Fall-through
- case FAILED:
- store.removeInstalledIntents(intent.id());
- break;
-
- case SUBMITTED:
- case COMPILING:
- case RECOMPILING:
- case WITHDRAWN:
- case INSTALLED:
- default:
- //FIXME clean this up (we shouldn't ever get here)
- log.warn("Bad state: {} for {}", update.getState(intent), intent);
- break;
- }
+        /**
+         * Folds the outcome of the most recently applied batch back into the
+         * intent updates.
+         * <p>
+         * On success every update is advanced. On failure, each update that owns
+         * an installable intent whose id matches a failed id is told that its
+         * current batch failed; the other updates are left where they are.
+         *
+         * @param completed result of the completed batch operation
+         */
+        private void updateBatches(CompletedBatchOperation completed) {
+            if (completed.isSuccess()) {
+                for (IntentUpdate update : intentUpdates) {
+                    update.incrementBatch(true);
}
- }
- /*
- for (IntentOperation op : ops.operations()) {
- switch (op.type()) {
- case SUBMIT:
- store.setState(op.intent(), INSTALLED);
- break;
- case WITHDRAW:
- Intent intent = store.getIntent(op.intentId());
- store.setState(intent, WITHDRAWN);
- break;
- case REPLACE:
- store.setState(op.intent(), INSTALLED);
- intent = store.getIntent(op.intentId());
- store.setState(intent, WITHDRAWN);
- break;
- case UPDATE:
- intent = store.getIntent(op.intentId());
- store.setState(intent, INSTALLED);
- break;
- default:
- break;
- }
- }
- */
- /*
- if (nextState == RECOMPILING) {
- eventDispatcher.post(store.setState(intent, FAILED));
- // FIXME try to recompile
-// executor.execute(new IntentTask(nextState, intent));
- } else if (nextState == INSTALLED || nextState == WITHDRAWN) {
- eventDispatcher.post(store.setState(intent, nextState));
} else {
- log.warn("Invalid next intent state {} for intent {}", nextState, intent);
- }*/
+                // entire batch has been reverted...
+                log.warn("Failed items: {}", completed.failedItems());
+
+                for (Long id : completed.failedIds()) {
+                    IntentId targetId = IntentId.valueOf(id);
+                    for (IntentUpdate update : intentUpdates) {
+                        // Either list is null when the update has no new (or no old)
+                        // intent, so collect them defensively; handing null to
+                        // newArrayList/addAll throws and aborts the whole monitor.
+                        List<Intent> installables = Lists.newArrayList();
+                        if (update.newInstallables() != null) {
+                            installables.addAll(update.newInstallables());
+                        }
+                        if (update.oldInstallables() != null) {
+                            installables.addAll(update.oldInstallables());
+                        }
+                        for (Intent intent : installables) {
+                            if (intent.id().equals(targetId)) {
+                                update.incrementBatch(false);
+                                break;
+                            }
+                        }
+                    }
+                    // don't increment the non-failed items, as they have been reverted.
+                }
+            }
}
/**
@@ -822,34 +762,60 @@
}
try {
CompletedBatchOperation completed = future.get(100, TimeUnit.NANOSECONDS);
- if (completed.isSuccess()) {
- future = applyNextBatch();
- } else {
- // TODO check if future succeeded and if not report fail items
- log.warn("Failed items: {}", completed.failedItems());
- // FIXME revert.... by submitting a new batch
- //uninstallIntent(intent, RECOMPILING);
- }
+ updateBatches(completed);
+ future = applyNextBatch();
} catch (TimeoutException | InterruptedException | ExecutionException te) {
//TODO look into error message
- log.debug("Intallations of intent {} is still pending", ops);
+ log.debug("Installation of intents are still pending: {}", ops);
}
}
+        /**
+         * Reacts to a timed-out batch by cancelling the pending future.
+         * <p>
+         * If the cancel succeeds, the timeout window is restarted and the
+         * current work is applied again; once the attempt counter has reached
+         * MAX_ATTEMPTS every update is first told that its current batch failed.
+         * If the cancel does not succeed, the operations are removed from the
+         * batch service and nothing is resubmitted.
+         */
+        private void retry() {
+            if (!future.cancel(true)) {
+                // FIXME
+                // cancel failed... batch is broken; shouldn't happen!
+                // we could manually reverse everything
+                // ... or just core dump and send email to Ali
+                batchService.removeIntentOperations(ops);
+                return;
+            }
+
+            // cancel success; batch is reverted
+            // reset the timer
+            endTime = System.currentTimeMillis() + TIMEOUT;
+
+            // check against the pre-increment value, then count this attempt
+            final boolean attemptsExhausted = installAttempt >= MAX_ATTEMPTS;
+            installAttempt++;
+            if (attemptsExhausted) {
+                log.warn("Install request timed out: {}", ops);
+                for (IntentUpdate pending : intentUpdates) {
+                    pending.incrementBatch(false);
+                }
+            }
+
+            // otherwise just resubmit the work
+            future = applyNextBatch();
+            monitorExecutor.submit(this);
+        }
+
+ /**
+  * Reports whether this monitor has no outstanding batch future.
+  *
+  * @return true if the future field is null
+  */
+ boolean isComplete() {
+ // TODO: actually check with the intent update?
+ // A null future means applyNextBatch() found nothing left to submit.
+ return future == null;
+ }
+
@Override
public void run() {
- processFutures();
- if (future == null) {
- // woohoo! we are done!
- updateIntents();
- batchService.removeIntentOperations(ops);
- } else if (endTime < System.currentTimeMillis()) {
- log.warn("Install request timed out");
-// future.cancel(true);
- // TODO retry and/or report the failure
- } else {
- // resubmit ourselves if we are not done yet
- monitorExecutor.submit(this);
+ try {
+ if (intentUpdates.isEmpty()) {
+ // this should only be called on the first iteration
+ // note: this is "expensive", so it is not done in the constructor
+ buildIntentUpdates();
+ }
+ processFutures();
+ if (isComplete()) {
+ // there are no outstanding batches; we are done
+ batchService.removeIntentOperations(ops);
+ } else if (endTime < System.currentTimeMillis()) {
+ retry();
+ } else {
+ // we are not done yet, yield the thread by resubmitting ourselves
+ monitorExecutor.submit(this);
+ }
+ } catch (Exception e) {
+ log.error("Error submitting batches:", e);
}
}
}
@@ -859,7 +825,7 @@
public void execute(IntentOperations operations) {
log.info("Execute operations: {}", operations);
//FIXME: perhaps we want to track this task so that we can cancel it.
- executor.execute(new IntentTask(operations));
+ monitorExecutor.execute(new IntentInstallMonitor(operations));
}
@Override