[ONOS-6348] Intent installer redesign

Change-Id: I9ae2e8158dc1c686eaf848f330566f9dbb78405f
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/installer/DomainIntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/installer/DomainIntentInstaller.java
new file mode 100644
index 0000000..a713034
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/installer/DomainIntentInstaller.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.intent.impl.installer;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.net.domain.DomainIntent;
+import org.onosproject.net.domain.DomainIntentOperations;
+import org.onosproject.net.domain.DomainIntentOperationsContext;
+import org.onosproject.net.domain.DomainIntentService;
+import org.onosproject.net.intent.IntentData;
+import org.onosproject.net.intent.IntentExtensionService;
+import org.onosproject.net.intent.IntentInstallCoordinator;
+import org.onosproject.net.intent.IntentInstaller;
+import org.onosproject.net.intent.IntentOperationContext;
+import org.onosproject.net.intent.impl.IntentManager;
+import org.onosproject.net.intent.impl.ObjectiveTrackerService;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Installer for domain Intent.
+ */
+@Component(immediate = true)
+public class DomainIntentInstaller implements IntentInstaller<DomainIntent> {
+
+    private final Logger log = getLogger(IntentManager.class);
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected IntentExtensionService intentExtensionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ObjectiveTrackerService trackerService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected IntentInstallCoordinator intentInstallCoordinator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DomainIntentService domainIntentService;
+
+    @Activate
+    public void activated() {
+        intentExtensionService.registerInstaller(DomainIntent.class, this);
+    }
+
+    @Deactivate
+    public void deactivated() {
+        intentExtensionService.unregisterInstaller(DomainIntent.class);
+    }
+
+    @Override
+    public void apply(IntentOperationContext<DomainIntent> context) {
+        Optional<IntentData> toUninstall = context.toUninstall();
+        Optional<IntentData> toInstall = context.toInstall();
+
+        List<DomainIntent> uninstallIntents = context.intentsToUninstall();
+        List<DomainIntent> installIntents = context.intentsToInstall();
+
+        if (!toInstall.isPresent() && !toUninstall.isPresent()) {
+            intentInstallCoordinator.intentInstallSuccess(context);
+            return;
+        }
+
+        if (toUninstall.isPresent()) {
+            IntentData intentData = toUninstall.get();
+            trackerService.removeTrackedResources(intentData.key(), intentData.intent().resources());
+            uninstallIntents.forEach(installable ->
+                                             trackerService.removeTrackedResources(intentData.intent().key(),
+                                                                                   installable.resources()));
+        }
+
+        if (toInstall.isPresent()) {
+            IntentData intentData = toInstall.get();
+            trackerService.addTrackedResources(intentData.key(), intentData.intent().resources());
+            installIntents.forEach(installable ->
+                                           trackerService.addTrackedResources(intentData.key(),
+                                                                              installable.resources()));
+        }
+
+        // Generate domain Intent operations
+        DomainIntentOperations.Builder builder = DomainIntentOperations.builder();
+        DomainIntentOperationsContext domainOperationsContext;
+
+        uninstallIntents.forEach(builder::remove);
+        installIntents.forEach(builder::add);
+
+        domainOperationsContext = new DomainIntentOperationsContext() {
+            @Override
+            public void onSuccess(DomainIntentOperations idops) {
+                intentInstallCoordinator.intentInstallSuccess(context);
+            }
+
+            @Override
+            public void onError(DomainIntentOperations idos) {
+                intentInstallCoordinator.intentInstallFailed(context);
+            }
+        };
+        log.debug("submitting domain intent {} -> {}",
+                  toUninstall.map(x -> x.key().toString()).orElse("<empty>"),
+                  toInstall.map(x -> x.key().toString()).orElse("<empty>"));
+
+        // Submit domain Inten operations with domain context
+        domainIntentService.sumbit(builder.build(domainOperationsContext));
+    }
+}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/installer/FlowObjectiveIntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/installer/FlowObjectiveIntentInstaller.java
new file mode 100644
index 0000000..22ff970
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/installer/FlowObjectiveIntentInstaller.java
@@ -0,0 +1,565 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.intent.impl.installer;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.intent.FlowObjectiveIntent;
+import org.onosproject.net.intent.IntentData;
+import org.onosproject.net.intent.IntentExtensionService;
+import org.onosproject.net.intent.IntentInstallCoordinator;
+import org.onosproject.net.intent.IntentOperationContext;
+import org.onosproject.net.intent.IntentInstaller;
+import org.onosproject.net.intent.impl.IntentManager;
+import org.onosproject.net.intent.impl.ObjectiveTrackerService;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.onosproject.net.flowobjective.ObjectiveError.INSTALLATIONTHRESHOLDEXCEEDED;
+import static org.onosproject.net.intent.IntentInstaller.Direction.ADD;
+import static org.onosproject.net.intent.IntentInstaller.Direction.REMOVE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Installer for FlowObjectiveIntent.
+ */
+@Component(immediate = true)
+public class FlowObjectiveIntentInstaller implements IntentInstaller<FlowObjectiveIntent> {
+    private static final int OBJECTIVE_RETRY_THRESHOLD = 5;
+    private static final String UNSUPPORT_OBJ = "unsupported objective {}";
+    private final Logger log = getLogger(IntentManager.class);
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected IntentExtensionService intentExtensionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ObjectiveTrackerService trackerService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected IntentInstallCoordinator intentInstallCoordinator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected FlowObjectiveService flowObjectiveService;
+
+    @Activate
+    public void activate() {
+        intentExtensionService.registerInstaller(FlowObjectiveIntent.class, this);
+    }
+
+    @Deactivate
+    public void deactivated() {
+        intentExtensionService.unregisterInstaller(FlowObjectiveIntent.class);
+    }
+
+    @Override
+    public void apply(IntentOperationContext<FlowObjectiveIntent> intentOperationContext) {
+        Objects.requireNonNull(intentOperationContext);
+        Optional<IntentData> toUninstall = intentOperationContext.toUninstall();
+        Optional<IntentData> toInstall = intentOperationContext.toInstall();
+
+        List<FlowObjectiveIntent> uninstallIntents = intentOperationContext.intentsToUninstall();
+        List<FlowObjectiveIntent> installIntents = intentOperationContext.intentsToInstall();
+
+        if (!toInstall.isPresent() && !toUninstall.isPresent()) {
+            intentInstallCoordinator.intentInstallSuccess(intentOperationContext);
+            return;
+        }
+
+        if (toUninstall.isPresent()) {
+            IntentData intentData = toUninstall.get();
+            trackerService.removeTrackedResources(intentData.key(), intentData.intent().resources());
+            uninstallIntents.forEach(installable ->
+                                             trackerService.removeTrackedResources(intentData.intent().key(),
+                                                                                   installable.resources()));
+        }
+
+        if (toInstall.isPresent()) {
+            IntentData intentData = toInstall.get();
+            trackerService.addTrackedResources(intentData.key(), intentData.intent().resources());
+            installIntents.forEach(installable ->
+                                           trackerService.addTrackedResources(intentData.key(),
+                                                                              installable.resources()));
+        }
+
+        FlowObjectiveIntentInstallationContext intentInstallationContext =
+                new FlowObjectiveIntentInstallationContext(intentOperationContext);
+
+        uninstallIntents.stream()
+                .map(intent -> buildObjectiveContexts(intent, REMOVE))
+                .flatMap(Collection::stream)
+                .forEach(context -> {
+                    context.intentInstallationContext(intentInstallationContext);
+                    intentInstallationContext.addContext(context);
+                    intentInstallationContext.addPendingContext(context);
+                });
+
+        installIntents.stream()
+                .map(intent -> buildObjectiveContexts(intent, ADD))
+                .flatMap(Collection::stream)
+                .forEach(context -> {
+                    context.intentInstallationContext(intentInstallationContext);
+                    intentInstallationContext.addContext(context);
+                    intentInstallationContext.addNextPendingContext(context);
+                });
+
+        intentInstallationContext.apply();
+    }
+
+    /**
+     * Builds all objective contexts for a given flow objective Intent with given
+     * operation.
+     *
+     * @param intent the flow objective Intent
+     * @param direction the operation of this Intent
+     * @return all objective context of the Intent with given operation
+     */
+    private Set<FlowObjectiveInstallationContext> buildObjectiveContexts(FlowObjectiveIntent intent,
+                                                                         Direction direction) {
+        Objects.requireNonNull(intent);
+        Objects.requireNonNull(direction);
+        Set<FlowObjectiveInstallationContext> contexts = Sets.newHashSet();
+        int size = intent.objectives().size();
+        List<Objective> objectives = intent.objectives();
+        List<DeviceId> deviceIds = intent.devices();
+
+        if (direction == ADD) {
+            // Install objectives
+            // The flow objective system will handle the installation order
+            for (int i = 0; i < size; i++) {
+                Objective objective = objectives.get(i);
+                DeviceId deviceId = deviceIds.get(i);
+                FlowObjectiveInstallationContext ctx = buildObjectiveContext(objective, deviceId, direction);
+                contexts.add(ctx);
+            }
+            return contexts;
+        } else {
+            // Uninstall objecitves
+            // we need to care about ordering here
+            // basic idea is to chain objective contexts
+            for (int i = 0; i < size; i++) {
+                Objective objective = intent.objectives().get(i);
+                DeviceId deviceId = intent.devices().get(i);
+                if (objective instanceof FilteringObjective) {
+                    // don't need to care ordering of filtering objective
+                    FlowObjectiveInstallationContext ctx =
+                            buildObjectiveContext(objective, deviceId, direction);
+                    contexts.add(ctx);
+                } else if (objective instanceof NextObjective) {
+                    // need to removed after forwarding objective
+                    // nothing to do here
+                } else if (objective instanceof ForwardingObjective) {
+                    // forwarding objective, also find next objective if
+                    // exist
+                    FlowObjectiveInstallationContext fwdCtx =
+                            buildObjectiveContext(objective, deviceId, direction);
+                    ForwardingObjective fwd = (ForwardingObjective) objective;
+                    NextObjective nxt = null;
+                    Integer nextId = fwd.nextId();
+                    if (nextId != null) {
+                        for (int j = 0; j < size; j++) {
+                            if (objectives.get(j).id() == nextId) {
+                                nxt = (NextObjective) objectives.get(j);
+                                break;
+                            }
+                        }
+                        // if a next objective exists in the Intent
+                        if (nxt != null) {
+                            FlowObjectiveInstallationContext nxtCtx =
+                                    buildObjectiveContext(nxt, deviceId, direction);
+                            fwdCtx.nextContext(nxtCtx);
+                        }
+                    }
+                    contexts.add(fwdCtx);
+                } else {
+                    // possible here?
+                    log.warn(UNSUPPORT_OBJ, objective);
+                }
+            }
+        }
+        return contexts;
+    }
+
+    private FlowObjectiveInstallationContext buildObjectiveContext(Objective objective,
+                                                                   DeviceId deviceId,
+                                                                   Direction direction) {
+        Objects.requireNonNull(objective);
+        Objects.requireNonNull(deviceId);
+        Objects.requireNonNull(direction);
+        Objective.Builder builder = objective.copy();
+        FlowObjectiveInstallationContext ctx = new FlowObjectiveInstallationContext();
+        switch (direction) {
+            case ADD:
+                objective = builder.add(ctx);
+                break;
+            case REMOVE:
+                objective = builder.remove(ctx);
+                break;
+            default:
+                break;
+        }
+        ctx.setObjective(objective, deviceId);
+        return ctx;
+    }
+
+    /**
+     * Installation context for flow objective.
+     * Manages installation state of a flow objective.
+     */
+    class FlowObjectiveInstallationContext implements ObjectiveContext {
+        private Objective objective;
+        private DeviceId deviceId;
+        private ObjectiveError error;
+        private AtomicInteger retry;
+        private FlowObjectiveInstallationContext nextContext;
+        private FlowObjectiveIntentInstallationContext intentInstallationContext;
+
+        /**
+         * Set flow objective Intent installation context to this context.
+         *
+         * @param intentInstallationContext the Intent installation context
+         */
+        public void intentInstallationContext(FlowObjectiveIntentInstallationContext intentInstallationContext) {
+            Objects.requireNonNull(intentInstallationContext);
+            this.intentInstallationContext = intentInstallationContext;
+
+            // Set Intent installation context to the next context if exists.
+            if (nextContext != null) {
+                nextContext.intentInstallationContext(intentInstallationContext);
+            }
+        }
+
+        /**
+         * Sets next flow objective installation context.
+         *
+         * @param nextContext the next flow objective installation context
+         */
+        public void nextContext(FlowObjectiveInstallationContext nextContext) {
+            Objects.requireNonNull(nextContext);
+            this.nextContext = nextContext;
+        }
+
+        /**
+         * Sets objective and device id to this context; reset error states.
+         *
+         * @param objective the objective
+         * @param deviceId the device id
+         */
+        void setObjective(Objective objective, DeviceId deviceId) {
+            Objects.requireNonNull(objective);
+            Objects.requireNonNull(deviceId);
+            this.objective = objective;
+            this.deviceId = deviceId;
+            this.error = null;
+            this.retry = new AtomicInteger(0);
+        }
+
+        /**
+         * Gets the number of retries.
+         *
+         * @return the retry count
+         */
+        int retryTimes() {
+            return this.retry.get();
+        }
+
+        /**
+         * Increases the number of retries.
+         */
+        void increaseRetryValue() {
+            this.retry.incrementAndGet();
+        }
+
+        /**
+         * Completed this context.
+         *
+         * @param error the error of this context if exist; null otherwise
+         */
+        private void finished(ObjectiveError error) {
+            synchronized (intentInstallationContext) {
+                if (error != null) {
+                    this.error = error;
+                    intentInstallationContext.handleObjectiveError(this, error);
+                } else {
+                    // apply next context if exist
+                    if (nextContext != null) {
+                        intentInstallationContext.addPendingContext(nextContext);
+                        flowObjectiveService.apply(nextContext.deviceId,
+                                                   nextContext.objective);
+                        intentInstallationContext.removePendingContext(this);
+                    } else {
+                        intentInstallationContext.removePendingContext(this);
+                    }
+                }
+                if (!intentInstallationContext.pendingContexts().isEmpty()) {
+                    return;
+                }
+                // Apply second stage pending contexts if it is not empty
+                if (!intentInstallationContext.nextPendingContexts().isEmpty()) {
+                    intentInstallationContext.moveNextPendingToPending();
+                    final Set<ObjectiveContext> contextsToApply =
+                            Sets.newHashSet(intentInstallationContext.pendingContexts());
+                    contextsToApply.forEach(ctx -> {
+                        FlowObjectiveInstallationContext foiCtx = (FlowObjectiveInstallationContext) ctx;
+                        flowObjectiveService.apply(foiCtx.deviceId,
+                                                   foiCtx.objective);
+                    });
+                    return;
+                }
+                if (intentInstallationContext.errorContexts().isEmpty()) {
+                    intentInstallCoordinator.intentInstallSuccess(intentInstallationContext.intentOperationContext());
+                } else {
+                    intentInstallCoordinator.intentInstallFailed(intentInstallationContext.intentOperationContext());
+                }
+            }
+        }
+
+        @Override
+        public void onSuccess(Objective objective) {
+            finished(null);
+        }
+
+        @Override
+        public void onError(Objective objective, ObjectiveError error) {
+            finished(error);
+        }
+
+        @Override
+        public String toString() {
+            return String.format("(%s on %s for %s)", error, deviceId, objective);
+        }
+    }
+
+    /**
+     * Installation context for FlowObjective Intent.
+     * Manages states of pending and error flow objective contexts.
+     */
+    class FlowObjectiveIntentInstallationContext {
+        private final IntentOperationContext<FlowObjectiveIntent> intentOperationContext;
+        final List<ObjectiveContext> contexts = Lists.newArrayList();
+        final Set<ObjectiveContext> errorContexts = Sets.newConcurrentHashSet();
+        final Set<ObjectiveContext> pendingContexts = Sets.newConcurrentHashSet();
+
+        // Second stage of pending contexts
+        final Set<ObjectiveContext> nextPendingContexts = Sets.newConcurrentHashSet();
+
+        /**
+         * Creates a flow objective installation context.
+         *
+         * @param intentOperationContext the flow objective installation context
+         */
+        public FlowObjectiveIntentInstallationContext(
+                IntentOperationContext<FlowObjectiveIntent> intentOperationContext) {
+            Objects.requireNonNull(intentOperationContext);
+            this.intentOperationContext = intentOperationContext;
+        }
+
+        /**
+         * Gets Intent operation context of this context.
+         *
+         * @return the Intent operation context
+         */
+        public IntentOperationContext<FlowObjectiveIntent> intentOperationContext() {
+            return intentOperationContext;
+        }
+
+        /**
+         * Applies all contexts to flow objective service.
+         */
+        public void apply() {
+            if (pendingContexts.isEmpty()) {
+                moveNextPendingToPending();
+            }
+            final Set<ObjectiveContext> contextsToApply = pendingContexts();
+            contextsToApply.forEach(ctx -> {
+                FlowObjectiveInstallationContext foiCtx =
+                        (FlowObjectiveInstallationContext) ctx;
+                flowObjectiveService.apply(foiCtx.deviceId, foiCtx.objective);
+            });
+        }
+
+        /**
+         * Gets all error contexts.
+         *
+         * @return the error contexts
+         */
+        public Set<ObjectiveContext> errorContexts() {
+            return ImmutableSet.copyOf(errorContexts);
+        }
+
+        /**
+         * Gets all pending contexts.
+         *
+         * @return the pending contexts
+         */
+        public Set<ObjectiveContext> pendingContexts() {
+            return ImmutableSet.copyOf(pendingContexts);
+        }
+
+        /**
+         * Gets all pending contexts of next stage.
+         *
+         * @return the pending contexts for next stage
+         */
+        public Set<ObjectiveContext> nextPendingContexts() {
+            return ImmutableSet.copyOf(nextPendingContexts);
+        }
+
+        /**
+         * Adds a context.
+         *
+         * @param context the context
+         */
+        public void addContext(ObjectiveContext context) {
+            Objects.requireNonNull(context);
+            contexts.add(context);
+        }
+
+        /**
+         * Adds a context to pending context of next stage.
+         *
+         * @param context the context
+         */
+        public void addNextPendingContext(ObjectiveContext context) {
+            Objects.requireNonNull(context);
+            nextPendingContexts.add(context);
+        }
+
+        /**
+         * Adds a context to pending context.
+         *
+         * @param context the context
+         */
+        public void addPendingContext(ObjectiveContext context) {
+            Objects.requireNonNull(context);
+            pendingContexts.add(context);
+        }
+
+        /**
+         * Removes the pending context.
+         *
+         * @param context the context
+         */
+        public void removePendingContext(ObjectiveContext context) {
+            Objects.requireNonNull(context);
+            pendingContexts.remove(context);
+        }
+
+        /**
+         * Moves pending context from next stage to current stage.
+         */
+        public void moveNextPendingToPending() {
+            pendingContexts.addAll(nextPendingContexts);
+            nextPendingContexts.clear();
+        }
+
+        /**
+         * Handles error of objective context.
+         *
+         * @param ctx the objective context
+         * @param error the error
+         */
+        public void handleObjectiveError(FlowObjectiveInstallationContext ctx,
+                                         ObjectiveError error) {
+            Objects.requireNonNull(ctx);
+            Objects.requireNonNull(error);
+            log.debug("Got error(s) when install objective: {}, error: {}, retry: {}",
+                      ctx.objective, ctx.error, ctx.retry);
+            if (ctx.retryTimes() > OBJECTIVE_RETRY_THRESHOLD) {
+                ctx.error = INSTALLATIONTHRESHOLDEXCEEDED;
+                pendingContexts.remove(ctx);
+                errorContexts.add(ctx);
+                return;
+            }
+            // reset error
+            ctx.error = null;
+            // strategies for errors
+            switch (error) {
+                case GROUPEXISTS:
+                    if (ctx.objective.op() == Objective.Operation.ADD &&
+                            ctx.objective instanceof NextObjective) {
+                        // Next group exists
+                        // build new objective with new op ADD_TO_EXIST
+                        NextObjective newObj =
+                                ((NextObjective.Builder) ctx.objective.copy()).addToExisting(ctx);
+                        ctx.setObjective(newObj, ctx.deviceId);
+                        ctx.increaseRetryValue();
+                        flowObjectiveService.apply(ctx.deviceId, ctx.objective);
+                    } else {
+                        pendingContexts.remove(ctx);
+                        errorContexts.add(ctx);
+                    }
+                    break;
+                case GROUPINSTALLATIONFAILED:
+                    // Group install failed, retry again
+                    ctx.increaseRetryValue();
+                    flowObjectiveService.apply(ctx.deviceId, ctx.objective);
+                    break;
+                case GROUPMISSING:
+                    if (ctx.objective.op() == Objective.Operation.ADD_TO_EXISTING) {
+                        // Next group not exist, but we want to add new buckets
+                        // build new objective with new op ADD
+                        NextObjective newObj = (NextObjective) ctx.objective.copy().add(ctx);
+                        ctx.setObjective(newObj, ctx.deviceId);
+                        ctx.increaseRetryValue();
+                        flowObjectiveService.apply(ctx.deviceId, ctx.objective);
+                    } else if (ctx.objective.op() == Objective.Operation.REMOVE ||
+                            ctx.objective.op() == Objective.Operation.REMOVE_FROM_EXISTING) {
+                        // Already removed, no need to do anything
+                        ctx.error = null;
+                        pendingContexts.remove(ctx);
+                        return;
+                    } else {
+                        // Next chaining group missing, try again.
+                        ctx.increaseRetryValue();
+                        flowObjectiveService.apply(ctx.deviceId, ctx.objective);
+                    }
+                    break;
+                case FLOWINSTALLATIONFAILED:
+                case GROUPREMOVALFAILED:
+                case INSTALLATIONTIMEOUT:
+                    // Retry
+                    ctx.increaseRetryValue();
+                    flowObjectiveService.apply(ctx.deviceId, ctx.objective);
+                    break;
+                default:
+                    pendingContexts.remove(ctx);
+                    errorContexts.add(ctx);
+                    break;
+            }
+        }
+    }
+}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/installer/FlowRuleIntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/installer/FlowRuleIntentInstaller.java
new file mode 100644
index 0000000..f29ceec
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/installer/FlowRuleIntentInstaller.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.intent.impl.installer;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.intent.FlowRuleIntent;
+import org.onosproject.net.intent.IntentInstallCoordinator;
+import org.onosproject.net.intent.IntentData;
+import org.onosproject.net.intent.IntentExtensionService;
+import org.onosproject.net.intent.IntentOperationContext;
+import org.onosproject.net.intent.IntentInstaller;
+import org.onosproject.net.intent.impl.IntentManager;
+import org.onosproject.net.intent.impl.ObjectiveTrackerService;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import static org.onosproject.net.intent.IntentInstaller.Direction.ADD;
+import static org.onosproject.net.intent.IntentInstaller.Direction.REMOVE;
+import static org.onosproject.net.intent.IntentState.INSTALLED;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Installer for FlowRuleIntent.
+ */
+@Component(immediate = true)
+public class FlowRuleIntentInstaller implements IntentInstaller<FlowRuleIntent> {
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected IntentExtensionService intentExtensionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ObjectiveTrackerService trackerService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected IntentInstallCoordinator intentInstallCoordinator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected FlowRuleService flowRuleService;
+
+    @Activate
+    public void activate() {
+        intentExtensionService.registerInstaller(FlowRuleIntent.class, this);
+    }
+
+    @Deactivate
+    public void deactivated() {
+        intentExtensionService.unregisterInstaller(FlowRuleIntent.class);
+    }
+
+    protected final Logger log = getLogger(IntentManager.class);
+
+    @Override
+    public void apply(IntentOperationContext<FlowRuleIntent> context) {
+        Optional<IntentData> toUninstall = context.toUninstall();
+        Optional<IntentData> toInstall = context.toInstall();
+
+        List<FlowRuleIntent> uninstallIntents = context.intentsToUninstall();
+        List<FlowRuleIntent> installIntents = context.intentsToInstall();
+
+        if (!toInstall.isPresent() && !toUninstall.isPresent()) {
+            intentInstallCoordinator.intentInstallSuccess(context);
+            return;
+        } else if (!toInstall.isPresent()) {
+            // Uninstall only
+            trackIntentResources(toUninstall.get(), uninstallIntents, REMOVE);
+        } else if (!toUninstall.isPresent()) {
+            // Install only
+            trackIntentResources(toInstall.get(), installIntents, ADD);
+        } else {
+            IntentData uninstall = toUninstall.get();
+            IntentData install = toInstall.get();
+
+            // Filter out same intents and intents with same flow rules
+            Iterator<FlowRuleIntent> iterator = installIntents.iterator();
+            while (iterator.hasNext()) {
+                FlowRuleIntent installIntent = iterator.next();
+                uninstallIntents.stream().filter(uIntent -> {
+                    if (uIntent.equals(installIntent)) {
+                        return true;
+                    } else {
+                        return !flowRuleIntentChanged(uIntent, installIntent);
+                    }
+                }).findFirst().ifPresent(common -> {
+                    uninstallIntents.remove(common);
+                    if (INSTALLED.equals(uninstall.state())) {
+                        // only remove the install intent if the existing
+                        // intent (i.e. the uninstall one) is already
+                        // installed or installing
+                        iterator.remove();
+                    }
+                });
+            }
+            trackIntentResources(uninstall, uninstallIntents, REMOVE);
+            trackIntentResources(install, installIntents, ADD);
+        }
+
+        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+        builder.newStage();
+
+        toUninstall.ifPresent(intentData -> {
+            uninstallIntents.stream().map(FlowRuleIntent::flowRules)
+                    .flatMap(Collection::stream).forEach(builder::remove);
+        });
+
+        toInstall.ifPresent(intentData -> {
+            installIntents.stream().map(FlowRuleIntent::flowRules)
+                    .flatMap(Collection::stream).forEach(builder::add);
+        });
+
+        FlowRuleOperationsContext flowRuleOperationsContext = new FlowRuleOperationsContext() {
+            @Override
+            public void onSuccess(FlowRuleOperations ops) {
+                intentInstallCoordinator.intentInstallSuccess(context);
+            }
+
+            @Override
+            public void onError(FlowRuleOperations ops) {
+                intentInstallCoordinator.intentInstallFailed(context);
+            }
+        };
+
+        FlowRuleOperations operations = builder.build(flowRuleOperationsContext);
+
+
+        log.debug("applying intent {} -> {} with {} rules: {}",
+                  toUninstall.map(x -> x.key().toString()).orElse("<empty>"),
+                  toInstall.map(x -> x.key().toString()).orElse("<empty>"),
+                  operations.stages().stream().mapToLong(Set::size).sum(),
+                  operations.stages());
+        flowRuleService.apply(operations);
+    }
+
+    /**
+     * Track or un-track network resource of a Intent and it's installable
+     * Intents.
+     *
+     * @param intentData the Intent data
+     * @param intentsToApply the list of flow rule Intents from the Intent
+     * @param direction the direction to determine track or un-track
+     */
+    private void trackIntentResources(IntentData intentData, List<FlowRuleIntent> intentsToApply, Direction direction) {
+        switch (direction) {
+            case ADD:
+                trackerService.addTrackedResources(intentData.key(), intentData.intent().resources());
+                intentsToApply.forEach(installable ->
+                                               trackerService.addTrackedResources(intentData.key(),
+                                                                                  installable.resources()));
+                break;
+            default:
+                trackerService.removeTrackedResources(intentData.key(), intentData.intent().resources());
+                intentsToApply.forEach(installable ->
+                                               trackerService.removeTrackedResources(intentData.intent().key(),
+                                                                                     installable.resources()));
+                break;
+        }
+    }
+
+    /**
+     * Determines whether there is any flow rule changed
+     * (i.e., different set of flow rules or different treatments)
+     * between FlowRuleIntents to be uninstalled and to be installed.
+     *
+     * @param uninstallIntent FlowRuleIntent to uninstall
+     * @param installIntent   FlowRuleIntent to install
+     * @return true if flow rules which to be uninstalled contains all flow
+     *         rules which to be installed; false otherwise
+     */
+    private boolean flowRuleIntentChanged(FlowRuleIntent uninstallIntent,
+                                          FlowRuleIntent installIntent) {
+        Collection<FlowRule> flowRulesToUninstall = uninstallIntent.flowRules();
+        Collection<FlowRule> flowRulesToInstall = installIntent.flowRules();
+
+        // Check if any flow rule changed
+        for (FlowRule flowRuleToInstall : flowRulesToInstall) {
+            if (flowRulesToUninstall.stream().noneMatch(flowRuleToInstall::exactMatch)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}
\ No newline at end of file
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/installer/ProtectionEndpointIntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/installer/ProtectionEndpointIntentInstaller.java
new file mode 100644
index 0000000..58c24e6
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/installer/ProtectionEndpointIntentInstaller.java
@@ -0,0 +1,260 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.intent.impl.installer;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.protection.ProtectedTransportEndpointDescription;
+import org.onosproject.net.behaviour.protection.ProtectionConfig;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigService;
+import org.onosproject.net.intent.IntentData;
+import org.onosproject.net.intent.IntentException;
+import org.onosproject.net.intent.IntentExtensionService;
+import org.onosproject.net.intent.IntentInstallCoordinator;
+import org.onosproject.net.intent.IntentOperationContext;
+import org.onosproject.net.intent.IntentInstaller;
+import org.onosproject.net.intent.ProtectionEndpointIntent;
+import org.onosproject.net.intent.impl.IntentManager;
+import org.onosproject.net.intent.impl.ObjectiveTrackerService;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.net.config.NetworkConfigEvent.Type.*;
+import static org.onosproject.net.intent.IntentInstaller.Direction.ADD;
+import static org.onosproject.net.intent.IntentInstaller.Direction.REMOVE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Installer for ProtectionEndpointIntent.
+ */
+@Component(immediate = true)
+public class ProtectionEndpointIntentInstaller implements IntentInstaller<ProtectionEndpointIntent> {
+    private static final String CONFIG_FAILED = "Config operation unsuccessful, expected %s, actual %s.";
+    private final Logger log = getLogger(IntentManager.class);
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected IntentExtensionService intentExtensionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    NetworkConfigService networkConfigService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    IntentInstallCoordinator intentInstallCoordinator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    ObjectiveTrackerService trackerService;
+
+    @Activate
+    public void activate() {
+        intentExtensionService.registerInstaller(ProtectionEndpointIntent.class, this);
+    }
+
+    @Deactivate
+    public void deactivated() {
+        intentExtensionService.unregisterInstaller(ProtectionEndpointIntent.class);
+    }
+
+    @Override
+    public void apply(IntentOperationContext<ProtectionEndpointIntent> context) {
+        Optional<IntentData> toUninstall = context.toUninstall();
+        Optional<IntentData> toInstall = context.toInstall();
+
+        List<ProtectionEndpointIntent> uninstallIntents = context.intentsToUninstall();
+        List<ProtectionEndpointIntent> installIntents = context.intentsToInstall();
+
+        if (!toInstall.isPresent() && !toUninstall.isPresent()) {
+            intentInstallCoordinator.intentInstallSuccess(context);
+            return;
+        }
+
+        if (toUninstall.isPresent()) {
+            IntentData intentData = toUninstall.get();
+            trackerService.removeTrackedResources(intentData.key(), intentData.intent().resources());
+            uninstallIntents.forEach(installable ->
+                                             trackerService.removeTrackedResources(intentData.intent().key(),
+                                                                                   installable.resources()));
+        }
+
+        if (toInstall.isPresent()) {
+            IntentData intentData = toInstall.get();
+            trackerService.addTrackedResources(intentData.key(), intentData.intent().resources());
+            installIntents.forEach(installable ->
+                                           trackerService.addTrackedResources(intentData.key(),
+                                                                              installable.resources()));
+        }
+
+        List<Stage> stages = new ArrayList<>();
+
+        stages.add(new Stage(uninstallIntents.stream()
+                                     .map(i -> Pair.of(i, REMOVE))
+                                     .collect(Collectors.toList())));
+
+        stages.add(new Stage(installIntents.stream()
+                                     .map(i -> Pair.of(i, ADD))
+                                     .collect(Collectors.toList())));
+        for (Stage stage : stages) {
+            log.debug("applying Stage {}", stage);
+            try {
+                // wait for stage completion
+                stage.apply();
+                stage.listeners().forEach(networkConfigService::removeListener);
+            } catch (IntentException e) {
+                log.error("Stage {} failed, reason: {}", stage, e.toString());
+                intentInstallCoordinator.intentInstallFailed(context);
+                return;
+            }
+        }
+        // All stage success
+        intentInstallCoordinator.intentInstallSuccess(context);
+    }
+
+    /**
+     * Stage of installable Intents which can be processed in parallel.
+     */
+    private final class Stage {
+        // should it have progress state, how far it went?
+        private final Collection<Pair<ProtectionEndpointIntent, Direction>> ops;
+        private final Set<NetworkConfigListener> listeners = Sets.newHashSet();
+
+        /**
+         * Create a stage with given operations.
+         *
+         * @param ops the operations
+         */
+        Stage(Collection<Pair<ProtectionEndpointIntent, Direction>> ops) {
+            this.ops = checkNotNull(ops);
+        }
+
+        /**
+         * Applies all operations for this stage.
+         *
+         * @return the CompletableFuture object for this operation
+         */
+        void apply() {
+            ops.stream()
+                    .map(op -> applyOp(op.getRight(), op.getLeft()))
+                    .forEach(future -> {
+                        try {
+                            future.get(100, TimeUnit.MILLISECONDS);
+                        } catch (TimeoutException | InterruptedException | ExecutionException e) {
+                            throw new IntentException(e.toString());
+                        }
+                    });
+        }
+
+        /**
+         * Applies the protection endpoint Intent with a given direction.
+         *
+         * @param dir the direction
+         * @param intent the protection endpoint Intent
+         * @return the CompletableFuture object for this operation
+         */
+        private CompletableFuture<Void> applyOp(Direction dir, ProtectionEndpointIntent intent) {
+            log.trace("applying {}: {}", dir, intent);
+            if (dir == REMOVE) {
+                ProtectionConfigListener listener =
+                        new ProtectionConfigListener(ImmutableSet.of(CONFIG_REMOVED),
+                                                     intent.deviceId());
+                networkConfigService.addListener(listener);
+                listeners.add(listener);
+                networkConfigService.removeConfig(intent.deviceId(), ProtectionConfig.class);
+                return listener.completableFuture();
+            } else {
+                ProtectedTransportEndpointDescription description = intent.description();
+
+                // Can't do following. Will trigger empty CONFIG_ADDED
+                //ProtectionConfig cfg = networkConfigService.addConfig(intent.deviceId(),
+                //                                                      ProtectionConfig.class);
+                ProtectionConfig cfg = new ProtectionConfig(intent.deviceId());
+                cfg.fingerprint(description.fingerprint());
+                cfg.peer(description.peer());
+                cfg.paths(description.paths());
+                ProtectionConfigListener listener =
+                        new ProtectionConfigListener(ImmutableSet.of(CONFIG_ADDED, CONFIG_UPDATED),
+                                                     intent.deviceId());
+
+                networkConfigService.addListener(listener);
+                listeners.add(listener);
+                networkConfigService.applyConfig(intent.deviceId(),
+                                                 ProtectionConfig.class,
+                                                 cfg.node());
+                return listener.completableFuture();
+            }
+        }
+
+        @Override
+        public String toString() {
+            return ops.toString();
+        }
+
+        public Set<NetworkConfigListener> listeners() {
+            return listeners;
+        }
+
+        /**
+         * Listener for protection config for specific config event and device.
+         */
+        class ProtectionConfigListener implements NetworkConfigListener {
+            private CompletableFuture<Void> completableFuture;
+            private Set<NetworkConfigEvent.Type> listenTypes;
+            private DeviceId listenDevice;
+
+            public ProtectionConfigListener(Set<NetworkConfigEvent.Type> listenTypes, DeviceId listenDevice) {
+                completableFuture = new CompletableFuture<>();
+                this.listenTypes = listenTypes;
+                this.listenDevice = listenDevice;
+            }
+
+            @Override
+            public void event(NetworkConfigEvent event) {
+                if (!event.subject().equals(listenDevice)) {
+                    return;
+                }
+                if (!listenTypes.contains(event.type())) {
+                    String errorMsg = String.format(CONFIG_FAILED, listenTypes.toString(), event.type());
+                    completableFuture.completeExceptionally(new IntentException(errorMsg));
+                } else {
+                    completableFuture.complete(null);
+                }
+            }
+
+            public CompletableFuture<Void> completableFuture() {
+                return completableFuture;
+            }
+        }
+    }
+}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/installer/package-info.java b/core/net/src/main/java/org/onosproject/net/intent/impl/installer/package-info.java
new file mode 100644
index 0000000..b971165
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/installer/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Installers for different installable Intents.
+ */
+package org.onosproject.net.intent.impl.installer;
\ No newline at end of file