ONOS-5855 Intent installer for Protection
- This patch is effectively a combination of the following changes, made to
unblock protection-related code:
+ Intent installer and operations for domain intents.
Change-Id Id4597baebf587b1bc9b3ae9013383ae50472fa4c
by Andreas Papazois <andreas.papazois@gmail.com>
+ ONOS-5604 IntentInstaller for protection
adds ProtectionConfigOperationContext
Change-Id Ia9f5ddd44c9765867b5e2daaa7a7478c1dc2f2e3
- Removed the dependency on Intent Domain related classes.
Change-Id: I027eedb020b3f4240c545dc92e47c4262d8b6ec6
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
index 49433af..378b9b8 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
@@ -16,9 +16,16 @@
package org.onosproject.net.intent.impl;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.annotations.Beta;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+
+import org.apache.commons.lang3.tuple.Pair;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.protection.ProtectedTransportEndpointDescription;
+import org.onosproject.net.behaviour.protection.ProtectionConfig;
+import org.onosproject.net.config.NetworkConfigService;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleOperationsContext;
@@ -32,6 +39,7 @@
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentStore;
+import org.onosproject.net.intent.ProtectionEndpointIntent;
import org.slf4j.Logger;
import java.util.ArrayList;
@@ -41,9 +49,12 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.onosproject.net.intent.IntentState.*;
import static org.slf4j.LoggerFactory.getLogger;
@@ -59,6 +70,7 @@
private ObjectiveTrackerService trackerService;
private FlowRuleService flowRuleService;
private FlowObjectiveService flowObjectiveService;
+ private NetworkConfigService networkConfigService;
private enum Direction {
ADD,
@@ -72,17 +84,19 @@
* @param trackerService objective tracking service
* @param flowRuleService flow rule service
* @param flowObjectiveService flow objective service
+ * @param networkConfigService network configuration service
*/
void init(IntentStore intentStore, ObjectiveTrackerService trackerService,
- FlowRuleService flowRuleService, FlowObjectiveService flowObjectiveService) {
+ FlowRuleService flowRuleService, FlowObjectiveService flowObjectiveService,
+ NetworkConfigService networkConfigService) {
this.store = intentStore;
this.trackerService = trackerService;
+ //TODO Various services should be plugged to the intent installer instead of being hardcoded
this.flowRuleService = flowRuleService;
this.flowObjectiveService = flowObjectiveService;
+ this.networkConfigService = networkConfigService;
}
-
- // FIXME: Refactor to accept both FlowObjectiveIntent and FlowRuleIntents
// FIXME: Intent Manager should have never become dependent on a specific intent type(s).
// This will be addressed in intent domains work; not now.
@@ -94,8 +108,8 @@
* @param toInstall optional intent to install
*/
void apply(Optional<IntentData> toUninstall, Optional<IntentData> toInstall) {
- // Hook for handling success
- Consumer<OperationContext> successConsumer = (ctx) -> {
+ // Hook for handling success at intent installation level.
+ Consumer<IntentInstallationContext> successConsumer = (ctx) -> {
if (toInstall.isPresent()) {
IntentData installData = toInstall.get();
log.debug("Completed installing: {}", installData.key());
@@ -118,13 +132,11 @@
}
};
- // Hook for handling errors
- Consumer<OperationContext> errorConsumer = (ctx) -> {
+ // Hook for handling errors at intent installation level
+ Consumer<IntentInstallationContext> errorConsumer = (ctx) -> {
// if toInstall was cause of error, then recompile (manage/increment counter, when exceeded -> CORRUPT)
if (toInstall.isPresent()) {
IntentData installData = toInstall.get();
- log.warn("Failed installation: {} {} due to {}",
- installData.key(), installData.intent(), ctx.error());
installData.setState(CORRUPT);
installData.incrementErrorCount();
store.write(installData);
@@ -132,35 +144,95 @@
// if toUninstall was cause of error, then CORRUPT (another job will clean this up)
if (toUninstall.isPresent()) {
IntentData uninstallData = toUninstall.get();
- log.warn("Failed withdrawal: {} {} due to {}",
- uninstallData.key(), uninstallData.intent(), ctx.error());
uninstallData.setState(CORRUPT);
uninstallData.incrementErrorCount();
store.write(uninstallData);
}
};
+        // Hooks at operation level: every backing operation context reports its
+        // outcome to the intent installation context it belongs to.
+        Consumer<OperationContext> successOperationConsumer = (ctx) -> {
+            ctx.intentContext.finishContext(ctx);
+        };
+        Consumer<OperationContext> errorOperationConsumer = (ctx) -> {
+            // Read the intent data from the failing context itself, so that the
+            // presence check and the value retrieval use the same Optional.
+            if (ctx.toInstall.isPresent()) {
+                IntentData installData = ctx.toInstall.get();
+                log.warn("Failed installation operation for: {} {} due to {}",
+                         installData.key(), installData.intent(), ctx.error());
+            }
+            if (ctx.toUninstall.isPresent()) {
+                IntentData uninstallData = ctx.toUninstall.get();
+                log.warn("Failed withdrawal operation for: {} {} due to {}",
+                         uninstallData.key(), uninstallData.intent(), ctx.error());
+            }
+            // Record the failure and mark this operation context as finished.
+            ctx.intentContext.handleError(ctx);
+        };
+
// Create a context for tracking the backing operations for applying
// the intents to the environment.
- OperationContext context = createContext(toUninstall, toInstall);
-
- context.prepare(toUninstall, toInstall, successConsumer, errorConsumer);
- context.apply();
+ IntentInstallationContext intentContext =
+ new IntentInstallationContext(successConsumer, errorConsumer);
+ Set<OperationContext> contexts = createContext(intentContext, toUninstall, toInstall);
+ intentContext.pendingContexts = contexts;
+ contexts.forEach(ctx -> {
+ ctx.prepare(toUninstall, toInstall, successOperationConsumer, errorOperationConsumer);
+ ctx.apply();
+ });
}
- // ------ Utilities to support FlowRule vs. FlowObjective behavior -------
+    /**
+     * Context for applying and tracking multiple kinds of operation contexts
+     * related to specific intent data.
+     * <p>
+     * Operation contexts report back through {@link #finishContext(OperationContext)}
+     * or {@link #handleError(OperationContext)}. Once no operation context is
+     * pending, the success consumer is invoked if no error was recorded,
+     * otherwise the error consumer is invoked.
+     */
+    private final class IntentInstallationContext {
+        // Operation contexts which have not reported completion yet. Not final:
+        // the installer assigns the actual set before applying any operation.
+        private Set<OperationContext> pendingContexts = Sets.newHashSet();
+        // Operation contexts which reported a failure. A concurrent set is used,
+        // as FlowObjectiveOperationContext does for its error contexts, because
+        // handleError() adds to it outside the lock taken in finishContext().
+        private final Set<OperationContext> errorContexts = Sets.newConcurrentHashSet();
+        private final Consumer<IntentInstallationContext> successConsumer;
+        private final Consumer<IntentInstallationContext> errorConsumer;
- // Creates the context appropriate for tracking operations of the
+        /**
+         * Creates an installation context with the given completion hooks.
+         *
+         * @param successConsumer invoked when all operation contexts finished without error
+         * @param errorConsumer   invoked when all operation contexts finished and at least
+         *                        one of them reported an error
+         */
+        private IntentInstallationContext(Consumer<IntentInstallationContext> successConsumer,
+                                          Consumer<IntentInstallationContext> errorConsumer) {
+            this.successConsumer = successConsumer;
+            this.errorConsumer = errorConsumer;
+        }
+
+        /**
+         * Records the given operation context as failed and marks it finished.
+         *
+         * @param ctx operation context which failed
+         */
+        private void handleError(OperationContext ctx) {
+            errorContexts.add(ctx);
+            finishContext(ctx);
+        }
+
+        /**
+         * Marks the given operation context as finished; when it was the last
+         * pending one, triggers the success or the error consumer.
+         *
+         * @param ctx operation context which finished
+         */
+        private void finishContext(OperationContext ctx) {
+            synchronized (pendingContexts) {
+                pendingContexts.remove(ctx);
+                if (pendingContexts.isEmpty()) {
+                    if (errorContexts.isEmpty()) {
+                        successConsumer.accept(IntentInstallationContext.this);
+                    } else {
+                        errorConsumer.accept(IntentInstallationContext.this);
+                    }
+                }
+            }
+        }
+    }
+
+ // --- Utilities to support various installable Intent ----
+
+ // Creates the set of contexts appropriate for tracking operations of
// the specified intents.
+ // Every created context refers back to the given intent installation context.
- private OperationContext createContext(Optional<IntentData> toUninstall,
- Optional<IntentData> toInstall) {
+ private Set<OperationContext> createContext(IntentInstallationContext intentContext,
+ Optional<IntentData> toUninstall,
+ Optional<IntentData> toInstall) {
+
+ // One context is added for each installable type for which isInstallable(..) holds.
+ Set<OperationContext> contexts = Sets.newConcurrentHashSet();
if (isInstallable(toUninstall, toInstall, FlowRuleIntent.class)) {
- return new FlowRuleOperationContext();
+ contexts.add(new FlowRuleOperationContext(intentContext));
}
if (isInstallable(toUninstall, toInstall, FlowObjectiveIntent.class)) {
- return new FlowObjectiveOperationContext();
+ contexts.add(new FlowObjectiveOperationContext(intentContext));
}
- return new ErrorContext();
+ if (isInstallable(toUninstall, toInstall, ProtectionEndpointIntent.class)) {
+ contexts.add(new ProtectionConfigOperationContext(intentContext));
+ }
+
+ // Fall back to a single ErrorContext when no supported installable type matched.
+ return contexts.isEmpty() ? ImmutableSet.of(new ErrorContext(intentContext)) : contexts;
}
private boolean isInstallable(Optional<IntentData> toUninstall, Optional<IntentData> toInstall,
@@ -169,14 +241,14 @@
if (toInstall.isPresent()) {
notBothNull = true;
if (!toInstall.get().installables().stream()
- .allMatch(i -> intentClass.isAssignableFrom(i.getClass()))) {
+ .anyMatch(i -> intentClass.isAssignableFrom(i.getClass()))) {
return false;
}
}
if (toUninstall.isPresent()) {
notBothNull = true;
if (!toUninstall.get().installables().stream()
- .allMatch(i -> intentClass.isAssignableFrom(i.getClass()))) {
+ .anyMatch(i -> intentClass.isAssignableFrom(i.getClass()))) {
return false;
}
}
@@ -185,6 +257,7 @@
// Base context for applying and tracking operations related to installable intents.
private abstract class OperationContext {
+ protected IntentInstallationContext intentContext;
protected Optional<IntentData> toUninstall;
protected Optional<IntentData> toInstall;
/**
@@ -196,6 +269,10 @@
*/
protected Consumer<OperationContext> errorConsumer;
+ protected OperationContext(IntentInstallationContext context) {
+ this.intentContext = context;
+ }
+
/**
* Applies the Intents specified by
* {@link #prepareIntents(List, Direction)} call(s) prior to this call.
@@ -329,16 +406,22 @@
}
private boolean isSupported(Intent intent) {
- return intent instanceof FlowRuleIntent || intent instanceof FlowObjectiveIntent;
+ return intent instanceof FlowRuleIntent ||
+ intent instanceof FlowObjectiveIntent ||
+ intent instanceof ProtectionEndpointIntent;
}
}
- // Context for applying and tracking operations related to flow rule intent.
+ // Context for applying and tracking operations related to flow rule intents.
private class FlowRuleOperationContext extends OperationContext {
FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
FlowRuleOperationsContext flowRuleOperationsContext;
+ FlowRuleOperationContext(IntentInstallationContext context) {
+ super(context);
+ }
+
@Override
void apply() {
flowRuleOperationsContext = new FlowRuleOperationsContext() {
@@ -371,6 +454,7 @@
builder.newStage();
List<Collection<FlowRule>> stages = intentsToApply.stream()
+ .filter(x -> x instanceof FlowRuleIntent)
.map(x -> (FlowRuleIntent) x)
.map(FlowRuleIntent::flowRules)
.collect(Collectors.toList());
@@ -397,9 +481,14 @@
final Set<ObjectiveContext> pendingContexts = Sets.newHashSet();
final Set<ObjectiveContext> errorContexts = Sets.newConcurrentHashSet();
+ FlowObjectiveOperationContext(IntentInstallationContext context) {
+ super(context);
+ }
+
@Override
public void prepareIntents(List<Intent> intentsToApply, Direction direction) {
intentsToApply.stream()
+ .filter(x -> x instanceof FlowObjectiveIntent)
.flatMap(x -> buildObjectiveContexts((FlowObjectiveIntent) x, direction).stream())
.forEach(contexts::add);
}
@@ -488,6 +577,9 @@
}
private class ErrorContext extends OperationContext {
+ ErrorContext(IntentInstallationContext context) {
+ super(context);
+ }
@Override
void apply() {
throw new UnsupportedOperationException("Unsupported installable intent");
@@ -502,4 +594,110 @@
void prepareIntents(List<Intent> intentsToApply, Direction direction) {
}
}
+
+
+    /**
+     * Context for applying and tracking operations related to
+     * {@link ProtectionEndpointIntent}.
+     * <p>
+     * Each {@link #prepareIntents(List, Direction)} call contributes one stage;
+     * {@link #apply()} executes the stages in the order they were prepared and
+     * reports the outcome through the success or the error consumer.
+     */
+    @Beta
+    private class ProtectionConfigOperationContext extends OperationContext {
+
+        ProtectionConfigOperationContext(IntentInstallationContext context) {
+            super(context);
+        }
+
+        /**
+         * Stage of installable Intents which can be processed in parallel.
+         */
+        private final class Stage {
+            // should it have progress state, how far it went?
+            private final Collection<Pair<ProtectionEndpointIntent, Direction>> ops;
+
+            Stage(Collection<Pair<ProtectionEndpointIntent, Direction>> ops) {
+                this.ops = checkNotNull(ops);
+            }
+
+            /**
+             * Applies all operations of this stage.
+             * <p>
+             * The per-operation futures are folded: each accumulation step waits
+             * for the previously accumulated future and keeps the newer one, so
+             * the returned future is the one of the last operation, or an already
+             * completed future when the stage has no operations.
+             *
+             * @return future of the last applied operation
+             */
+            CompletableFuture<Void> apply() {
+                return ops.stream()
+                        .map(op -> applyOp(op.getRight(), op.getLeft()))
+                        .reduce(CompletableFuture.completedFuture(null),
+                                (l, r) -> {
+                                    l.join();
+                                    return r;
+                                });
+            }
+
+            /**
+             * Applies a single operation through the network configuration service.
+             *
+             * @param dir    {@code ADD} to apply the protection configuration,
+             *               {@code REMOVE} to remove it
+             * @param intent intent carrying the device and the endpoint description
+             * @return future of the operation; currently always already completed
+             */
+            private CompletableFuture<Void> applyOp(Direction dir, ProtectionEndpointIntent intent) {
+                log.trace("applying {}: {}", dir, intent);
+                if (dir == Direction.REMOVE) {
+                    networkConfigService.removeConfig(intent.deviceId(), ProtectionConfig.class);
+                } else if (dir == Direction.ADD) {
+                    ProtectedTransportEndpointDescription description = intent.description();
+
+                    // Can't do following. Will trigger empty CONFIG_ADDED
+                    //ProtectionConfig cfg = networkConfigService.addConfig(intent.deviceId(),
+                    //                                                      ProtectionConfig.class);
+                    ProtectionConfig cfg = new ProtectionConfig(intent.deviceId());
+                    cfg.fingerprint(description.fingerprint());
+                    cfg.peer(description.peer());
+                    cfg.paths(description.paths());
+                    //cfg.apply();
+
+                    networkConfigService.applyConfig(intent.deviceId(),
+                                                     ProtectionConfig.class,
+                                                     cfg.node());
+                }
+                // TODO Should monitor progress and complete only after it's
+                // actually done.
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override
+            public String toString() {
+                return ops.toString();
+            }
+        }
+
+        /**
+         * List of Stages which must be executed in order.
+         */
+        private final List<Stage> stages = new ArrayList<>();
+
+        /**
+         * Stages which failed while being applied; exposed through {@link #error()}.
+         */
+        private final List<Stage> failed = new CopyOnWriteArrayList<>();
+
+        /**
+         * Applies the prepared stages one after another, stopping at the first
+         * failing stage. The error consumer is invoked when a stage fails,
+         * the success consumer when every stage completed.
+         */
+        @Override
+        synchronized void apply() {
+            for (Stage stage : stages) {
+                log.trace("applying Stage {}", stage);
+                try {
+                    // Wait for stage completion. join() throws on exceptional
+                    // completion (CompletionException / CancellationException),
+                    // so a failure has to be caught here rather than detected by
+                    // inspecting the future afterwards. Failures raised while the
+                    // stage is being applied are handled the same way.
+                    stage.apply().join();
+                } catch (RuntimeException e) {
+                    log.error("Stage {} failed", stage, e);
+                    failed.add(stage);
+                    errorConsumer.accept(ProtectionConfigOperationContext.this);
+                    return;
+                }
+            }
+            successConsumer.accept(ProtectionConfigOperationContext.this);
+        }
+
+        @Override
+        Object error() {
+            // Something to represent error state
+            return failed;
+        }
+
+        /**
+         * Adds one stage made of the {@link ProtectionEndpointIntent}s found in
+         * the given installables; installables of other types are ignored.
+         *
+         * @param intentsToApply installable intents to be applied
+         * @param direction      direction to apply to every intent of the stage
+         */
+        @Override
+        synchronized void prepareIntents(List<Intent> intentsToApply,
+                                         Direction direction) {
+
+            stages.add(new Stage(intentsToApply.stream()
+                                 .filter(i -> i instanceof ProtectionEndpointIntent)
+                                 .map(i -> Pair.of((ProtectionEndpointIntent) i, direction))
+                                 .collect(Collectors.toList())));
+        }
+
+    }
}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
index 5524d63..68304eb 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
@@ -29,6 +29,7 @@
import org.onosproject.core.IdGenerator;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.config.NetworkConfigService;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flowobjective.FlowObjectiveService;
import org.onosproject.net.group.GroupKey;
@@ -132,6 +133,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected GroupService groupService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private NetworkConfigService networkConfigService;
+
+
private ExecutorService batchExecutor;
private ExecutorService workerExecutor;
@@ -150,7 +155,8 @@
public void activate() {
configService.registerProperties(getClass());
- intentInstaller.init(store, trackerService, flowRuleService, flowObjectiveService);
+ intentInstaller.init(store, trackerService, flowRuleService, flowObjectiveService,
+ networkConfigService);
if (skipReleaseResourcesOnWithdrawal) {
store.setDelegate(testOnlyDelegate);
} else {
@@ -167,7 +173,7 @@
@Deactivate
public void deactivate() {
- intentInstaller.init(null, null, null, null);
+ intentInstaller.init(null, null, null, null, null);
if (skipReleaseResourcesOnWithdrawal) {
store.unsetDelegate(testOnlyDelegate);
} else {
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index a9652cf..33f824c 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -86,6 +86,7 @@
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.TributarySlot;
+import org.onosproject.net.behaviour.protection.ProtectedTransportEndpointDescription;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DefaultPortDescription;
import org.onosproject.net.device.DefaultPortStatistics;
@@ -181,6 +182,7 @@
import org.onosproject.net.intent.OpticalPathIntent;
import org.onosproject.net.intent.PathIntent;
import org.onosproject.net.intent.PointToPointIntent;
+import org.onosproject.net.intent.ProtectionEndpointIntent;
import org.onosproject.net.intent.SinglePointToMultiPointIntent;
import org.onosproject.net.intent.constraint.AnnotationConstraint;
import org.onosproject.net.intent.constraint.BandwidthConstraint;
@@ -555,6 +557,8 @@
.register(new ImmutableByteSequenceSerializer(), ImmutableByteSequence.class)
.register(PathIntent.ProtectionType.class)
.register(ProtectionConstraint.class)
+ .register(ProtectedTransportEndpointDescription.class)
+ .register(ProtectionEndpointIntent.class)
.build("API");
/**