Initial implementation of async installers
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
index 16b75f2..50f1038 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
@@ -13,12 +13,14 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -28,6 +30,7 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.intent.InstallableIntent;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentCompiler;
@@ -44,7 +47,9 @@
import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.slf4j.Logger;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
/**
* An implementation of Intent Manager.
@@ -67,7 +72,8 @@
private final AbstractListenerRegistry<IntentEvent, IntentListener>
listenerRegistry = new AbstractListenerRegistry<>();
- private final ExecutorService executor = newSingleThreadExecutor(namedThreads("onos-intents"));
+ private ExecutorService executor;
+ private ExecutorService monitorExecutor;
private final IntentStoreDelegate delegate = new InternalStoreDelegate();
private final TopologyChangeDelegate topoDelegate = new InternalTopoChangeDelegate();
@@ -86,6 +92,8 @@
store.setDelegate(delegate);
trackerService.setDelegate(topoDelegate);
eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
+ executor = newSingleThreadExecutor(namedThreads("onos-intents"));
+ monitorExecutor = newSingleThreadExecutor(namedThreads("onos-intent-monitor"));
log.info("Started");
}
@@ -94,6 +102,8 @@
store.unsetDelegate(delegate);
trackerService.unsetDelegate(topoDelegate);
eventDispatcher.removeSink(IntentEvent.class);
+ executor.shutdown();
+ monitorExecutor.shutdown();
log.info("Stopped");
}
@@ -240,14 +250,23 @@
}
}
- // FIXME: To make SDN-IP workable ASAP, only single level compilation is implemented
- // TODO: implement compilation traversing tree structure
+ /**
+ * Compiles an intent recursively.
+ *
+ * @param intent intent
+ * @return result of compilation
+ */
private List<InstallableIntent> compileIntent(Intent intent) {
- List<InstallableIntent> installable = new ArrayList<>();
- for (Intent compiled : getCompiler(intent).compile(intent)) {
- InstallableIntent installableIntent = (InstallableIntent) compiled;
- installable.add(installableIntent);
+ if (intent instanceof InstallableIntent) {
+ return ImmutableList.of((InstallableIntent) intent);
}
+
+ List<InstallableIntent> installable = new ArrayList<>();
+ // TODO do we need to registerSubclassCompiler?
+ for (Intent compiled : getCompiler(intent).compile(intent)) {
+ installable.addAll(compileIntent(compiled));
+ }
+
return installable;
}
@@ -261,6 +280,7 @@
// Indicate that the intent is entering the installing phase.
store.setState(intent, INSTALLING);
+ List<Future<CompletedBatchOperation>> installFutures = Lists.newArrayList();
try {
List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
if (installables != null) {
@@ -268,17 +288,20 @@
registerSubclassInstallerIfNeeded(installable);
trackerService.addTrackedResources(intent.id(),
installable.requiredLinks());
- getInstaller(installable).install(installable);
+ Future<CompletedBatchOperation> future = getInstaller(installable).install(installable);
+ installFutures.add(future);
}
}
- eventDispatcher.post(store.setState(intent, INSTALLED));
-
+ // FIXME we have to wait for the installable intents
+ //eventDispatcher.post(store.setState(intent, INSTALLED));
+ monitorExecutor.execute(new IntentInstallMonitor(intent, installFutures, INSTALLED));
} catch (Exception e) {
log.warn("Unable to install intent {} due to: {}", intent.id(), e);
- uninstallIntent(intent);
+ uninstallIntent(intent, RECOMPILING);
// If compilation failed, kick off the recompiling phase.
- executeRecompilingPhase(intent);
+ // FIXME
+ //executeRecompilingPhase(intent);
}
}
@@ -327,12 +350,14 @@
private void executeWithdrawingPhase(Intent intent) {
// Indicate that the intent is being withdrawn.
store.setState(intent, WITHDRAWING);
- uninstallIntent(intent);
+ uninstallIntent(intent, WITHDRAWN);
// If all went well, disassociate the top-level intent with its
// installable derivatives and mark it as withdrawn.
- store.removeInstalledIntents(intent.id());
- eventDispatcher.post(store.setState(intent, WITHDRAWN));
+ // FIXME need to clean up
+ //store.removeInstalledIntents(intent.id());
+ // FIXME
+ //eventDispatcher.post(store.setState(intent, WITHDRAWN));
}
/**
@@ -340,14 +365,17 @@
*
* @param intent intent to be uninstalled
*/
- private void uninstallIntent(Intent intent) {
+ private void uninstallIntent(Intent intent, IntentState nextState) {
+ List<Future<CompletedBatchOperation>> uninstallFutures = Lists.newArrayList();
try {
List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
if (installables != null) {
for (InstallableIntent installable : installables) {
- getInstaller(installable).uninstall(installable);
+ Future<CompletedBatchOperation> future = getInstaller(installable).uninstall(installable);
+ uninstallFutures.add(future);
}
}
+ monitorExecutor.execute(new IntentInstallMonitor(intent, uninstallFutures, nextState));
} catch (IntentException e) {
log.warn("Unable to uninstall intent {} due to: {}", intent.id(), e);
}
@@ -422,9 +450,10 @@
// Attempt recompilation of the specified intents first.
for (IntentId intentId : intentIds) {
Intent intent = getIntent(intentId);
- uninstallIntent(intent);
+ uninstallIntent(intent, RECOMPILING);
- executeRecompilingPhase(intent);
+ //FIXME
+ //executeRecompilingPhase(intent);
}
if (compileAllFailed) {
@@ -460,4 +489,44 @@
}
}
+ private class IntentInstallMonitor implements Runnable {
+
+ private final Intent intent;
+ private final List<Future<CompletedBatchOperation>> futures;
+ private final IntentState nextState;
+
+ public IntentInstallMonitor(Intent intent,
+ List<Future<CompletedBatchOperation>> futures, IntentState nextState) {
+ this.intent = intent;
+ this.futures = futures;
+ this.nextState = nextState;
+ }
+
+ private void updateIntent(Intent intent) {
+ if (nextState == RECOMPILING) {
+ executor.execute(new IntentTask(nextState, intent));
+ } else if (nextState == INSTALLED || nextState == WITHDRAWN) {
+ eventDispatcher.post(store.setState(intent, nextState));
+ } else {
+ log.warn("Invalid next intent state {} for intent {}", nextState, intent);
+ }
+ }
+
+ @Override
+ public void run() {
+ for (Iterator<Future<CompletedBatchOperation>> i = futures.iterator(); i.hasNext();) {
+ Future<CompletedBatchOperation> future = i.next();
+ if (future.isDone()) {
+ // TODO: we may want to get the future here
+ i.remove();
+ }
+ }
+ if (futures.isEmpty()) {
+ updateIntent(intent);
+ } else {
+ // resubmit ourselves if we are not done yet
+ monitorExecutor.submit(this);
+ }
+ }
+ }
}