ONOS-5855 Intent installer for Protection

- This patch is effectively combination of following to unblock
  protection related code:
  + Intent installer and operations for domain intents.
    Change-Id  Id4597baebf587b1bc9b3ae9013383ae50472fa4c
    by Andreas Papazois <andreas.papazois@gmail.com>
  + ONOS-5604 IntentInstaller for protection
    adds ProtectionConfigOperationContext
    Change-Id Ia9f5ddd44c9765867b5e2daaa7a7478c1dc2f2e3
  - removed dependency to Intent Domain related classes.

Change-Id: I027eedb020b3f4240c545dc92e47c4262d8b6ec6
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
index 49433af..378b9b8 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
@@ -16,9 +16,16 @@
 
 package org.onosproject.net.intent.impl;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.annotations.Beta;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
+import org.apache.commons.lang3.tuple.Pair;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.protection.ProtectedTransportEndpointDescription;
+import org.onosproject.net.behaviour.protection.ProtectionConfig;
+import org.onosproject.net.config.NetworkConfigService;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleOperations;
 import org.onosproject.net.flow.FlowRuleOperationsContext;
@@ -32,6 +39,7 @@
 import org.onosproject.net.intent.Intent;
 import org.onosproject.net.intent.IntentData;
 import org.onosproject.net.intent.IntentStore;
+import org.onosproject.net.intent.ProtectionEndpointIntent;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -41,9 +49,12 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static org.onosproject.net.intent.IntentState.*;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -59,6 +70,7 @@
     private ObjectiveTrackerService trackerService;
     private FlowRuleService flowRuleService;
     private FlowObjectiveService flowObjectiveService;
+    private NetworkConfigService networkConfigService;
 
     private enum Direction {
         ADD,
@@ -72,17 +84,19 @@
      * @param trackerService       objective tracking service
      * @param flowRuleService      flow rule service
      * @param flowObjectiveService flow objective service
+     * @param networkConfigService network configuration service
      */
     void init(IntentStore intentStore, ObjectiveTrackerService trackerService,
-              FlowRuleService flowRuleService, FlowObjectiveService flowObjectiveService) {
+              FlowRuleService flowRuleService, FlowObjectiveService flowObjectiveService,
+              NetworkConfigService networkConfigService) {
         this.store = intentStore;
         this.trackerService = trackerService;
+        //TODO Various services should be plugged to the intent installer instead of being hardcoded
         this.flowRuleService = flowRuleService;
         this.flowObjectiveService = flowObjectiveService;
+        this.networkConfigService = networkConfigService;
     }
 
-
-    // FIXME: Refactor to accept both FlowObjectiveIntent and FlowRuleIntents
     // FIXME: Intent Manager should have never become dependent on a specific intent type(s).
     // This will be addressed in intent domains work; not now.
 
@@ -94,8 +108,8 @@
      * @param toInstall   optional intent to install
      */
     void apply(Optional<IntentData> toUninstall, Optional<IntentData> toInstall) {
-        // Hook for handling success
-        Consumer<OperationContext> successConsumer = (ctx) -> {
+        // Hook for handling success at intent installation level.
+        Consumer<IntentInstallationContext> successConsumer = (ctx) -> {
             if (toInstall.isPresent()) {
                 IntentData installData = toInstall.get();
                 log.debug("Completed installing: {}", installData.key());
@@ -118,13 +132,11 @@
             }
         };
 
-        // Hook for handling errors
-        Consumer<OperationContext> errorConsumer = (ctx) -> {
+        // Hook for handling errors at intent installation level
+        Consumer<IntentInstallationContext> errorConsumer = (ctx) -> {
             // if toInstall was cause of error, then recompile (manage/increment counter, when exceeded -> CORRUPT)
             if (toInstall.isPresent()) {
                 IntentData installData = toInstall.get();
-                log.warn("Failed installation: {} {} due to {}",
-                         installData.key(), installData.intent(), ctx.error());
                 installData.setState(CORRUPT);
                 installData.incrementErrorCount();
                 store.write(installData);
@@ -132,35 +144,95 @@
             // if toUninstall was cause of error, then CORRUPT (another job will clean this up)
             if (toUninstall.isPresent()) {
                 IntentData uninstallData = toUninstall.get();
-                log.warn("Failed withdrawal: {} {} due to {}",
-                         uninstallData.key(), uninstallData.intent(), ctx.error());
                 uninstallData.setState(CORRUPT);
                 uninstallData.incrementErrorCount();
                 store.write(uninstallData);
             }
         };
 
+        // Hooks at operation level
+        Consumer<OperationContext> successOperationConsumer = (ctx) -> {
+            ctx.intentContext.finishContext(ctx);
+        };
+        Consumer<OperationContext> errorOperationConsumer = (ctx) -> {
+            if (ctx.toInstall.isPresent()) {
+                IntentData installData = toInstall.get();
+                log.warn("Failed installation operation for: {} {} due to {}",
+                         installData.key(), installData.intent(), ctx.error());
+            }
+            if (ctx.toUninstall.isPresent()) {
+                IntentData uninstallData = toUninstall.get();
+                log.warn("Failed withdrawal operation for: {} {} due to {}",
+                         uninstallData.key(), uninstallData.intent(), ctx.error());
+            }
+            ctx.intentContext.handleError(ctx);
+        };
+
         // Create a context for tracking the backing operations for applying
         // the intents to the environment.
-        OperationContext context = createContext(toUninstall, toInstall);
-
-        context.prepare(toUninstall, toInstall, successConsumer, errorConsumer);
-        context.apply();
+        IntentInstallationContext intentContext =
+                new IntentInstallationContext(successConsumer, errorConsumer);
+        Set<OperationContext> contexts = createContext(intentContext, toUninstall, toInstall);
+        intentContext.pendingContexts = contexts;
+        contexts.forEach(ctx -> {
+            ctx.prepare(toUninstall, toInstall, successOperationConsumer, errorOperationConsumer);
+            ctx.apply();
+        });
     }
 
-    // ------ Utilities to support FlowRule vs. FlowObjective behavior -------
+    // Context for applying and tracking multiple kinds of operation contexts
+    // related to specific intent data.
+    private final class IntentInstallationContext {
+        private Set<OperationContext> pendingContexts = Sets.newHashSet();
+        private Set<OperationContext> errorContexts = Sets.newHashSet();
+        private Consumer<IntentInstallationContext> successConsumer;
+        private Consumer<IntentInstallationContext> errorConsumer;
 
-    // Creates the context appropriate for tracking operations of the
+        private IntentInstallationContext(Consumer<IntentInstallationContext> succesConsumer,
+                                          Consumer<IntentInstallationContext> errorConsumer) {
+            this.successConsumer = succesConsumer;
+            this.errorConsumer = errorConsumer;
+        }
+
+        private void handleError(OperationContext ctx) {
+            errorContexts.add(ctx);
+            finishContext(ctx);
+        }
+
+        private void finishContext(OperationContext ctx) {
+            synchronized (pendingContexts) {
+                pendingContexts.remove(ctx);
+                if (pendingContexts.isEmpty()) {
+                    if (errorContexts.isEmpty()) {
+                        successConsumer.accept(IntentInstallationContext.this);
+                    } else {
+                        errorConsumer.accept(IntentInstallationContext.this);
+                    }
+                }
+            }
+        }
+    }
+
+    // --- Utilities to support various installable Intent ----
+
+    // Creates the set of contexts appropriate for tracking operations of the
     // the specified intents.
-    private OperationContext createContext(Optional<IntentData> toUninstall,
-                                           Optional<IntentData> toInstall) {
+    private Set<OperationContext> createContext(IntentInstallationContext intentContext,
+                                                Optional<IntentData> toUninstall,
+                                                Optional<IntentData> toInstall) {
+
+        Set<OperationContext> contexts = Sets.newConcurrentHashSet();
         if (isInstallable(toUninstall, toInstall, FlowRuleIntent.class)) {
-            return new FlowRuleOperationContext();
+            contexts.add(new FlowRuleOperationContext(intentContext));
         }
         if (isInstallable(toUninstall, toInstall, FlowObjectiveIntent.class)) {
-            return new FlowObjectiveOperationContext();
+            contexts.add(new FlowObjectiveOperationContext(intentContext));
         }
-        return new ErrorContext();
+        if (isInstallable(toUninstall, toInstall, ProtectionEndpointIntent.class)) {
+            contexts.add(new ProtectionConfigOperationContext(intentContext));
+        }
+
+        return contexts.isEmpty() ? ImmutableSet.of(new ErrorContext(intentContext)) : contexts;
     }
 
     private boolean isInstallable(Optional<IntentData> toUninstall, Optional<IntentData> toInstall,
@@ -169,14 +241,14 @@
         if (toInstall.isPresent()) {
             notBothNull = true;
             if (!toInstall.get().installables().stream()
-                    .allMatch(i -> intentClass.isAssignableFrom(i.getClass()))) {
+                    .anyMatch(i -> intentClass.isAssignableFrom(i.getClass()))) {
                 return false;
             }
         }
         if (toUninstall.isPresent()) {
             notBothNull = true;
             if (!toUninstall.get().installables().stream()
-                    .allMatch(i -> intentClass.isAssignableFrom(i.getClass()))) {
+                    .anyMatch(i -> intentClass.isAssignableFrom(i.getClass()))) {
                 return false;
             }
         }
@@ -185,6 +257,7 @@
 
     // Base context for applying and tracking operations related to installable intents.
     private abstract class OperationContext {
+        protected IntentInstallationContext intentContext;
         protected Optional<IntentData> toUninstall;
         protected Optional<IntentData> toInstall;
         /**
@@ -196,6 +269,10 @@
          */
         protected Consumer<OperationContext> errorConsumer;
 
+        protected OperationContext(IntentInstallationContext context) {
+            this.intentContext = context;
+        }
+
         /**
          * Applies the Intents specified by
          * {@link #prepareIntents(List, Direction)} call(s) prior to this call.
@@ -329,16 +406,22 @@
         }
 
         private boolean isSupported(Intent intent) {
-            return intent instanceof FlowRuleIntent || intent instanceof FlowObjectiveIntent;
+            return intent instanceof FlowRuleIntent ||
+                   intent instanceof FlowObjectiveIntent ||
+                   intent instanceof ProtectionEndpointIntent;
         }
     }
 
 
-    // Context for applying and tracking operations related to flow rule intent.
+    // Context for applying and tracking operations related to flow rule intents.
     private class FlowRuleOperationContext extends OperationContext {
         FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
         FlowRuleOperationsContext flowRuleOperationsContext;
 
+        FlowRuleOperationContext(IntentInstallationContext context) {
+            super(context);
+        }
+
         @Override
         void apply() {
             flowRuleOperationsContext = new FlowRuleOperationsContext() {
@@ -371,6 +454,7 @@
             builder.newStage();
 
             List<Collection<FlowRule>> stages = intentsToApply.stream()
+                    .filter(x -> x instanceof FlowRuleIntent)
                     .map(x -> (FlowRuleIntent) x)
                     .map(FlowRuleIntent::flowRules)
                     .collect(Collectors.toList());
@@ -397,9 +481,14 @@
         final Set<ObjectiveContext> pendingContexts = Sets.newHashSet();
         final Set<ObjectiveContext> errorContexts = Sets.newConcurrentHashSet();
 
+        FlowObjectiveOperationContext(IntentInstallationContext context) {
+            super(context);
+        }
+
         @Override
         public void prepareIntents(List<Intent> intentsToApply, Direction direction) {
             intentsToApply.stream()
+                    .filter(x -> x instanceof FlowObjectiveIntent)
                     .flatMap(x -> buildObjectiveContexts((FlowObjectiveIntent) x, direction).stream())
                     .forEach(contexts::add);
         }
@@ -488,6 +577,9 @@
     }
 
     private class ErrorContext extends OperationContext {
+        ErrorContext(IntentInstallationContext context) {
+            super(context);
+        }
         @Override
         void apply() {
             throw new UnsupportedOperationException("Unsupported installable intent");
@@ -502,4 +594,110 @@
         void prepareIntents(List<Intent> intentsToApply, Direction direction) {
         }
     }
+
+
+    /**
+     * Context for applying and tracking operations related to
+     * {@link ProtectionEndpointIntent}.
+     */
+    @Beta
+    private class ProtectionConfigOperationContext extends OperationContext {
+
+        ProtectionConfigOperationContext(IntentInstallationContext context) {
+            super(context);
+        }
+
+        /**
+         * Stage of installable Intents which can be processed in parallel.
+         */
+        private final class Stage {
+            // should it have progress state, how far it went?
+            private final Collection<Pair<ProtectionEndpointIntent, Direction>> ops;
+
+            Stage(Collection<Pair<ProtectionEndpointIntent, Direction>> ops) {
+                this.ops = checkNotNull(ops);
+            }
+
+            CompletableFuture<Void> apply() {
+                return ops.stream()
+                    .map(op -> applyOp(op.getRight(), op.getLeft()))
+                    .reduce(CompletableFuture.completedFuture(null),
+                            (l, r) -> {
+                                l.join();
+                                return r;
+                            });
+            }
+
+            private CompletableFuture<Void> applyOp(Direction dir, ProtectionEndpointIntent intent) {
+                log.trace("applying {}: {}", dir, intent);
+                if (dir == Direction.REMOVE) {
+                    networkConfigService.removeConfig(intent.deviceId(), ProtectionConfig.class);
+                } else if (dir == Direction.ADD) {
+                    ProtectedTransportEndpointDescription description = intent.description();
+
+                    // Can't do following. Will trigger empty CONFIG_ADDED
+                    //ProtectionConfig cfg = networkConfigService.addConfig(intent.deviceId(),
+                    //                                                      ProtectionConfig.class);
+                    ProtectionConfig cfg = new ProtectionConfig(intent.deviceId());
+                    cfg.fingerprint(description.fingerprint());
+                    cfg.peer(description.peer());
+                    cfg.paths(description.paths());
+                    //cfg.apply();
+
+                    networkConfigService.applyConfig(intent.deviceId(),
+                                                     ProtectionConfig.class,
+                                                     cfg.node());
+                }
+                // TODO Should monitor progress and complete only after it's
+                // actually done.
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override
+            public String toString() {
+                return ops.toString();
+            }
+        }
+
+        /**
+         * List of Stages which must be executed in order.
+         */
+        private final List<Stage> stages = new ArrayList<>();
+
+        private final List<Stage> failed = new CopyOnWriteArrayList<>();
+
+        @Override
+        synchronized void apply() {
+            for (Stage stage : stages) {
+                log.trace("applying Stage {}", stage);
+                CompletableFuture<Void> result = stage.apply();
+                // wait for stage completion
+                result.join();
+                if (result.isCompletedExceptionally()) {
+                    log.error("Stage {} failed", stage);
+                    failed.add(stage);
+                    errorConsumer.accept(ProtectionConfigOperationContext.this);
+                    return;
+                }
+            }
+            successConsumer.accept(ProtectionConfigOperationContext.this);
+        }
+
+        @Override
+        Object error() {
+            // Something to represent error state
+            return failed;
+        }
+
+        @Override
+        synchronized void prepareIntents(List<Intent> intentsToApply,
+                                         Direction direction) {
+
+            stages.add(new Stage(intentsToApply.stream()
+                                 .filter(i -> i instanceof ProtectionEndpointIntent)
+                                 .map(i -> Pair.of((ProtectionEndpointIntent) i, direction))
+                                 .collect(Collectors.toList())));
+        }
+
+    }
 }
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
index 5524d63..68304eb 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
@@ -29,6 +29,7 @@
 import org.onosproject.core.IdGenerator;
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.config.NetworkConfigService;
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flowobjective.FlowObjectiveService;
 import org.onosproject.net.group.GroupKey;
@@ -132,6 +133,10 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected GroupService groupService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private NetworkConfigService networkConfigService;
+
+
     private ExecutorService batchExecutor;
     private ExecutorService workerExecutor;
 
@@ -150,7 +155,8 @@
     public void activate() {
         configService.registerProperties(getClass());
 
-        intentInstaller.init(store, trackerService, flowRuleService, flowObjectiveService);
+        intentInstaller.init(store, trackerService, flowRuleService, flowObjectiveService,
+                             networkConfigService);
         if (skipReleaseResourcesOnWithdrawal) {
             store.setDelegate(testOnlyDelegate);
         } else {
@@ -167,7 +173,7 @@
 
     @Deactivate
     public void deactivate() {
-        intentInstaller.init(null, null, null, null);
+        intentInstaller.init(null, null, null, null, null);
         if (skipReleaseResourcesOnWithdrawal) {
             store.unsetDelegate(testOnlyDelegate);
         } else {
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index a9652cf..33f824c 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -86,6 +86,7 @@
 import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.TributarySlot;
+import org.onosproject.net.behaviour.protection.ProtectedTransportEndpointDescription;
 import org.onosproject.net.device.DefaultDeviceDescription;
 import org.onosproject.net.device.DefaultPortDescription;
 import org.onosproject.net.device.DefaultPortStatistics;
@@ -181,6 +182,7 @@
 import org.onosproject.net.intent.OpticalPathIntent;
 import org.onosproject.net.intent.PathIntent;
 import org.onosproject.net.intent.PointToPointIntent;
+import org.onosproject.net.intent.ProtectionEndpointIntent;
 import org.onosproject.net.intent.SinglePointToMultiPointIntent;
 import org.onosproject.net.intent.constraint.AnnotationConstraint;
 import org.onosproject.net.intent.constraint.BandwidthConstraint;
@@ -555,6 +557,8 @@
             .register(new ImmutableByteSequenceSerializer(), ImmutableByteSequence.class)
             .register(PathIntent.ProtectionType.class)
             .register(ProtectionConstraint.class)
+            .register(ProtectedTransportEndpointDescription.class)
+            .register(ProtectionEndpointIntent.class)
             .build("API");
 
     /**