Fix timeout problem of fabric pipeliner

Including a workaround for ONOS-7785

Change-Id: I867161f5edf63e82c42a731a4b107ea326d4675c
diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
index 363a95c..e344404 100644
--- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
+++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
@@ -16,6 +16,8 @@
 
 package org.onosproject.pipelines.fabric.pipeliner;
 
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalCause;
@@ -24,6 +26,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
 import org.onosproject.core.GroupId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.PortNumber;
@@ -56,7 +59,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -81,6 +87,7 @@
 
     // TODO: make this configurable
     private static final long DEFAULT_INSTALLATION_TIME_OUT = 40;
+    private static final int NUM_CALLBACK_THREAD = 2;
 
     protected DeviceId deviceId;
     protected FlowRuleService flowRuleService;
@@ -92,7 +99,7 @@
     protected FabricNextPipeliner pipelinerNext;
 
     private Map<FlowId, PendingInstallObjective> pendingInstallObjectiveFlows = new ConcurrentHashMap<>();
-    private Map<GroupId, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
+    private Map<PendingGroupKey, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
     private Cache<Objective, PendingInstallObjective> pendingInstallObjectives = CacheBuilder.newBuilder()
             .expireAfterWrite(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS)
             .removalListener((RemovalListener<Objective, PendingInstallObjective>) removalNotification -> {
@@ -103,6 +110,8 @@
                 }
             })
             .build();
+    private static ExecutorService flowObjCallbackExecutor =
+            Executors.newFixedThreadPool(NUM_CALLBACK_THREAD, Tools.groupedThreads("fabric-pipeliner", "cb-", log));
 
 
     @Override
@@ -126,8 +135,8 @@
             return;
         }
 
-        applyTranslationResult(filterObjective, result, success -> {
-            if (success) {
+        applyTranslationResult(filterObjective, result, error -> {
+            if (error == null) {
                 success(filterObjective);
             } else {
                 fail(filterObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
@@ -143,8 +152,8 @@
             return;
         }
 
-        applyTranslationResult(forwardObjective, result, success -> {
-            if (success) {
+        applyTranslationResult(forwardObjective, result, error -> {
+            if (error == null) {
                 success(forwardObjective);
             } else {
                 fail(forwardObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
@@ -168,25 +177,22 @@
             return;
         }
 
-        applyTranslationResult(nextObjective, result, success -> {
-            if (!success) {
-                fail(nextObjective, ObjectiveError.GROUPINSTALLATIONFAILED);
+        applyTranslationResult(nextObjective, result, error -> {
+            if (error != null) {
+                fail(nextObjective, error);
                 return;
             }
 
             // Success, put next group to objective store
             List<PortNumber> portNumbers = Lists.newArrayList();
             nextObjective.next().forEach(treatment -> {
-                Instructions.OutputInstruction outputInst = treatment.allInstructions()
+                treatment.allInstructions()
                         .stream()
                         .filter(inst -> inst.type() == Instruction.Type.OUTPUT)
                         .map(inst -> (Instructions.OutputInstruction) inst)
                         .findFirst()
-                        .orElse(null);
-
-                if (outputInst != null) {
-                    portNumbers.add(outputInst.port());
-                }
+                        .map(Instructions.OutputInstruction::port)
+                        .ifPresent(portNumbers::add);
             });
             FabricNextGroup nextGroup = new FabricNextGroup(nextObjective.type(),
                                                             portNumbers);
@@ -208,23 +214,25 @@
 
     private void applyTranslationResult(Objective objective,
                                         PipelinerTranslationResult result,
-                                        Consumer<Boolean> callback) {
+                                        Consumer<ObjectiveError> callback) {
         Collection<GroupDescription> groups = result.groups();
         Collection<FlowRule> flowRules = result.flowRules();
 
         Set<FlowId> flowIds = flowRules.stream().map(FlowRule::id).collect(Collectors.toSet());
-        Set<GroupId> groupIds = groups.stream().map(GroupDescription::givenGroupId)
-                .map(GroupId::new).collect(Collectors.toSet());
+        Set<PendingGroupKey> pendingGroupKeys = groups.stream().map(GroupDescription::givenGroupId)
+                .map(GroupId::new)
+                .map(gid -> new PendingGroupKey(gid, objective.op()))
+                .collect(Collectors.toSet());
 
         PendingInstallObjective pio =
-                new PendingInstallObjective(objective, flowIds, groupIds, callback);
+                new PendingInstallObjective(objective, flowIds, pendingGroupKeys, callback);
 
         flowIds.forEach(flowId -> {
             pendingInstallObjectiveFlows.put(flowId, pio);
         });
 
-        groupIds.forEach(groupId -> {
-            pendingInstallObjectiveGroups.put(groupId, pio);
+        pendingGroupKeys.forEach(pendingGroupKey -> {
+            pendingInstallObjectiveGroups.put(pendingGroupKey, pio);
         });
 
         pendingInstallObjectives.put(objective, pio);
@@ -305,11 +313,16 @@
     }
 
     static void fail(Objective objective, ObjectiveError error) {
-        objective.context().ifPresent(ctx -> ctx.onError(objective, error));
+        CompletableFuture.runAsync(() -> {
+            objective.context().ifPresent(ctx -> ctx.onError(objective, error));
+        }, flowObjCallbackExecutor);
+
     }
 
     static void success(Objective objective) {
-        objective.context().ifPresent(ctx -> ctx.onSuccess(objective));
+        CompletableFuture.runAsync(() -> {
+            objective.context().ifPresent(ctx -> ctx.onSuccess(objective));
+        }, flowObjCallbackExecutor);
     }
 
     static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection<FlowRule> flowRules,
@@ -317,16 +330,13 @@
         FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
         switch (objective.op()) {
             case ADD:
+            case ADD_TO_EXISTING: // For egress VLAN
                 flowRules.forEach(ops::add);
                 break;
             case REMOVE:
+            case REMOVE_FROM_EXISTING: // For egress VLAN
                 flowRules.forEach(ops::remove);
                 break;
-            case ADD_TO_EXISTING:
-            case REMOVE_FROM_EXISTING:
-                // Next objective may use ADD_TO_EXIST or REMOVE_FROM_EXIST op
-                // No need to update FlowRuls for vlan_meta table.
-                return null;
             default:
                 log.warn("Unsupported op {} for {}", objective.op(), objective);
                 fail(objective, ObjectiveError.BADPARAMS);
@@ -362,33 +372,33 @@
         @Override
         public void event(GroupEvent event) {
             GroupId groupId = event.subject().id();
-            PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(groupId);
-            if (pio == null) {
-                return;
-            }
+            PendingGroupKey pendingGroupKey = new PendingGroupKey(groupId, event.type());
+            PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(pendingGroupKey);
             if (GROUP_FAILED_TYPES.contains(event.type())) {
                 pio.failed(ObjectiveError.GROUPINSTALLATIONFAILED);
             }
-            pio.groupInstalled(groupId);
+            pio.groupInstalled(pendingGroupKey);
         }
 
         @Override
         public boolean isRelevant(GroupEvent event) {
-            return pendingInstallObjectiveGroups.containsKey(event.subject().id());
+            PendingGroupKey pendingGroupKey = new PendingGroupKey(event.subject().id(), event.type());
+            return pendingInstallObjectiveGroups.containsKey(pendingGroupKey);
         }
     }
 
     class PendingInstallObjective {
         Objective objective;
         Collection<FlowId> flowIds;
-        Collection<GroupId> groupIds;
-        Consumer<Boolean> callback;
+        Collection<PendingGroupKey> pendingGroupKeys;
+        Consumer<ObjectiveError> callback;
 
         public PendingInstallObjective(Objective objective, Collection<FlowId> flowIds,
-                                       Collection<GroupId> groupIds, Consumer<Boolean> callback) {
+                                       Collection<PendingGroupKey> pendingGroupKeys,
+                                       Consumer<ObjectiveError> callback) {
             this.objective = objective;
             this.flowIds = flowIds;
-            this.groupIds = groupIds;
+            this.pendingGroupKeys = pendingGroupKeys;
             this.callback = callback;
         }
 
@@ -397,23 +407,79 @@
             checkIfFinished();
         }
 
-        void groupInstalled(GroupId groupId) {
-            groupIds.remove(groupId);
+        void groupInstalled(PendingGroupKey pendingGroupKey) {
+            pendingGroupKeys.remove(pendingGroupKey);
             checkIfFinished();
         }
 
         private void checkIfFinished() {
-            if (flowIds.isEmpty() && groupIds.isEmpty()) {
+            if (flowIds.isEmpty() && pendingGroupKeys.isEmpty()) {
                 pendingInstallObjectives.invalidate(objective);
-                callback.accept(true);
+                callback.accept(null);
             }
         }
 
         void failed(ObjectiveError error) {
             flowIds.forEach(pendingInstallObjectiveFlows::remove);
-            groupIds.forEach(pendingInstallObjectiveGroups::remove);
+            pendingGroupKeys.forEach(pendingInstallObjectiveGroups::remove);
             pendingInstallObjectives.invalidate(objective);
-            fail(objective, error);
+            callback.accept(error);
+        }
+    }
+
+    class PendingGroupKey {
+        private GroupId groupId;
+        private GroupEvent.Type expectedEventType;
+
+        PendingGroupKey(GroupId groupId, GroupEvent.Type expectedEventType) {
+            this.groupId = groupId;
+            this.expectedEventType = expectedEventType;
+        }
+
+        PendingGroupKey(GroupId groupId, NextObjective.Operation objOp) {
+            this.groupId = groupId;
+
+            switch (objOp) {
+                case ADD:
+                    expectedEventType = GroupEvent.Type.GROUP_ADDED;
+                    break;
+                case REMOVE:
+                    expectedEventType = GroupEvent.Type.GROUP_REMOVED;
+                    break;
+                case MODIFY:
+                case ADD_TO_EXISTING:
+                case REMOVE_FROM_EXISTING:
+                    expectedEventType = GroupEvent.Type.GROUP_UPDATED;
+                    break;
+                default:
+                    expectedEventType = null;
+            }
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PendingGroupKey pendingGroupKey = (PendingGroupKey) o;
+            return Objects.equal(groupId, pendingGroupKey.groupId) &&
+                    expectedEventType == pendingGroupKey.expectedEventType;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(groupId, expectedEventType);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                    .add("groupId", groupId)
+                    .add("expectedEventType", expectedEventType)
+                    .toString();
         }
     }
 }