Report successful objective operation immediately once the flows/groups are translated

P4 pipeliners are fundamentally different from OpenFlow pipeliners.
A P4 pipeliner can verify whether a flow objective can be translated into a flow that is compatible with the pipeline.
Once verified, it is almost certain the flow can be installed on the switch.
The flow/group subsystem's retry mechanism will take care of the rare cases where a flow/group is not installed on the first attempt.

Therefore, we only fail the objective if there is a translation error in the FabricPipeliner.

Change-Id: I868016c0859930fa15b9cdbacb6f72d8c3df307f
diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
index ca209a3..61d3824 100644
--- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
+++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
@@ -18,13 +18,9 @@
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalCause;
-import com.google.common.cache.RemovalListener;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import com.google.common.collect.Maps;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.core.GroupId;
@@ -38,7 +34,6 @@
 import org.onosproject.net.flow.FlowId;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleOperations;
-import org.onosproject.net.flow.FlowRuleOperationsContext;
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.instructions.Instruction;
 import org.onosproject.net.flow.instructions.Instructions;
@@ -50,7 +45,6 @@
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.group.GroupDescription;
 import org.onosproject.net.group.GroupEvent;
-import org.onosproject.net.group.GroupListener;
 import org.onosproject.net.group.GroupService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.slf4j.Logger;
@@ -63,7 +57,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -80,36 +73,20 @@
             .register(FabricNextGroup.class)
             .build("FabricPipeliner");
 
-    private static final Set<GroupEvent.Type> GROUP_FAILED_TYPES =
-            Sets.newHashSet(GroupEvent.Type.GROUP_ADD_FAILED,
-                            GroupEvent.Type.GROUP_REMOVE_FAILED,
-                            GroupEvent.Type.GROUP_UPDATE_FAILED);
-
-    // TODO: make this configurable
-    private static final long DEFAULT_INSTALLATION_TIME_OUT = 40;
     private static final int NUM_CALLBACK_THREAD = 2;
 
     protected DeviceId deviceId;
     protected FlowRuleService flowRuleService;
     protected GroupService groupService;
-    protected GroupListener groupListener = new InternalGroupListener();
     protected FlowObjectiveStore flowObjectiveStore;
-    protected FabricFilteringPipeliner pipelinerFilter;
-    protected FabricForwardingPipeliner pipelinerForward;
-    protected FabricNextPipeliner pipelinerNext;
+    FabricFilteringPipeliner pipelinerFilter;
+    FabricForwardingPipeliner pipelinerForward;
+    FabricNextPipeliner pipelinerNext;
 
     private Map<PendingFlowKey, PendingInstallObjective> pendingInstallObjectiveFlows = new ConcurrentHashMap<>();
     private Map<PendingGroupKey, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
-    private Cache<Objective, PendingInstallObjective> pendingInstallObjectives = CacheBuilder.newBuilder()
-            .expireAfterWrite(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS)
-            .removalListener((RemovalListener<Objective, PendingInstallObjective>) removalNotification -> {
-                RemovalCause cause = removalNotification.getCause();
-                PendingInstallObjective pio = removalNotification.getValue();
-                if (cause == RemovalCause.EXPIRED && pio != null) {
-                    pio.failed(pio.objective, ObjectiveError.INSTALLATIONTIMEOUT);
-                }
-            })
-            .build();
+    private Map<Objective, PendingInstallObjective> pendingInstallObjectives = Maps.newConcurrentMap();
+
     private static ExecutorService flowObjCallbackExecutor =
             Executors.newFixedThreadPool(NUM_CALLBACK_THREAD, Tools.groupedThreads("fabric-pipeliner", "cb-", log));
 
@@ -120,7 +97,6 @@
         this.deviceId = deviceId;
         this.flowRuleService = context.directory().get(FlowRuleService.class);
         this.groupService = context.directory().get(GroupService.class);
-        this.groupService.addListener(groupListener);
         this.flowObjectiveStore = context.directory().get(FlowObjectiveStore.class);
         this.pipelinerFilter = new FabricFilteringPipeliner(deviceId);
         this.pipelinerForward = new FabricForwardingPipeliner(deviceId);
@@ -139,7 +115,8 @@
             if (error == null) {
                 success(filterObjective);
             } else {
-                fail(filterObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
+                log.info("Ignore error {}. Let flow subsystem retry", error);
+                success(filterObjective);
             }
         });
     }
@@ -156,7 +133,8 @@
             if (error == null) {
                 success(forwardObjective);
             } else {
-                fail(forwardObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
+                log.info("Ignore error {}. Let flow subsystem retry", error);
+                success(forwardObjective);
             }
         });
     }
@@ -177,23 +155,31 @@
             return;
         }
 
+        if (nextObjective.op() == Objective.Operation.MODIFY) {
+            // TODO: support MODIFY operation
+            log.debug("Currently we don't support MODIFY operation, return failure directly to the context");
+            fail(nextObjective, ObjectiveError.UNSUPPORTED);
+            return;
+        }
+
         applyTranslationResult(nextObjective, result, error -> {
             if (error != null) {
-                fail(nextObjective, error);
+                log.info("Ignore error {}. Let flow/group subsystem retry", error);
+                success(nextObjective);
                 return;
             }
 
             // Success, put next group to objective store
             List<PortNumber> portNumbers = Lists.newArrayList();
-            nextObjective.next().forEach(treatment -> {
+            nextObjective.next().forEach(treatment ->
                 treatment.allInstructions()
                         .stream()
                         .filter(inst -> inst.type() == Instruction.Type.OUTPUT)
                         .map(inst -> (Instructions.OutputInstruction) inst)
                         .findFirst()
                         .map(Instructions.OutputInstruction::port)
-                        .ifPresent(portNumbers::add);
-            });
+                        .ifPresent(portNumbers::add)
+            );
             FabricNextGroup nextGroup = new FabricNextGroup(nextObjective.type(),
                                                             portNumbers);
             flowObjectiveStore.putNextGroup(nextObjective.id(), nextGroup);
@@ -232,9 +218,9 @@
             pendingInstallObjectiveFlows.put(pfk, pio);
         });
 
-        pendingGroupKeys.forEach(pendingGroupKey -> {
-            pendingInstallObjectiveGroups.put(pendingGroupKey, pio);
-        });
+        pendingGroupKeys.forEach(pendingGroupKey ->
+            pendingInstallObjectiveGroups.put(pendingGroupKey, pio)
+        );
 
         pendingInstallObjectives.put(objective, pio);
         installGroups(objective, groups);
@@ -246,42 +232,17 @@
             return;
         }
 
-        FlowRuleOperationsContext ctx = new FlowRuleOperationsContext() {
-            @Override
-            public void onSuccess(FlowRuleOperations ops) {
-                ops.stages().forEach(stage -> {
-                    stage.forEach(op -> {
-                        FlowId flowId = op.rule().id();
-                        PendingFlowKey pfk = new PendingFlowKey(flowId, objective.id());
-                        PendingInstallObjective pio = pendingInstallObjectiveFlows.remove(pfk);
+        FlowRuleOperations ops = buildFlowRuleOps(objective, flowRules);
+        flowRuleService.apply(ops);
 
-                        if (pio != null) {
-                            pio.flowInstalled(flowId);
-                        }
-                    });
-                });
+        flowRules.forEach(flow -> {
+            PendingFlowKey pfk = new PendingFlowKey(flow.id(), objective.id());
+            PendingInstallObjective pio = pendingInstallObjectiveFlows.remove(pfk);
+
+            if (pio != null) {
+                pio.flowInstalled(flow.id());
             }
-
-            @Override
-            public void onError(FlowRuleOperations ops) {
-                log.warn("Failed to install flow rules: {}", flowRules);
-                PendingInstallObjective pio = pendingInstallObjectives.getIfPresent(objective);
-                if (pio != null) {
-                    pio.failed(objective, ObjectiveError.FLOWINSTALLATIONFAILED);
-                }
-            }
-        };
-
-        FlowRuleOperations ops = buildFlowRuleOps(objective, flowRules, ctx);
-        if (ops != null) {
-            flowRuleService.apply(ops);
-        } else {
-            // remove pendings
-            flowRules.forEach(flowRule -> {
-                PendingFlowKey pfk = new PendingFlowKey(flowRule.id(), objective.id());
-                pendingInstallObjectiveFlows.remove(pfk);
-            });
-        }
+        });
     }
 
     private void installGroups(Objective objective, Collection<GroupDescription> groups) {
@@ -297,41 +258,40 @@
                 groups.forEach(group -> groupService.removeGroup(deviceId, group.appCookie(), objective.appId()));
                 break;
             case ADD_TO_EXISTING:
-                groups.forEach(group -> {
-                    groupService.addBucketsToGroup(deviceId, group.appCookie(),
-                                                   group.buckets(),
-                                                   group.appCookie(),
-                                                   group.appId());
-                });
+                groups.forEach(group -> groupService.addBucketsToGroup(deviceId, group.appCookie(),
+                        group.buckets(), group.appCookie(), group.appId())
+                );
                 break;
             case REMOVE_FROM_EXISTING:
-                groups.forEach(group -> {
-                    groupService.removeBucketsFromGroup(deviceId, group.appCookie(),
-                                                        group.buckets(),
-                                                        group.appCookie(),
-                                                        group.appId());
-                });
+                groups.forEach(group -> groupService.removeBucketsFromGroup(deviceId, group.appCookie(),
+                        group.buckets(), group.appCookie(), group.appId())
+                );
                 break;
             default:
                 log.warn("Unsupported objective operation {}", objective.op());
+                return;
         }
-    }
 
-    static void fail(Objective objective, ObjectiveError error) {
-        CompletableFuture.runAsync(() -> {
-            objective.context().ifPresent(ctx -> ctx.onError(objective, error));
-        }, flowObjCallbackExecutor);
+        groups.forEach(group -> {
+            PendingGroupKey pendingGroupKey = new PendingGroupKey(new GroupId(group.givenGroupId()), objective.op());
+            PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(pendingGroupKey);
+            pio.groupInstalled(pendingGroupKey);
+        });
 
     }
 
-    static void success(Objective objective) {
-        CompletableFuture.runAsync(() -> {
-            objective.context().ifPresent(ctx -> ctx.onSuccess(objective));
-        }, flowObjCallbackExecutor);
+    private static void fail(Objective objective, ObjectiveError error) {
+        CompletableFuture.runAsync(() -> objective.context().ifPresent(ctx -> ctx.onError(objective, error)),
+                flowObjCallbackExecutor);
+
     }
 
-    static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection<FlowRule> flowRules,
-                                               FlowRuleOperationsContext ctx) {
+    private static void success(Objective objective) {
+        CompletableFuture.runAsync(() -> objective.context().ifPresent(ctx -> ctx.onSuccess(objective)),
+                flowObjCallbackExecutor);
+    }
+
+    private static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection<FlowRule> flowRules) {
         FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
         switch (objective.op()) {
             case ADD:
@@ -347,23 +307,23 @@
                 fail(objective, ObjectiveError.BADPARAMS);
                 return null;
         }
-        return ops.build(ctx);
+        return ops.build();
     }
 
     class FabricNextGroup implements NextGroup {
         private NextObjective.Type type;
         private Collection<PortNumber> outputPorts;
 
-        public FabricNextGroup(NextObjective.Type type, Collection<PortNumber> outputPorts) {
+        FabricNextGroup(NextObjective.Type type, Collection<PortNumber> outputPorts) {
             this.type = type;
             this.outputPorts = ImmutableList.copyOf(outputPorts);
         }
 
-        public NextObjective.Type type() {
+        NextObjective.Type type() {
             return type;
         }
 
-        public Collection<PortNumber> outputPorts() {
+        Collection<PortNumber> outputPorts() {
             return outputPorts;
         }
 
@@ -373,32 +333,13 @@
         }
     }
 
-    class InternalGroupListener implements GroupListener {
-        @Override
-        public void event(GroupEvent event) {
-            GroupId groupId = event.subject().id();
-            PendingGroupKey pendingGroupKey = new PendingGroupKey(groupId, event.type());
-            PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(pendingGroupKey);
-            if (GROUP_FAILED_TYPES.contains(event.type())) {
-                pio.failed(pio.objective, ObjectiveError.GROUPINSTALLATIONFAILED);
-            }
-            pio.groupInstalled(pendingGroupKey);
-        }
-
-        @Override
-        public boolean isRelevant(GroupEvent event) {
-            PendingGroupKey pendingGroupKey = new PendingGroupKey(event.subject().id(), event.type());
-            return pendingInstallObjectiveGroups.containsKey(pendingGroupKey);
-        }
-    }
-
     class PendingInstallObjective {
         Objective objective;
         Collection<FlowId> flowIds;
         Collection<PendingGroupKey> pendingGroupKeys;
         Consumer<ObjectiveError> callback;
 
-        public PendingInstallObjective(Objective objective, Collection<FlowId> flowIds,
+        PendingInstallObjective(Objective objective, Collection<FlowId> flowIds,
                                        Collection<PendingGroupKey> pendingGroupKeys,
                                        Consumer<ObjectiveError> callback) {
             this.objective = objective;
@@ -423,7 +364,7 @@
 
         private void checkIfFinished() {
             if (flowIds.isEmpty() && pendingGroupKeys.isEmpty()) {
-                pendingInstallObjectives.invalidate(objective);
+                pendingInstallObjectives.remove(objective);
                 callback.accept(null);
             }
         }
@@ -434,7 +375,7 @@
                 pendingInstallObjectiveFlows.remove(pfk);
             });
             pendingGroupKeys.forEach(pendingInstallObjectiveGroups::remove);
-            pendingInstallObjectives.invalidate(objective);
+            pendingInstallObjectives.remove(objective);
             callback.accept(error);
         }
 
@@ -509,11 +450,6 @@
         private GroupId groupId;
         private GroupEvent.Type expectedEventType;
 
-        PendingGroupKey(GroupId groupId, GroupEvent.Type expectedEventType) {
-            this.groupId = groupId;
-            this.expectedEventType = expectedEventType;
-        }
-
         PendingGroupKey(GroupId groupId, NextObjective.Operation objOp) {
             this.groupId = groupId;