Refactored intermediate IntentUpdate classes
Change-Id: I3d4a435ef4aa97559d5407d49f45519098c3f193
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
index 373b629..7325dd5 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
@@ -15,23 +15,9 @@
*/
package org.onosproject.net.intent.impl;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -42,8 +28,8 @@
import org.onosproject.core.IdGenerator;
import org.onosproject.event.AbstractListenerRegistry;
import org.onosproject.event.EventDeliveryService;
-import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.FlowRuleBatchOperation;
+import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentBatchDelegate;
@@ -61,17 +47,25 @@
import org.onosproject.net.intent.IntentStoreDelegate;
import org.slf4j.Logger;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.namedThreads;
-import static org.onosproject.net.intent.IntentState.FAILED;
-import static org.onosproject.net.intent.IntentState.INSTALL_REQ;
-import static org.onosproject.net.intent.IntentState.WITHDRAW_REQ;
+import static org.onosproject.net.intent.IntentState.*;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -285,6 +279,23 @@
return installable;
}
+ // TODO javadoc. For each installable: registers its installer if needed, then calls install2(...).build(null).
+ // FIXME: the built result is discarded and null is always returned; any Exception is rethrown wrapped.
+ FlowRuleOperations coordinate(List<Intent> installables) {
+ //List<FlowRuleBatchOperation> batches = new ArrayList<>(installables.size());
+ for (Intent installable : installables) {
+ try {
+ registerSubclassInstallerIfNeeded(installable);
+ // FIXME need to migrate installers to FlowRuleOperations
+ // FIXME need to aggregate the FlowRuleOperations across installables (nothing is collected yet)
+ getInstaller(installable).install2(installable).build(null/*FIXME*/);
+ } catch (Exception e) { // TODO this should be IntentException
+ throw new FlowRuleBatchOperationConversionException(null/*FIXME*/, e);
+ }
+ }
+ return null;
+ }
+
/**
* Uninstalls all installable intents associated with the given intent.
*
@@ -292,19 +303,21 @@
* @param installables installable intents
* @return list of batches to uninstall intent
*/
- List<FlowRuleBatchOperation> uninstallIntent(Intent intent, List<Intent> installables) {
+ // FIXME: FlowRuleOperations migration is unfinished; what is built below is discarded and null is returned.
+ FlowRuleOperations uninstallIntent(Intent intent, List<Intent> installables) {
List<FlowRuleBatchOperation> batches = Lists.newArrayList();
for (Intent installable : installables) {
trackerService.removeTrackedResources(intent.id(),
installable.resources());
try {
- batches.addAll(getInstaller(installable).uninstall(installable));
+ // FIXME need to aggregate the FlowRuleOperations across installables (batches is no longer filled)
+ getInstaller(installable).uninstall2(installable).build(null/*FIXME*/);
} catch (IntentException e) {
log.warn("Unable to uninstall intent {} due to:", intent.id(), e);
// TODO: this should never happen. but what if it does?
}
}
- return batches;
+ return null; // FIXME: placeholder; nothing is aggregated or returned yet
}
/**
@@ -414,12 +427,11 @@
// TODO: simplify the branching statements
private IntentUpdate createIntentUpdate(IntentData intentData) {
- IntentData currentState = store.getIntentData(intentData.key());
switch (intentData.state()) {
case INSTALL_REQ:
- return new InstallRequest(this, intentData.intent(), currentState);
+ return new InstallRequest(this, intentData);
case WITHDRAW_REQ:
- return new WithdrawRequest(this, intentData.intent(), currentState);
+ return new WithdrawRequest(this, intentData);
// fallthrough
case COMPILING:
case INSTALLING:
@@ -430,21 +442,12 @@
case FAILED:
default:
// illegal state
- return new DoNothing();
+ return new CompilingFailed(intentData);
}
}
- List<FlowRuleBatchOperation> convert(List<Intent> installables) {
- List<FlowRuleBatchOperation> batches = new ArrayList<>(installables.size());
- for (Intent installable : installables) {
- try {
- registerSubclassInstallerIfNeeded(installable);
- batches.addAll(getInstaller(installable).install(installable));
- } catch (Exception e) { // TODO this should be IntentException
- throw new FlowRuleBatchOperationConversionException(batches, e);
- }
- }
- return batches;
+ /**
+  * Wraps the given intent data in an {@code IntentWorker} and submits it to
+  * the worker executor.
+  *
+  * @param data intent data to be processed
+  * @return future of the update produced by the submitted worker
+  */
+ private Future<CompletedIntentUpdate> submitIntentData(IntentData data) {
+     IntentWorker worker = new IntentWorker(data);
+     return workerExecutor.submit(worker);
}
private class IntentBatchPreprocess implements Runnable {
@@ -476,15 +479,13 @@
@Override
public void run() {
try {
- // 1. wrap each intentdata in a runnable and submit
- List<Future<IntentUpdate>> updates = createIntentUpdates();
- // TODO
- // 2. wait for completion of all the work
- // 3. accumulate results and submit batch write of IntentData to store
- // (we can also try to update these individually)
-
-
- //new IntentBatchApplyFirst(data, processIntentUpdates(updates), endTime, 0, null).run();
+ /*
+ 1. wrap each intentdata in a runnable and submit
+ 2. wait for completion of all the work
+ 3. accumulate results and submit batch write of IntentData to store
+ (we can also try to update these individually)
+ */
+ submitUpdates(waitForFutures(createIntentUpdates()));
} catch (Exception e) {
log.error("Error submitting batches:", e);
// FIXME incomplete Intents should be cleaned up
@@ -498,18 +499,33 @@
}
}
- private List<Future<IntentUpdate>> createIntentUpdates() {
- return data.stream()
- .map(IntentManager.this::submitIntentData)
- .collect(Collectors.toList());
+ /**
+  * Submits every intent data item of this batch through
+  * {@code submitIntentData} and gathers the resulting futures.
+  *
+  * @return futures of the submitted updates, in the iteration order of the batch data
+  */
+ private List<Future<CompletedIntentUpdate>> createIntentUpdates() {
+     List<Future<CompletedIntentUpdate>> pending = new ArrayList<>(data.size());
+     for (IntentData item : data) {
+         pending.add(IntentManager.this.submitIntentData(item));
+     }
+     return pending;
+ }
+
+ /**
+  * Blocks until the given intent update futures have finished and collects
+  * their results.
+  * <p>
+  * Futures are awaited in list order. An update whose task threw an exception
+  * is logged and left out of the result. If the calling thread is interrupted
+  * while waiting, its interrupt status is restored and waiting stops early, so
+  * the returned list then holds only the updates gathered up to that point.
+  *
+  * @param futures futures of the submitted intent updates
+  * @return immutable list of the updates that completed normally
+  */
+ private List<CompletedIntentUpdate> waitForFutures(List<Future<CompletedIntentUpdate>> futures) {
+     ImmutableList.Builder<CompletedIntentUpdate> updateBuilder = ImmutableList.builder();
+     for (Future<CompletedIntentUpdate> future : futures) {
+         try {
+             updateBuilder.add(future.get());
+         } catch (InterruptedException e) {
+             // get() cleared the interrupt flag when it threw; set it again so the
+             // owner of this thread still observes the interrupt, and stop blocking
+             // on the remaining futures.
+             Thread.currentThread().interrupt();
+             log.warn("Interrupted while waiting for intent updates:", e);
+             break;
+         } catch (ExecutionException e) {
+             //FIXME the failed update is simply left out of the returned list
+             // The exception is the trailing argument with no "{}" placeholder, so
+             // the logger records it as the throwable (with stack trace).
+             log.warn("Future failed:", e);
+         }
+     }
+     return updateBuilder.build();
+ }
+
+ /**
+  * Extracts the data carried by each completed update and hands the whole
+  * list to the store in a single {@code batchWrite} call.
+  *
+  * @param updates completed intent updates whose data should be written
+  */
+ private void submitUpdates(List<CompletedIntentUpdate> updates) {
+     store.batchWrite(
+             updates.stream()
+                     .map(update -> update.data())
+                     .collect(Collectors.toList()));
}
}
- private Future<IntentUpdate> submitIntentData(IntentData data) {
- return workerExecutor.submit(new IntentWorker(data));
- }
-
- private class IntentWorker implements Callable<IntentUpdate> {
+ private final class IntentWorker implements Callable<CompletedIntentUpdate> {
private final IntentData data;
@@ -518,7 +534,7 @@
}
@Override
- public IntentUpdate call() throws Exception {
+ public CompletedIntentUpdate call() throws Exception {
IntentUpdate update = createIntentUpdate(data);
Optional<IntentUpdate> currentPhase = Optional.of(update);
IntentUpdate previousPhase = update;
@@ -527,167 +543,7 @@
previousPhase = currentPhase.get();
currentPhase = previousPhase.execute();
}
- return previousPhase;
- }
- }
-
-
- // TODO: better naming
- private class IntentBatchApplyFirst extends IntentBatchPreprocess {
-
- protected final List<CompletedIntentUpdate> intentUpdates;
- protected final int installAttempt;
- protected Future<CompletedBatchOperation> future;
-
- IntentBatchApplyFirst(Collection<IntentData> operations, List<CompletedIntentUpdate> intentUpdates,
- long endTime, int installAttempt, Future<CompletedBatchOperation> future) {
- super(operations, endTime);
- this.intentUpdates = ImmutableList.copyOf(intentUpdates);
- this.future = future;
- this.installAttempt = installAttempt;
- }
-
- @Override
- public void run() {
- Future<CompletedBatchOperation> future = applyNextBatch(intentUpdates);
- new IntentBatchProcessFutures(data, intentUpdates, endTime, installAttempt, future).run();
- }
-
- /**
- * Builds and applies the next batch, and returns the future.
- *
- * @return Future for next batch
- */
- protected Future<CompletedBatchOperation> applyNextBatch(List<CompletedIntentUpdate> updates) {
- //TODO test this. (also, maybe save this batch)
-
- FlowRuleBatchOperation batch = createFlowRuleBatchOperation(updates);
- if (batch.size() > 0) {
- //FIXME apply batch might throw an exception
- return flowRuleService.applyBatch(batch);
- } else {
- return null;
- }
- }
-
- private FlowRuleBatchOperation createFlowRuleBatchOperation(List<CompletedIntentUpdate> intentUpdates) {
- FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList(), null, 0);
- for (CompletedIntentUpdate update : intentUpdates) {
- FlowRuleBatchOperation currentBatch = update.currentBatch();
- if (currentBatch != null) {
- batch.addAll(currentBatch);
- }
- }
- return batch;
- }
-
- protected void abandonShip() {
- // the batch has failed
- // TODO: maybe we should do more?
- log.error("Walk the plank, matey...");
- future = null;
- //FIXME
-// batchService.removeIntentOperations(data);
- }
- }
-
- // TODO: better naming
- private class IntentBatchProcessFutures extends IntentBatchApplyFirst {
-
- IntentBatchProcessFutures(Collection<IntentData> operations, List<CompletedIntentUpdate> intentUpdates,
- long endTime, int installAttempt, Future<CompletedBatchOperation> future) {
- super(operations, intentUpdates, endTime, installAttempt, future);
- }
-
- @Override
- public void run() {
- try {
- Future<CompletedBatchOperation> future = processFutures();
- if (future == null) {
- // there are no outstanding batches; we are done
- //FIXME
- return; //?
-// batchService.removeIntentOperations(data);
- } else if (System.currentTimeMillis() > endTime) {
- // - cancel current FlowRuleBatch and resubmit again
- retry();
- } else {
- // we are not done yet, yield the thread by resubmitting ourselves
- batchExecutor.submit(new IntentBatchProcessFutures(data, intentUpdates, endTime,
- installAttempt, future));
- }
- } catch (Exception e) {
- log.error("Error submitting batches:", e);
- // FIXME incomplete Intents should be cleaned up
- // (transition to FAILED, etc.)
- abandonShip();
- }
- }
-
- /**
- * Iterate through the pending futures, and remove them when they have completed.
- */
- private Future<CompletedBatchOperation> processFutures() {
- try {
- CompletedBatchOperation completed = future.get(100, TimeUnit.NANOSECONDS);
- updateBatches(completed);
- return applyNextBatch(intentUpdates);
- } catch (TimeoutException | InterruptedException te) {
- log.trace("Installation of intents are still pending: {}", data);
- return future;
- } catch (ExecutionException e) {
- log.warn("Execution of batch failed: {}", data, e);
- abandonShip();
- return future;
- }
- }
-
- private void updateBatches(CompletedBatchOperation completed) {
- if (completed.isSuccess()) {
- for (CompletedIntentUpdate update : intentUpdates) {
- update.batchSuccess();
- }
- } else {
- // entire batch has been reverted...
- log.debug("Failed items: {}", completed.failedItems());
- log.debug("Failed ids: {}", completed.failedIds());
-
- for (Long id : completed.failedIds()) {
- IntentId targetId = IntentId.valueOf(id);
- for (CompletedIntentUpdate update : intentUpdates) {
- for (Intent intent : update.allInstallables()) {
- if (intent.id().equals(targetId)) {
- update.batchFailed();
- break;
- }
- }
- }
- // don't increment the non-failed items, as they have been reverted.
- }
- }
- }
-
- private void retry() {
- log.debug("Execution timed out, retrying.");
- if (future.cancel(true)) { // cancel success; batch is reverted
- // reset the timer
- long timeLimit = calculateTimeoutLimit();
- int attempts = installAttempt + 1;
- if (attempts == MAX_ATTEMPTS) {
- log.warn("Install request timed out: {}", data);
- for (CompletedIntentUpdate update : intentUpdates) {
- update.batchFailed();
- }
- } else if (attempts > MAX_ATTEMPTS) {
- abandonShip();
- return;
- }
- Future<CompletedBatchOperation> future = applyNextBatch(intentUpdates);
- batchExecutor.submit(new IntentBatchProcessFutures(data, intentUpdates, timeLimit, attempts, future));
- } else {
- log.error("Cancelling FlowRuleBatch failed.");
- abandonShip();
- }
+ return (CompletedIntentUpdate) previousPhase;
}
}
@@ -700,4 +556,166 @@
// TODO ensure that only one batch is in flight at a time
}
}
+
+// /////////**************************///////////////////
+// FIXME Need to build and monitor contexts from FlowRuleService
+//
+// // TODO: better naming
+// private class IntentBatchApplyFirst extends IntentBatchPreprocess {
+//
+// protected final List<CompletedIntentUpdate> intentUpdates;
+// protected final int installAttempt;
+// protected Future<CompletedBatchOperation> future;
+//
+// IntentBatchApplyFirst(Collection<IntentData> operations, List<CompletedIntentUpdate> intentUpdates,
+// long endTime, int installAttempt, Future<CompletedBatchOperation> future) {
+// super(operations, endTime);
+// this.intentUpdates = ImmutableList.copyOf(intentUpdates);
+// this.future = future;
+// this.installAttempt = installAttempt;
+// }
+//
+// @Override
+// public void run() {
+// Future<CompletedBatchOperation> future = applyNextBatch(intentUpdates);
+// new IntentBatchProcessFutures(data, intentUpdates, endTime, installAttempt, future).run();
+// }
+//
+// /**
+// * Builds and applies the next batch, and returns the future.
+// *
+// * @return Future for next batch
+// */
+// protected Future<CompletedBatchOperation> applyNextBatch(List<CompletedIntentUpdate> updates) {
+// //TODO test this. (also, maybe save this batch)
+//
+// FlowRuleBatchOperation batch = createFlowRuleBatchOperation(updates);
+// if (batch.size() > 0) {
+// //FIXME apply batch might throw an exception
+// return flowRuleService.applyBatch(batch);
+// } else {
+// return null;
+// }
+// }
+//
+// private FlowRuleBatchOperation createFlowRuleBatchOperation(List<CompletedIntentUpdate> intentUpdates) {
+// FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList(), null, 0);
+// for (CompletedIntentUpdate update : intentUpdates) {
+// FlowRuleBatchOperation currentBatch = update.currentBatch();
+// if (currentBatch != null) {
+// batch.addAll(currentBatch);
+// }
+// }
+// return batch;
+// }
+//
+// protected void abandonShip() {
+// // the batch has failed
+// // TODO: maybe we should do more?
+// log.error("Walk the plank, matey...");
+// future = null;
+// //FIXME
+// //batchService.removeIntentOperations(data);
+// }
+// }
+//
+// // TODO: better naming
+// private class IntentBatchProcessFutures extends IntentBatchApplyFirst {
+//
+// IntentBatchProcessFutures(Collection<IntentData> operations, List<CompletedIntentUpdate> intentUpdates,
+// long endTime, int installAttempt, Future<CompletedBatchOperation> future) {
+// super(operations, intentUpdates, endTime, installAttempt, future);
+// }
+//
+// @Override
+// public void run() {
+// try {
+// Future<CompletedBatchOperation> future = processFutures();
+// if (future == null) {
+// // there are no outstanding batches; we are done
+// //FIXME
+// return; //?
+// //batchService.removeIntentOperations(data);
+// } else if (System.currentTimeMillis() > endTime) {
+// // - cancel current FlowRuleBatch and resubmit again
+// retry();
+// } else {
+// // we are not done yet, yield the thread by resubmitting ourselves
+// batchExecutor.submit(new IntentBatchProcessFutures(data, intentUpdates, endTime,
+// installAttempt, future));
+// }
+// } catch (Exception e) {
+// log.error("Error submitting batches:", e);
+// // FIXME incomplete Intents should be cleaned up
+// // (transition to FAILED, etc.)
+// abandonShip();
+// }
+// }
+//
+// /**
+// * Iterate through the pending futures, and remove them when they have completed.
+// */
+// private Future<CompletedBatchOperation> processFutures() {
+// try {
+// CompletedBatchOperation completed = future.get(100, TimeUnit.NANOSECONDS);
+// updateBatches(completed);
+// return applyNextBatch(intentUpdates);
+// } catch (TimeoutException | InterruptedException te) {
+// log.trace("Installation of intents are still pending: {}", data);
+// return future;
+// } catch (ExecutionException e) {
+// log.warn("Execution of batch failed: {}", data, e);
+// abandonShip();
+// return future;
+// }
+// }
+//
+// private void updateBatches(CompletedBatchOperation completed) {
+// if (completed.isSuccess()) {
+// for (CompletedIntentUpdate update : intentUpdates) {
+// update.batchSuccess();
+// }
+// } else {
+// // entire batch has been reverted...
+// log.debug("Failed items: {}", completed.failedItems());
+// log.debug("Failed ids: {}", completed.failedIds());
+//
+// for (Long id : completed.failedIds()) {
+// IntentId targetId = IntentId.valueOf(id);
+// for (CompletedIntentUpdate update : intentUpdates) {
+// for (Intent intent : update.allInstallables()) {
+// if (intent.id().equals(targetId)) {
+// update.batchFailed();
+// break;
+// }
+// }
+// }
+// // don't increment the non-failed items, as they have been reverted.
+// }
+// }
+// }
+//
+// private void retry() {
+// log.debug("Execution timed out, retrying.");
+// if (future.cancel(true)) { // cancel success; batch is reverted
+// // reset the timer
+// long timeLimit = calculateTimeoutLimit();
+// int attempts = installAttempt + 1;
+// if (attempts == MAX_ATTEMPTS) {
+// log.warn("Install request timed out: {}", data);
+// for (CompletedIntentUpdate update : intentUpdates) {
+// update.batchFailed();
+// }
+// } else if (attempts > MAX_ATTEMPTS) {
+// abandonShip();
+// return;
+// }
+// Future<CompletedBatchOperation> future = applyNextBatch(intentUpdates);
+// batchExecutor.submit(new IntentBatchProcessFutures(data, intentUpdates, timeLimit, attempts, future));
+// } else {
+// log.error("Cancelling FlowRuleBatch failed.");
+// abandonShip();
+// }
+// }
+// }
}