[ONOS-7598] Fix P4 fabric pipeliner performance issue

Change-Id: I2dc175a2e0036ac3b1af41873b29714db15900e3
diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
index 93f53db..5c66474 100644
--- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
+++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
@@ -16,10 +16,15 @@
 
 package org.onosproject.pipelines.fabric.pipeliner;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.onlab.util.KryoNamespace;
+import org.onosproject.core.GroupId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.behaviour.NextGroup;
@@ -27,6 +32,7 @@
 import org.onosproject.net.behaviour.PipelinerContext;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
 import org.onosproject.net.driver.Driver;
+import org.onosproject.net.flow.FlowId;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleOperations;
 import org.onosproject.net.flow.FlowRuleOperationsContext;
@@ -39,7 +45,6 @@
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.net.group.Group;
 import org.onosproject.net.group.GroupDescription;
 import org.onosproject.net.group.GroupEvent;
 import org.onosproject.net.group.GroupListener;
@@ -50,11 +55,9 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -71,24 +74,36 @@
             .register(FabricNextGroup.class)
             .build("FabricPipeliner");
 
+    private static final Set<GroupEvent.Type> GROUP_FAILED_TYPES =
+            Sets.newHashSet(GroupEvent.Type.GROUP_ADD_FAILED,
+                            GroupEvent.Type.GROUP_REMOVE_FAILED,
+                            GroupEvent.Type.GROUP_UPDATE_FAILED);
+
     // TODO: make this configurable
     private static final long DEFAULT_INSTALLATION_TIME_OUT = 40;
-    private static final Map<Objective.Operation, GroupEvent.Type> OBJ_OP_TO_GRP_EVENT_TYPE =
-            ImmutableMap.<Objective.Operation, GroupEvent.Type>builder()
-                    .put(Objective.Operation.ADD, GroupEvent.Type.GROUP_ADDED)
-                    .put(Objective.Operation.ADD_TO_EXISTING, GroupEvent.Type.GROUP_UPDATED)
-                    .put(Objective.Operation.REMOVE, GroupEvent.Type.GROUP_REMOVED)
-                    .put(Objective.Operation.REMOVE_FROM_EXISTING, GroupEvent.Type.GROUP_UPDATED)
-            .build();
 
     protected DeviceId deviceId;
     protected FlowRuleService flowRuleService;
     protected GroupService groupService;
+    protected GroupListener groupListener = new InternalGroupListener();
     protected FlowObjectiveStore flowObjectiveStore;
     protected FabricFilteringPipeliner pipelinerFilter;
     protected FabricForwardingPipeliner pipelinerForward;
     protected FabricNextPipeliner pipelinerNext;
 
+    private Map<FlowId, PendingInstallObjective> pendingInstallObjectiveFlows = new ConcurrentHashMap<>();
+    private Map<GroupId, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
+    private Cache<Objective, PendingInstallObjective> pendingInstallObjectives = CacheBuilder.newBuilder()
+            .expireAfterWrite(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS)
+            .removalListener((RemovalListener<Objective, PendingInstallObjective>) removalNotification -> {
+                RemovalCause cause = removalNotification.getCause();
+                PendingInstallObjective pio = removalNotification.getValue();
+                if (cause == RemovalCause.EXPIRED && pio != null) {
+                    pio.failed(ObjectiveError.INSTALLATIONTIMEOUT);
+                }
+            })
+            .build();
+
 
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
@@ -96,6 +111,7 @@
         this.deviceId = deviceId;
         this.flowRuleService = context.directory().get(FlowRuleService.class);
         this.groupService = context.directory().get(GroupService.class);
+        this.groupService.addListener(groupListener);
         this.flowObjectiveStore = context.directory().get(FlowObjectiveStore.class);
         this.pipelinerFilter = new FabricFilteringPipeliner(deviceId);
         this.pipelinerForward = new FabricForwardingPipeliner(deviceId);
@@ -195,77 +211,65 @@
                                         Consumer<Boolean> callback) {
         Collection<GroupDescription> groups = result.groups();
         Collection<FlowRule> flowRules = result.flowRules();
-        CompletableFuture.supplyAsync(() -> installGroups(objective, groups))
-                .thenApplyAsync(groupSuccess -> groupSuccess && installFlows(objective, flowRules))
-                .thenAcceptAsync(callback)
-                .exceptionally((ex) -> {
-                    log.warn("Got unexpected exception while applying translation result {}: {}",
-                             result, ex);
-                    fail(objective, ObjectiveError.UNKNOWN);
-                    return null;
-                });
+
+        Set<FlowId> flowIds = flowRules.stream().map(FlowRule::id).collect(Collectors.toSet());
+        Set<GroupId> groupIds = groups.stream().map(GroupDescription::givenGroupId)
+                .map(GroupId::new).collect(Collectors.toSet());
+
+        PendingInstallObjective pio =
+                new PendingInstallObjective(objective, flowIds, groupIds, callback);
+
+        flowIds.forEach(flowId -> {
+            pendingInstallObjectiveFlows.put(flowId, pio);
+        });
+
+        groupIds.forEach(groupId -> {
+            pendingInstallObjectiveGroups.put(groupId, pio);
+        });
+
+        pendingInstallObjectives.put(objective, pio);
+        installGroups(objective, groups);
+        installFlows(objective, flowRules);
     }
 
-    private boolean installFlows(Objective objective, Collection<FlowRule> flowRules) {
+    private void installFlows(Objective objective, Collection<FlowRule> flowRules) {
         if (flowRules.isEmpty()) {
-            return true;
+            return;
         }
-        CompletableFuture<Boolean> flowInstallFuture = new CompletableFuture<>();
+
         FlowRuleOperationsContext ctx = new FlowRuleOperationsContext() {
             @Override
             public void onSuccess(FlowRuleOperations ops) {
-                flowInstallFuture.complete(true);
+                ops.stages().forEach(stage -> {
+                    stage.forEach(op -> {
+                        FlowId flowId = op.rule().id();
+                        PendingInstallObjective pio = pendingInstallObjectiveFlows.remove(flowId);
+
+                        if (pio != null) {
+                            pio.flowInstalled(flowId);
+                        }
+                    });
+                });
             }
 
             @Override
             public void onError(FlowRuleOperations ops) {
                 log.warn("Failed to install flow rules: {}", flowRules);
-                flowInstallFuture.complete(false);
+                PendingInstallObjective pio = pendingInstallObjectives.getIfPresent(objective);
+                if (pio != null) {
+                    pio.failed(ObjectiveError.FLOWINSTALLATIONFAILED);
+                }
             }
         };
 
         FlowRuleOperations ops = buildFlowRuleOps(objective, flowRules, ctx);
         flowRuleService.apply(ops);
-
-        try {
-            return flowInstallFuture.get(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            log.warn("Got exception while installing flows:{}", e.getMessage());
-            return false;
-        }
     }
 
-    private boolean installGroups(Objective objective, Collection<GroupDescription> groups) {
+    private void installGroups(Objective objective, Collection<GroupDescription> groups) {
         if (groups.isEmpty()) {
-            return true;
+            return;
         }
-        Collection<Integer> groupIds = groups.stream()
-                .map(GroupDescription::givenGroupId)
-                .collect(Collectors.toSet());
-
-        int numGroupsToBeInstalled = groups.size();
-        CompletableFuture<Boolean> groupInstallFuture = new CompletableFuture<>();
-        AtomicInteger numGroupsInstalled = new AtomicInteger(0);
-
-        GroupListener listener = new GroupListener() {
-            @Override
-            public void event(GroupEvent event) {
-                log.debug("Receive group event for group {}", event.subject());
-                int currentNumGroupInstalled = numGroupsInstalled.incrementAndGet();
-                if (currentNumGroupInstalled == numGroupsToBeInstalled) {
-                    // install completed
-                    groupService.removeListener(this);
-                    groupInstallFuture.complete(true);
-                }
-            }
-            @Override
-            public boolean isRelevant(GroupEvent event) {
-                Group group = event.subject();
-                return groupIds.contains(group.givenGroupId());
-            }
-        };
-
-        groupService.addListener(listener);
 
         switch (objective.op()) {
             case ADD:
@@ -292,14 +296,6 @@
                 break;
             default:
                 log.warn("Unsupported objective operation {}", objective.op());
-                groupService.removeListener(listener);
-        }
-        try {
-            return groupInstallFuture.get(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            groupService.removeListener(listener);
-            log.warn("Got exception while installing groups: {}", e.getMessage());
-            return false;
         }
     }
 
@@ -352,4 +348,62 @@
         }
     }
 
+    class InternalGroupListener implements GroupListener {
+        @Override
+        public void event(GroupEvent event) {
+            GroupId groupId = event.subject().id();
+            PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(groupId);
+            if (pio == null) {
+                return;
+            }
+            if (GROUP_FAILED_TYPES.contains(event.type())) {
+                pio.failed(ObjectiveError.GROUPINSTALLATIONFAILED);
+            }
+            pio.groupInstalled(groupId);
+        }
+
+        @Override
+        public boolean isRelevant(GroupEvent event) {
+            return pendingInstallObjectiveGroups.containsKey(event.subject().id());
+        }
+    }
+
+    class PendingInstallObjective {
+        Objective objective;
+        Collection<FlowId> flowIds;
+        Collection<GroupId> groupIds;
+        Consumer<Boolean> callback;
+
+        public PendingInstallObjective(Objective objective, Collection<FlowId> flowIds,
+                                       Collection<GroupId> groupIds, Consumer<Boolean> callback) {
+            this.objective = objective;
+            this.flowIds = flowIds;
+            this.groupIds = groupIds;
+            this.callback = callback;
+        }
+
+        void flowInstalled(FlowId flowId) {
+            flowIds.remove(flowId);
+            checkIfFinished();
+        }
+
+        void groupInstalled(GroupId groupId) {
+            groupIds.remove(groupId);
+            checkIfFinished();
+        }
+
+        private void checkIfFinished() {
+            if (flowIds.isEmpty() && groupIds.isEmpty()) {
+                pendingInstallObjectives.invalidate(objective);
+                callback.accept(true);
+            }
+        }
+
+        void failed(ObjectiveError error) {
+            flowIds.forEach(pendingInstallObjectiveFlows::remove);
+            groupIds.forEach(pendingInstallObjectiveGroups::remove);
+            pendingInstallObjectives.invalidate(objective);
+            fail(objective, error);
+        }
+    }
 }
diff --git a/pipelines/fabric/src/test/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipelinerTest.java b/pipelines/fabric/src/test/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipelinerTest.java
index a7b6dce..5414998 100644
--- a/pipelines/fabric/src/test/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipelinerTest.java
+++ b/pipelines/fabric/src/test/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipelinerTest.java
@@ -31,6 +31,7 @@
 import org.onosproject.net.driver.Driver;
 import org.onosproject.net.driver.DriverHandler;
 import org.onosproject.net.flow.criteria.PiCriterion;
+import org.onosproject.net.group.GroupService;
 import org.onosproject.pipelines.fabric.FabricConstants;
 import org.onosproject.pipelines.fabric.FabricInterpreter;
 
@@ -82,6 +83,7 @@
     public void setup() {
         pipeliner = new FabricPipeliner();
 
+        GroupService mockGroupService = createNiceMock(GroupService.class);
         ServiceDirectory serviceDirectory = createNiceMock(ServiceDirectory.class);
         PipelinerContext pipelinerContext = createNiceMock(PipelinerContext.class);
         DriverHandler driverHandler = createNiceMock(DriverHandler.class);
@@ -90,6 +92,7 @@
         expect(mockDriver.getProperty("noHashedTable")).andReturn("false").anyTimes();
         expect(driverHandler.driver()).andReturn(mockDriver).anyTimes();
         expect(pipelinerContext.directory()).andReturn(serviceDirectory).anyTimes();
+        expect(serviceDirectory.get(GroupService.class)).andReturn(mockGroupService).anyTimes();
         replay(serviceDirectory, pipelinerContext, driverHandler, mockDriver);
         TestUtils.setField(pipeliner, "handler", driverHandler);