Fix timeout problem of fabric pipeliner

Including a workaround for ONOS-7785

Change-Id: I867161f5edf63e82c42a731a4b107ea326d4675c
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index bd5c8c4..ed2a678 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -472,6 +472,11 @@
                                       "staleLinkAge", "15000");
         compCfgService.preSetProperty("org.onosproject.net.host.impl.HostManager",
                                       "allowDuplicateIps", "false");
+        // For P4 switches
+        compCfgService.preSetProperty("org.onosproject.net.flow.impl.FlowRuleManager",
+                                      "fallbackFlowPollFrequency", "5");
+        compCfgService.preSetProperty("org.onosproject.net.group.impl.GroupManager",
+                                      "fallbackGroupPollFrequency", "5");
         compCfgService.registerProperties(getClass());
         modified(context);
 
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
index 15c1a0d..0dd5f21 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
@@ -19,12 +19,12 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Striped;
-import org.apache.commons.lang3.tuple.Pair;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeGroupMirror;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMulticastGroupMirror;
 import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.group.DefaultGroup;
+import org.onosproject.net.group.DefaultGroupDescription;
 import org.onosproject.net.group.Group;
 import org.onosproject.net.group.GroupDescription;
 import org.onosproject.net.group.GroupOperation;
@@ -116,17 +116,23 @@
         if (!setupBehaviour()) {
             return;
         }
-        groupOps.operations().stream()
-                // Get group type and operation type
-                .map(op -> Pair.of(groupStore.getGroup(deviceId, op.groupId()),
-                                   op.opType()))
-                .forEach(pair -> {
-                    if (pair.getLeft().type().equals(GroupDescription.Type.ALL)) {
-                        processMcGroupOp(deviceId, pair.getLeft(), pair.getRight());
-                    } else {
-                        processGroupOp(deviceId, pair.getLeft(), pair.getRight());
-                    }
-                });
+        groupOps.operations().forEach(op -> {
+            // ONOS-7785 We need app cookie (action profile id) from the group
+            Group groupOnStore = groupStore.getGroup(deviceId, op.groupId());
+            GroupDescription groupDesc = new DefaultGroupDescription(deviceId,
+                                                                     op.groupType(),
+                                                                     op.buckets(),
+                                                                     groupOnStore.appCookie(),
+                                                                     op.groupId().id(),
+                                                                     groupOnStore.appId());
+            DefaultGroup groupToApply = new DefaultGroup(op.groupId(), groupDesc);
+            if (op.groupType().equals(GroupDescription.Type.ALL)) {
+                processMcGroupOp(deviceId, groupToApply, op.opType());
+            } else {
+
+                processGroupOp(deviceId, groupToApply, op.opType());
+            }
+        });
     }
 
     @Override
diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
index 363a95c..e344404 100644
--- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
+++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
@@ -16,6 +16,8 @@
 
 package org.onosproject.pipelines.fabric.pipeliner;
 
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalCause;
@@ -24,6 +26,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
 import org.onosproject.core.GroupId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.PortNumber;
@@ -56,7 +59,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -81,6 +87,7 @@
 
     // TODO: make this configurable
     private static final long DEFAULT_INSTALLATION_TIME_OUT = 40;
+    private static final int NUM_CALLBACK_THREAD = 2;
 
     protected DeviceId deviceId;
     protected FlowRuleService flowRuleService;
@@ -92,7 +99,7 @@
     protected FabricNextPipeliner pipelinerNext;
 
     private Map<FlowId, PendingInstallObjective> pendingInstallObjectiveFlows = new ConcurrentHashMap<>();
-    private Map<GroupId, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
+    private Map<PendingGroupKey, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
     private Cache<Objective, PendingInstallObjective> pendingInstallObjectives = CacheBuilder.newBuilder()
             .expireAfterWrite(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS)
             .removalListener((RemovalListener<Objective, PendingInstallObjective>) removalNotification -> {
@@ -103,6 +110,8 @@
                 }
             })
             .build();
+    private static ExecutorService flowObjCallbackExecutor =
+            Executors.newFixedThreadPool(NUM_CALLBACK_THREAD, Tools.groupedThreads("fabric-pipeliner", "cb-", log));
 
 
     @Override
@@ -126,8 +135,8 @@
             return;
         }
 
-        applyTranslationResult(filterObjective, result, success -> {
-            if (success) {
+        applyTranslationResult(filterObjective, result, error -> {
+            if (error == null) {
                 success(filterObjective);
             } else {
                 fail(filterObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
@@ -143,8 +152,8 @@
             return;
         }
 
-        applyTranslationResult(forwardObjective, result, success -> {
-            if (success) {
+        applyTranslationResult(forwardObjective, result, error -> {
+            if (error == null) {
                 success(forwardObjective);
             } else {
                 fail(forwardObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
@@ -168,25 +177,22 @@
             return;
         }
 
-        applyTranslationResult(nextObjective, result, success -> {
-            if (!success) {
-                fail(nextObjective, ObjectiveError.GROUPINSTALLATIONFAILED);
+        applyTranslationResult(nextObjective, result, error -> {
+            if (error != null) {
+                fail(nextObjective, error);
                 return;
             }
 
             // Success, put next group to objective store
             List<PortNumber> portNumbers = Lists.newArrayList();
             nextObjective.next().forEach(treatment -> {
-                Instructions.OutputInstruction outputInst = treatment.allInstructions()
+                treatment.allInstructions()
                         .stream()
                         .filter(inst -> inst.type() == Instruction.Type.OUTPUT)
                         .map(inst -> (Instructions.OutputInstruction) inst)
                         .findFirst()
-                        .orElse(null);
-
-                if (outputInst != null) {
-                    portNumbers.add(outputInst.port());
-                }
+                        .map(Instructions.OutputInstruction::port)
+                        .ifPresent(portNumbers::add);
             });
             FabricNextGroup nextGroup = new FabricNextGroup(nextObjective.type(),
                                                             portNumbers);
@@ -208,23 +214,25 @@
 
     private void applyTranslationResult(Objective objective,
                                         PipelinerTranslationResult result,
-                                        Consumer<Boolean> callback) {
+                                        Consumer<ObjectiveError> callback) {
         Collection<GroupDescription> groups = result.groups();
         Collection<FlowRule> flowRules = result.flowRules();
 
         Set<FlowId> flowIds = flowRules.stream().map(FlowRule::id).collect(Collectors.toSet());
-        Set<GroupId> groupIds = groups.stream().map(GroupDescription::givenGroupId)
-                .map(GroupId::new).collect(Collectors.toSet());
+        Set<PendingGroupKey> pendingGroupKeys = groups.stream().map(GroupDescription::givenGroupId)
+                .map(GroupId::new)
+                .map(gid -> new PendingGroupKey(gid, objective.op()))
+                .collect(Collectors.toSet());
 
         PendingInstallObjective pio =
-                new PendingInstallObjective(objective, flowIds, groupIds, callback);
+                new PendingInstallObjective(objective, flowIds, pendingGroupKeys, callback);
 
         flowIds.forEach(flowId -> {
             pendingInstallObjectiveFlows.put(flowId, pio);
         });
 
-        groupIds.forEach(groupId -> {
-            pendingInstallObjectiveGroups.put(groupId, pio);
+        pendingGroupKeys.forEach(pendingGroupKey -> {
+            pendingInstallObjectiveGroups.put(pendingGroupKey, pio);
         });
 
         pendingInstallObjectives.put(objective, pio);
@@ -305,11 +313,16 @@
     }
 
     static void fail(Objective objective, ObjectiveError error) {
-        objective.context().ifPresent(ctx -> ctx.onError(objective, error));
+        CompletableFuture.runAsync(() -> {
+            objective.context().ifPresent(ctx -> ctx.onError(objective, error));
+        }, flowObjCallbackExecutor);
+
     }
 
     static void success(Objective objective) {
-        objective.context().ifPresent(ctx -> ctx.onSuccess(objective));
+        CompletableFuture.runAsync(() -> {
+            objective.context().ifPresent(ctx -> ctx.onSuccess(objective));
+        }, flowObjCallbackExecutor);
     }
 
     static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection<FlowRule> flowRules,
@@ -317,16 +330,13 @@
         FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
         switch (objective.op()) {
             case ADD:
+            case ADD_TO_EXISTING: // For egress VLAN
                 flowRules.forEach(ops::add);
                 break;
             case REMOVE:
+            case REMOVE_FROM_EXISTING: // For egress VLAN
                 flowRules.forEach(ops::remove);
                 break;
-            case ADD_TO_EXISTING:
-            case REMOVE_FROM_EXISTING:
-                // Next objective may use ADD_TO_EXIST or REMOVE_FROM_EXIST op
-                // No need to update FlowRuls for vlan_meta table.
-                return null;
             default:
                 log.warn("Unsupported op {} for {}", objective.op(), objective);
                 fail(objective, ObjectiveError.BADPARAMS);
@@ -362,33 +372,33 @@
         @Override
         public void event(GroupEvent event) {
             GroupId groupId = event.subject().id();
-            PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(groupId);
-            if (pio == null) {
-                return;
-            }
+            PendingGroupKey pendingGroupKey = new PendingGroupKey(groupId, event.type());
+            PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(pendingGroupKey);
             if (GROUP_FAILED_TYPES.contains(event.type())) {
                 pio.failed(ObjectiveError.GROUPINSTALLATIONFAILED);
             }
-            pio.groupInstalled(groupId);
+            pio.groupInstalled(pendingGroupKey);
         }
 
         @Override
         public boolean isRelevant(GroupEvent event) {
-            return pendingInstallObjectiveGroups.containsKey(event.subject().id());
+            PendingGroupKey pendingGroupKey = new PendingGroupKey(event.subject().id(), event.type());
+            return pendingInstallObjectiveGroups.containsKey(pendingGroupKey);
         }
     }
 
     class PendingInstallObjective {
         Objective objective;
         Collection<FlowId> flowIds;
-        Collection<GroupId> groupIds;
-        Consumer<Boolean> callback;
+        Collection<PendingGroupKey> pendingGroupKeys;
+        Consumer<ObjectiveError> callback;
 
         public PendingInstallObjective(Objective objective, Collection<FlowId> flowIds,
-                                       Collection<GroupId> groupIds, Consumer<Boolean> callback) {
+                                       Collection<PendingGroupKey> pendingGroupKeys,
+                                       Consumer<ObjectiveError> callback) {
             this.objective = objective;
             this.flowIds = flowIds;
-            this.groupIds = groupIds;
+            this.pendingGroupKeys = pendingGroupKeys;
             this.callback = callback;
         }
 
@@ -397,23 +407,79 @@
             checkIfFinished();
         }
 
-        void groupInstalled(GroupId groupId) {
-            groupIds.remove(groupId);
+        void groupInstalled(PendingGroupKey pendingGroupKey) {
+            pendingGroupKeys.remove(pendingGroupKey);
             checkIfFinished();
         }
 
         private void checkIfFinished() {
-            if (flowIds.isEmpty() && groupIds.isEmpty()) {
+            if (flowIds.isEmpty() && pendingGroupKeys.isEmpty()) {
                 pendingInstallObjectives.invalidate(objective);
-                callback.accept(true);
+                callback.accept(null);
             }
         }
 
         void failed(ObjectiveError error) {
             flowIds.forEach(pendingInstallObjectiveFlows::remove);
-            groupIds.forEach(pendingInstallObjectiveGroups::remove);
+            pendingGroupKeys.forEach(pendingInstallObjectiveGroups::remove);
             pendingInstallObjectives.invalidate(objective);
-            fail(objective, error);
+            callback.accept(error);
+        }
+    }
+
+    class PendingGroupKey {
+        private GroupId groupId;
+        private GroupEvent.Type expectedEventType;
+
+        PendingGroupKey(GroupId groupId, GroupEvent.Type expectedEventType) {
+            this.groupId = groupId;
+            this.expectedEventType = expectedEventType;
+        }
+
+        PendingGroupKey(GroupId groupId, NextObjective.Operation objOp) {
+            this.groupId = groupId;
+
+            switch (objOp) {
+                case ADD:
+                    expectedEventType = GroupEvent.Type.GROUP_ADDED;
+                    break;
+                case REMOVE:
+                    expectedEventType = GroupEvent.Type.GROUP_REMOVED;
+                    break;
+                case MODIFY:
+                case ADD_TO_EXISTING:
+                case REMOVE_FROM_EXISTING:
+                    expectedEventType = GroupEvent.Type.GROUP_UPDATED;
+                    break;
+                default:
+                    expectedEventType = null;
+            }
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PendingGroupKey pendingGroupKey = (PendingGroupKey) o;
+            return Objects.equal(groupId, pendingGroupKey.groupId) &&
+                    expectedEventType == pendingGroupKey.expectedEventType;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(groupId, expectedEventType);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                    .add("groupId", groupId)
+                    .add("expectedEventType", expectedEventType)
+                    .toString();
         }
     }
 }