Initial implementation of async installers
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentInstaller.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentInstaller.java
index 738be04..9855498 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/IntentInstaller.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentInstaller.java
@@ -1,5 +1,9 @@
package org.onlab.onos.net.intent;
+import java.util.concurrent.Future;
+
+import org.onlab.onos.net.flow.CompletedBatchOperation;
+
/**
* Abstraction of entity capable of installing intents to the environment.
*/
@@ -10,7 +14,7 @@
* @param intent intent to be installed
* @throws IntentException if issues are encountered while installing the intent
*/
- void install(T intent);
+ Future<CompletedBatchOperation> install(T intent);
/**
* Uninstalls the specified intent from the environment.
@@ -18,5 +22,5 @@
* @param intent intent to be uninstalled
* @throws IntentException if issues are encountered while uninstalling the intent
*/
- void uninstall(T intent);
+ Future<CompletedBatchOperation> uninstall(T intent);
}
diff --git a/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java b/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java
index 7eb0e19..163a056 100644
--- a/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java
+++ b/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java
@@ -1,17 +1,25 @@
package org.onlab.onos.net.intent;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.onlab.onos.net.intent.IntentEvent.Type.FAILED;
+import static org.onlab.onos.net.intent.IntentEvent.Type.INSTALLED;
+import static org.onlab.onos.net.intent.IntentEvent.Type.SUBMITTED;
+import static org.onlab.onos.net.intent.IntentEvent.Type.WITHDRAWN;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.Future;
-import static org.junit.Assert.*;
-import static org.onlab.onos.net.intent.IntentEvent.Type.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
/**
* Suite of tests for the intent service contract.
@@ -290,17 +298,19 @@
}
@Override
- public void install(TestInstallableIntent intent) {
+ public Future<CompletedBatchOperation> install(TestInstallableIntent intent) {
if (fail) {
throw new IntentException("install failed by design");
}
+ return null;
}
@Override
- public void uninstall(TestInstallableIntent intent) {
+ public Future<CompletedBatchOperation> uninstall(TestInstallableIntent intent) {
if (fail) {
throw new IntentException("remove failed by design");
}
+ return null;
}
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
index 16b75f2..50f1038 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
@@ -13,12 +13,14 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -28,6 +30,7 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.intent.InstallableIntent;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentCompiler;
@@ -44,7 +47,9 @@
import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.slf4j.Logger;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
/**
* An implementation of Intent Manager.
@@ -67,7 +72,8 @@
private final AbstractListenerRegistry<IntentEvent, IntentListener>
listenerRegistry = new AbstractListenerRegistry<>();
- private final ExecutorService executor = newSingleThreadExecutor(namedThreads("onos-intents"));
+ private ExecutorService executor;
+ private ExecutorService monitorExecutor;
private final IntentStoreDelegate delegate = new InternalStoreDelegate();
private final TopologyChangeDelegate topoDelegate = new InternalTopoChangeDelegate();
@@ -86,6 +92,8 @@
store.setDelegate(delegate);
trackerService.setDelegate(topoDelegate);
eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
+ executor = newSingleThreadExecutor(namedThreads("onos-intents"));
+ monitorExecutor = newSingleThreadExecutor(namedThreads("onos-intent-monitor"));
log.info("Started");
}
@@ -94,6 +102,8 @@
store.unsetDelegate(delegate);
trackerService.unsetDelegate(topoDelegate);
eventDispatcher.removeSink(IntentEvent.class);
+ executor.shutdown();
+ monitorExecutor.shutdown();
log.info("Stopped");
}
@@ -240,14 +250,23 @@
}
}
- // FIXME: To make SDN-IP workable ASAP, only single level compilation is implemented
- // TODO: implement compilation traversing tree structure
+ /**
+ * Compiles an intent recursively.
+ *
+ * @param intent intent
+ * @return result of compilation
+ */
private List<InstallableIntent> compileIntent(Intent intent) {
- List<InstallableIntent> installable = new ArrayList<>();
- for (Intent compiled : getCompiler(intent).compile(intent)) {
- InstallableIntent installableIntent = (InstallableIntent) compiled;
- installable.add(installableIntent);
+ if (intent instanceof InstallableIntent) {
+ return ImmutableList.of((InstallableIntent) intent);
}
+
+ List<InstallableIntent> installable = new ArrayList<>();
+ // TODO do we need to registerSubclassCompiler?
+ for (Intent compiled : getCompiler(intent).compile(intent)) {
+ installable.addAll(compileIntent(compiled));
+ }
+
return installable;
}
@@ -261,6 +280,7 @@
// Indicate that the intent is entering the installing phase.
store.setState(intent, INSTALLING);
+ List<Future<CompletedBatchOperation>> installFutures = Lists.newArrayList();
try {
List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
if (installables != null) {
@@ -268,17 +288,20 @@
registerSubclassInstallerIfNeeded(installable);
trackerService.addTrackedResources(intent.id(),
installable.requiredLinks());
- getInstaller(installable).install(installable);
+ Future<CompletedBatchOperation> future = getInstaller(installable).install(installable);
+ installFutures.add(future);
}
}
- eventDispatcher.post(store.setState(intent, INSTALLED));
-
+ // FIXME we have to wait for the installable intents
+ //eventDispatcher.post(store.setState(intent, INSTALLED));
+ monitorExecutor.execute(new IntentInstallMonitor(intent, installFutures, INSTALLED));
} catch (Exception e) {
log.warn("Unable to install intent {} due to: {}", intent.id(), e);
- uninstallIntent(intent);
+ uninstallIntent(intent, RECOMPILING);
// If compilation failed, kick off the recompiling phase.
- executeRecompilingPhase(intent);
+ // FIXME
+ //executeRecompilingPhase(intent);
}
}
@@ -327,12 +350,14 @@
private void executeWithdrawingPhase(Intent intent) {
// Indicate that the intent is being withdrawn.
store.setState(intent, WITHDRAWING);
- uninstallIntent(intent);
+ uninstallIntent(intent, WITHDRAWN);
// If all went well, disassociate the top-level intent with its
// installable derivatives and mark it as withdrawn.
- store.removeInstalledIntents(intent.id());
- eventDispatcher.post(store.setState(intent, WITHDRAWN));
+ // FIXME need to clean up
+ //store.removeInstalledIntents(intent.id());
+ // FIXME
+ //eventDispatcher.post(store.setState(intent, WITHDRAWN));
}
/**
@@ -340,14 +365,17 @@
*
* @param intent intent to be uninstalled
*/
- private void uninstallIntent(Intent intent) {
+ private void uninstallIntent(Intent intent, IntentState nextState) {
+ List<Future<CompletedBatchOperation>> uninstallFutures = Lists.newArrayList();
try {
List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
if (installables != null) {
for (InstallableIntent installable : installables) {
- getInstaller(installable).uninstall(installable);
+ Future<CompletedBatchOperation> future = getInstaller(installable).uninstall(installable);
+ uninstallFutures.add(future);
}
}
+ monitorExecutor.execute(new IntentInstallMonitor(intent, uninstallFutures, nextState));
} catch (IntentException e) {
log.warn("Unable to uninstall intent {} due to: {}", intent.id(), e);
}
@@ -422,9 +450,10 @@
// Attempt recompilation of the specified intents first.
for (IntentId intentId : intentIds) {
Intent intent = getIntent(intentId);
- uninstallIntent(intent);
+ uninstallIntent(intent, RECOMPILING);
- executeRecompilingPhase(intent);
+ //FIXME
+ //executeRecompilingPhase(intent);
}
if (compileAllFailed) {
@@ -460,4 +489,44 @@
}
}
+ private class IntentInstallMonitor implements Runnable {
+
+ private final Intent intent;
+ private final List<Future<CompletedBatchOperation>> futures;
+ private final IntentState nextState;
+
+ public IntentInstallMonitor(Intent intent,
+ List<Future<CompletedBatchOperation>> futures, IntentState nextState) {
+ this.intent = intent;
+ this.futures = futures;
+ this.nextState = nextState;
+ }
+
+ private void updateIntent(Intent intent) {
+ if (nextState == RECOMPILING) {
+ executor.execute(new IntentTask(nextState, intent));
+ } else if (nextState == INSTALLED || nextState == WITHDRAWN) {
+ eventDispatcher.post(store.setState(intent, nextState));
+ } else {
+ log.warn("Invalid next intent state {} for intent {}", nextState, intent);
+ }
+ }
+
+ @Override
+ public void run() {
+ for (Iterator<Future<CompletedBatchOperation>> i = futures.iterator(); i.hasNext();) {
+ Future<CompletedBatchOperation> future = i.next();
+ if (future.isDone()) {
+ // TODO: we may want to get the future here
+ i.remove();
+ }
+ }
+ if (futures.isEmpty()) {
+ updateIntent(intent);
+ } else {
+ // resubmit ourselves if we are not done yet
+ monitorExecutor.submit(this);
+ }
+ }
+ }
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
index 0ca75c2..56214c6 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
@@ -5,7 +5,7 @@
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -15,6 +15,7 @@
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.Link;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.FlowRule;
@@ -57,8 +58,26 @@
intentManager.unregisterInstaller(PathIntent.class);
}
+ /**
+ * Apply a list of FlowRules.
+ *
+ * @param rules rules to apply
+ */
+ private Future<CompletedBatchOperation> applyBatch(List<FlowRuleBatchEntry> rules) {
+ FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
+ Future<CompletedBatchOperation> future = flowRuleService.applyBatch(batch);
+ return future;
+// try {
+// //FIXME don't do this here
+// future.get();
+// } catch (InterruptedException | ExecutionException e) {
+// // TODO Auto-generated catch block
+// e.printStackTrace();
+// }
+ }
+
@Override
- public void install(PathIntent intent) {
+ public Future<CompletedBatchOperation> install(PathIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
@@ -74,20 +93,14 @@
builder.build(), treatment,
123, appId, 600);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule));
- //flowRuleService.applyFlowRules(rule);
prev = link.dst();
}
- FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
- try {
- flowRuleService.applyBatch(batch).get();
- } catch (InterruptedException | ExecutionException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+
+ return applyBatch(rules);
}
@Override
- public void uninstall(PathIntent intent) {
+ public Future<CompletedBatchOperation> uninstall(PathIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
@@ -103,15 +116,131 @@
builder.build(), treatment,
123, appId, 600);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule));
- //flowRuleService.removeFlowRules(rule);
prev = link.dst();
}
- FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
- try {
- flowRuleService.applyBatch(batch).get();
- } catch (InterruptedException | ExecutionException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ return applyBatch(rules);
+ }
+
+ // TODO refactor below this line... ----------------------------
+
+ /**
+ * Generates the series of MatchActionOperations from the
+ * {@link FlowBatchOperation}.
+ * <p>
+ * FIXME: Currently supporting PacketPathFlow and SingleDstTreeFlow only.
+ * <p>
+ * FIXME: MatchActionOperations should have dependency field to the other
+ * match action operations, and this method should use this.
+ *
+ * @param op the {@link FlowBatchOperation} object
+ * @return the list of {@link MatchActionOperations} objects
+ */
+ /*
+ private List<MatchActionOperations>
+ generateMatchActionOperationsList(FlowBatchOperation op) {
+
+ // MatchAction operations at head (ingress) switches.
+ MatchActionOperations headOps = matchActionService.createOperationsList();
+
+ // MatchAction operations at rest of the switches.
+ MatchActionOperations tailOps = matchActionService.createOperationsList();
+
+ MatchActionOperations removeOps = matchActionService.createOperationsList();
+
+ for (BatchOperationEntry<Operator, ?> e : op.getOperations()) {
+
+ if (e.getOperator() == FlowBatchOperation.Operator.ADD) {
+ generateInstallMatchActionOperations(e, tailOps, headOps);
+ } else if (e.getOperator() == FlowBatchOperation.Operator.REMOVE) {
+ generateRemoveMatchActionOperations(e, removeOps);
+ } else {
+ throw new UnsupportedOperationException(
+ "FlowManager supports ADD and REMOVE operations only.");
+ }
+
+ }
+
+ return Arrays.asList(tailOps, headOps, removeOps);
+ }
+ */
+
+ /**
+ * Generates MatchActionOperations for an INSTALL FlowBatchOperation.
+ * <p/>
+ * FIXME: Currently only supports flows that generate exactly two match
+ * action operation sets.
+ *
+ * @param e Flow BatchOperationEntry
+ * @param tailOps MatchActionOperation set that the tail
+ * MatchActionOperations will be placed in
+ * @param headOps MatchActionOperation set that the head
+ * MatchActionOperations will be placed in
+ */
+ /*
+ private void generateInstallMatchActionOperations(
+ BatchOperationEntry<Operator, ?> e,
+ MatchActionOperations tailOps,
+ MatchActionOperations headOps) {
+
+ if (!(e.getTarget() instanceof Flow)) {
+ throw new IllegalStateException(
+ "The target is not Flow object: " + e.getTarget());
+ }
+
+ // Compile flows to match-actions
+ Flow flow = (Flow) e.getTarget();
+ List<MatchActionOperations> maOps = flow.compile(
+ e.getOperator(), matchActionService);
+ verifyNotNull(maOps, "Could not compile the flow: " + flow);
+ verify(maOps.size() == 2,
+ "The flow generates unspported match-action operations.");
+
+ // Map FlowId to MatchActionIds
+ for (MatchActionOperations maOp : maOps) {
+ for (MatchActionOperationEntry entry : maOp.getOperations()) {
+ flowMatchActionsMap.put(
+ KryoFactory.serialize(flow.getId()),
+ KryoFactory.serialize(entry.getTarget()));
+ }
+ }
+
+ // Merge match-action operations
+ for (MatchActionOperationEntry mae : maOps.get(0).getOperations()) {
+ verify(mae.getOperator() == MatchActionOperations.Operator.INSTALL);
+ tailOps.addOperation(mae);
+ }
+ for (MatchActionOperationEntry mae : maOps.get(1).getOperations()) {
+ verify(mae.getOperator() == MatchActionOperations.Operator.INSTALL);
+ headOps.addOperation(mae);
}
}
+ */
+ /**
+ * Generates MatchActionOperations for a REMOVE FlowBatchOperation.
+ *
+ * @param e Flow BatchOperationEntry
+ * @param removeOps MatchActionOperation set that the remove
+ * MatchActionOperations will be placed in
+ */
+ /*
+ private void generateRemoveMatchActionOperations(
+ BatchOperationEntry<Operator, ?> e,
+ MatchActionOperations removeOps) {
+
+ if (!(e.getTarget() instanceof FlowId)) {
+ throw new IllegalStateException(
+ "The target is not a FlowId object: " + e.getTarget());
+ }
+
+ // Compile flows to match-actions
+ FlowId flowId = (FlowId) e.getTarget();
+
+ for (byte[] matchActionIdBytes :
+ flowMatchActionsMap.remove(KryoFactory.serialize(flowId))) {
+ MatchActionId matchActionId = KryoFactory.deserialize(matchActionIdBytes);
+ removeOps.addOperation(new MatchActionOperationEntry(
+ MatchActionOperations.Operator.REMOVE, matchActionId));
+ }
+ }
+ */
}