[ONOS-6016] Enhance IntentInstaller for FlowObjective

Note:
Provide order of objective context (for remove only)
Handling objective errors in intent installer

Change-Id: I50bb9d7a17a0ae71d22ba035cd5bc80f485ec45a
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java
index bd51902..e00f2b8 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java
@@ -66,5 +66,20 @@
     /**
      * An unknown error occurred.
      */
-    UNKNOWN
+    UNKNOWN,
+
+    /**
+     * Flow/Group installation retry threshold exceeded.
+     */
+    INSTALLATIONTHRESHOLDEXCEEDED,
+
+    /**
+     * Installation timeout.
+     */
+    INSTALLATIONTIMEOUT,
+
+    /**
+     * Group already exists.
+     */
+    GROUPEXISTS
 }
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 44fc986..37bf9d2 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -265,7 +265,8 @@
 
     private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
         if (fwd.nextId() == null ||
-                flowObjectiveStore.getNextGroup(fwd.nextId()) != null) {
+                flowObjectiveStore.getNextGroup(fwd.nextId()) != null ||
+                fwd.op() == Objective.Operation.REMOVE) {
             // fast path
             return false;
         }
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
index 0fd57a9..538aa0e 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
@@ -32,7 +32,10 @@
 import org.onosproject.net.flow.FlowRuleOperations;
 import org.onosproject.net.flow.FlowRuleOperationsContext;
 import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
@@ -53,12 +56,14 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
+import static org.onosproject.net.flowobjective.ObjectiveError.INSTALLATIONTHRESHOLDEXCEEDED;
 import static org.onosproject.net.intent.IntentState.*;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -68,6 +73,7 @@
 class IntentInstaller {
 
     private static final Logger log = getLogger(IntentInstaller.class);
+    private static final long OBJECTIVE_RETRY_THRESHOLD = 5;
 
     private IntentStore store;
     private ObjectiveTrackerService trackerService;
@@ -516,8 +522,13 @@
 
     // Context for applying and tracking operations related to flow objective intents.
     private class FlowObjectiveOperationContext extends OperationContext {
-        List<FlowObjectiveInstallationContext> contexts = Lists.newLinkedList();
-        final Set<ObjectiveContext> pendingContexts = Sets.newHashSet();
+        private static final String UNSUPPORT_OBJ = "unsupported objective {}";
+        final List<ObjectiveContext> contexts = Lists.newArrayList();
+
+        final Set<ObjectiveContext> pendingContexts = Sets.newConcurrentHashSet();
+
+        // Second stage of pending contexts
+        final Set<ObjectiveContext> nextPendingContexts = Sets.newConcurrentHashSet();
         final Set<ObjectiveContext> errorContexts = Sets.newConcurrentHashSet();
 
         FlowObjectiveOperationContext(IntentInstallationContext context) {
@@ -526,46 +537,127 @@
 
         @Override
         public void prepareIntents(List<Intent> intentsToApply, Direction direction) {
-            intentsToApply.stream()
-                    .filter(x -> x instanceof FlowObjectiveIntent)
-                    .flatMap(x -> buildObjectiveContexts((FlowObjectiveIntent) x, direction).stream())
-                    .forEach(contexts::add);
+            intentsToApply
+                .stream()
+                .filter(intent -> intent instanceof FlowObjectiveIntent)
+                .map(intent -> buildObjectiveContexts((FlowObjectiveIntent) intent, direction))
+                .flatMap(Collection::stream)
+                .forEach(contexts::add);
+
+            // Two stage for different direction context
+            // We will apply REMOVE context first, and apply ADD context.
+            contexts.forEach(context -> {
+                switch (direction) {
+                    case REMOVE:
+                        pendingContexts.add(context);
+                        break;
+                    case ADD:
+                        nextPendingContexts.add(context);
+                        break;
+                    default:
+                        break;
+                }
+            });
         }
 
         // Builds the specified objective in the appropriate direction
-        private List<FlowObjectiveInstallationContext> buildObjectiveContexts(FlowObjectiveIntent intent,
-                                                                              Direction direction) {
+        private Set<? extends ObjectiveContext> buildObjectiveContexts(FlowObjectiveIntent intent,
+                                            Direction direction) {
+            Set<FlowObjectiveInstallationContext> contexts = Sets.newHashSet();
             int size = intent.objectives().size();
-            List<FlowObjectiveInstallationContext> contexts = new ArrayList<>(size);
-            for (int i = 0; i < size; i++) {
-                DeviceId deviceId = intent.devices().get(i);
-                Objective.Builder builder = intent.objectives().get(i).copy();
-                FlowObjectiveInstallationContext context = new FlowObjectiveInstallationContext();
+            List<Objective> objectives = intent.objectives();
+            List<DeviceId> deviceIds = intent.devices();
 
-                final Objective objective;
-                switch (direction) {
-                    case ADD:
-                        objective = builder.add(context);
-                        break;
-                    case REMOVE:
-                        objective = builder.remove(context);
-                        break;
-                    default:
-                        throw new UnsupportedOperationException("Unsupported direction " + direction);
+            if (direction == Direction.ADD) {
+                for (int i = 0; i < size; i++) {
+                    Objective objective = objectives.get(i);
+                    DeviceId deviceId = deviceIds.get(i);
+                    FlowObjectiveInstallationContext ctx =
+                            buildObjectiveContext(objective, deviceId, direction);
+                    contexts.add(ctx);
                 }
-                context.setObjective(objective, deviceId);
-                contexts.add(context);
+                return contexts;
+            } else {
+                // we need to care about ordering here
+                // basic idea is to chain objective contexts
+                for (int i = 0; i < size; i++) {
+                    Objective objective = intent.objectives().get(i);
+                    DeviceId deviceId = intent.devices().get(i);
+
+                    if (objective instanceof FilteringObjective) {
+                        // don't need to care ordering of filtering objective
+                        FlowObjectiveInstallationContext ctx =
+                                buildObjectiveContext(objective, deviceId, direction);
+                        contexts.add(ctx);
+                    } else if (objective instanceof NextObjective) {
+                        // need to removed after forwarding objective
+                        // nothing to do here
+                    } else if (objective instanceof ForwardingObjective) {
+                        // forwarding objective, also find next objective if
+                        // exist
+                        FlowObjectiveInstallationContext fwdCtx =
+                                buildObjectiveContext(objective, deviceId, direction);
+                        ForwardingObjective fwd = (ForwardingObjective) objective;
+                        NextObjective nxt = null;
+                        Integer nextId = fwd.nextId();
+                        if (nextId != null) {
+                            for (int j = 0; j < size; j++) {
+                                if (objectives.get(j).id() == nextId) {
+                                    nxt = (NextObjective) objectives.get(j);
+                                    break;
+                                }
+                            }
+                            // if a next objective exists in the Intent
+                            if (nxt != null) {
+                                FlowObjectiveInstallationContext nxtCtx =
+                                        buildObjectiveContext(nxt, deviceId, direction);
+                                fwdCtx.nextContext(nxtCtx);
+                            }
+                        }
+                        contexts.add(fwdCtx);
+                    } else {
+                        // possible here?
+                        log.warn(UNSUPPORT_OBJ, objective);
+                    }
+                }
             }
             return contexts;
         }
 
+        private FlowObjectiveInstallationContext buildObjectiveContext(Objective objective,
+                                                                       DeviceId deviceId,
+                                                                       Direction direction) {
+            Objective.Builder builder = objective.copy();
+            FlowObjectiveInstallationContext ctx = new FlowObjectiveInstallationContext();
+            switch (direction) {
+                case ADD:
+                    objective = builder.add(ctx);
+                    break;
+                case REMOVE:
+                    objective = builder.remove(ctx);
+                    break;
+                default:
+                    break;
+            }
+            ctx.setObjective(objective, deviceId);
+            return ctx;
+        }
+
         @Override
         void apply() {
-            pendingContexts.addAll(contexts);
-            contexts.forEach(objectiveContext ->
-                flowObjectiveService.apply(objectiveContext.deviceId,
-                                           objectiveContext.objective)
-            );
+            // If there is no pending contexts, try apply second stage
+            // pending contexts
+            if (pendingContexts.isEmpty()) {
+                pendingContexts.addAll(nextPendingContexts);
+                nextPendingContexts.clear();
+            }
+            final Set<ObjectiveContext> contextsToApply = Sets.newHashSet(pendingContexts);
+            contextsToApply.forEach(ctx -> {
+                FlowObjectiveInstallationContext foiCtx =
+                        (FlowObjectiveInstallationContext) ctx;
+
+                flowObjectiveService.apply(foiCtx.deviceId, foiCtx.objective);
+            });
         }
 
         @Override
@@ -581,46 +673,156 @@
                     .add("errorContexts", errorContexts);
         }
 
-
         private class FlowObjectiveInstallationContext implements ObjectiveContext {
             Objective objective;
             DeviceId deviceId;
             ObjectiveError error;
+            AtomicInteger retry;
+            FlowObjectiveInstallationContext nextContext;
 
             void setObjective(Objective objective, DeviceId deviceId) {
+                // init function
                 this.objective = objective;
                 this.deviceId = deviceId;
+                this.error = null;
+                this.retry = new AtomicInteger(0);
+                this.nextContext = null;
             }
 
-            @Override
-            public void onSuccess(Objective objective) {
-                finish();
+            int retryTimes() {
+                return this.retry.get();
             }
 
-            @Override
-            public void onError(Objective objective, ObjectiveError error) {
-                this.error = error;
-                errorContexts.add(this);
-                finish();
+            void increaseRetryValue() {
+                this.retry.incrementAndGet();
             }
 
-            private void finish() {
+            private void finished(ObjectiveError error) {
+
                 synchronized (pendingContexts) {
-                    pendingContexts.remove(this);
-                    if (pendingContexts.isEmpty()) {
-                        if (errorContexts.isEmpty()) {
-                            successConsumer.accept(FlowObjectiveOperationContext.this);
+                    if (error != null) {
+                        this.error = error;
+                        handleObjectiveError(this, error);
+                    } else {
+                        // apply next context if exist
+                        if (nextContext != null) {
+                            pendingContexts.add(nextContext);
+                            flowObjectiveService.apply(nextContext.deviceId,
+                                                       nextContext.objective);
+                            pendingContexts.remove(this);
                         } else {
-                            errorConsumer.accept(FlowObjectiveOperationContext.this);
+                            pendingContexts.remove(this);
                         }
                     }
+                    if (!pendingContexts.isEmpty()) {
+                        return;
+                    }
+                    // Apply second stage pending contexts if it is not empty
+                    if (!nextPendingContexts.isEmpty()) {
+                        pendingContexts.addAll(nextPendingContexts);
+                        nextPendingContexts.clear();
+                        final Set<ObjectiveContext> contextsToApply =
+                                Sets.newHashSet(pendingContexts);
+                        contextsToApply.forEach(ctx -> {
+                            FlowObjectiveInstallationContext foiCtx =
+                                    (FlowObjectiveInstallationContext) ctx;
+                            flowObjectiveService.apply(foiCtx.deviceId,
+                                                       foiCtx.objective);
+                        });
+                        return;
+                    }
+                    if (errorContexts.isEmpty()) {
+                        successConsumer.accept(FlowObjectiveOperationContext.this);
+                    } else {
+                        errorConsumer.accept(FlowObjectiveOperationContext.this);
+                    }
                 }
             }
 
             @Override
+            public void onSuccess(Objective objective) {
+                finished(null);
+            }
+
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                finished(error);
+            }
+
+            @Override
             public String toString() {
                 return String.format("(%s on %s for %s)", error, deviceId, objective);
             }
+
+            public void nextContext(FlowObjectiveInstallationContext nextContext) {
+                this.nextContext = nextContext;
+            }
+        }
+
+        private void handleObjectiveError(FlowObjectiveInstallationContext ctx,
+                                          ObjectiveError error) {
+            log.debug("Got error(s) when install objective: {}, error: {}, retry: {}",
+                      ctx.objective, ctx.error, ctx.retry);
+            if (ctx.retryTimes() > OBJECTIVE_RETRY_THRESHOLD) {
+                ctx.error = INSTALLATIONTHRESHOLDEXCEEDED;
+                errorContexts.add(ctx);
+                return;
+            }
+            // reset error
+            ctx.error = null;
+            // strategies for errors
+            switch (error) {
+                case GROUPEXISTS:
+                    if (ctx.objective.op() == Objective.Operation.ADD) {
+                        // Next group exists
+                        // build new objective with new op ADD_TO_EXIST
+                        NextObjective newObj =
+                                ((NextObjective.Builder) ctx.objective.copy()).addToExisting(ctx);
+                        ctx.setObjective(newObj, ctx.deviceId);
+                        ctx.increaseRetryValue();
+                        flowObjectiveService.apply(ctx.deviceId, ctx.objective);
+                    } else {
+                        pendingContexts.remove(ctx);
+                        errorContexts.add(ctx);
+                    }
+                    break;
+                case GROUPINSTALLATIONFAILED:
+                    // Group install failed, retry again
+                    ctx.increaseRetryValue();
+                    flowObjectiveService.apply(ctx.deviceId, ctx.objective);
+                    break;
+                case GROUPMISSING:
+                    if (ctx.objective.op() == Objective.Operation.ADD_TO_EXISTING) {
+                        // Next group not exist, but we want to add new buckets
+                        // build new objective with new op ADD
+                        NextObjective newObj = (NextObjective) ctx.objective.copy().add(ctx);
+                        ctx.setObjective(newObj, ctx.deviceId);
+                        ctx.increaseRetryValue();
+                        flowObjectiveService.apply(ctx.deviceId, ctx.objective);
+                    } else if (ctx.objective.op() == Objective.Operation.REMOVE ||
+                            ctx.objective.op() == Objective.Operation.REMOVE_FROM_EXISTING) {
+                        // Already removed, no need to do anything
+                        ctx.error = null;
+                        pendingContexts.remove(ctx);
+                        return;
+                    } else {
+                        // Next chaining group missing, try again.
+                        ctx.increaseRetryValue();
+                        flowObjectiveService.apply(ctx.deviceId, ctx.objective);
+                    }
+                    break;
+                case FLOWINSTALLATIONFAILED:
+                case GROUPREMOVALFAILED:
+                case INSTALLATIONTIMEOUT:
+                    // Retry
+                    ctx.increaseRetryValue();
+                    flowObjectiveService.apply(ctx.deviceId, ctx.objective);
+                    break;
+                default:
+                    pendingContexts.remove(ctx);
+                    errorContexts.add(ctx);
+                    break;
+            }
         }
     }