Fix another installation timeout

There can be multiple next obj waiting for the same flow, i.e. the egress vlan flow.
The key of map is modified to make sure they won't override each other.

Change-Id: I3b3d5ccb850b534146476a2fb57cc7a364c0cbd3
diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
index e344404..ca209a3 100644
--- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
+++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
@@ -98,7 +98,7 @@
     protected FabricForwardingPipeliner pipelinerForward;
     protected FabricNextPipeliner pipelinerNext;
 
-    private Map<FlowId, PendingInstallObjective> pendingInstallObjectiveFlows = new ConcurrentHashMap<>();
+    private Map<PendingFlowKey, PendingInstallObjective> pendingInstallObjectiveFlows = new ConcurrentHashMap<>();
     private Map<PendingGroupKey, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
     private Cache<Objective, PendingInstallObjective> pendingInstallObjectives = CacheBuilder.newBuilder()
             .expireAfterWrite(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS)
@@ -106,7 +106,7 @@
                 RemovalCause cause = removalNotification.getCause();
                 PendingInstallObjective pio = removalNotification.getValue();
                 if (cause == RemovalCause.EXPIRED && pio != null) {
-                    pio.failed(ObjectiveError.INSTALLATIONTIMEOUT);
+                    pio.failed(pio.objective, ObjectiveError.INSTALLATIONTIMEOUT);
                 }
             })
             .build();
@@ -228,7 +228,8 @@
                 new PendingInstallObjective(objective, flowIds, pendingGroupKeys, callback);
 
         flowIds.forEach(flowId -> {
-            pendingInstallObjectiveFlows.put(flowId, pio);
+            PendingFlowKey pfk = new PendingFlowKey(flowId, objective.id());
+            pendingInstallObjectiveFlows.put(pfk, pio);
         });
 
         pendingGroupKeys.forEach(pendingGroupKey -> {
@@ -251,7 +252,8 @@
                 ops.stages().forEach(stage -> {
                     stage.forEach(op -> {
                         FlowId flowId = op.rule().id();
-                        PendingInstallObjective pio = pendingInstallObjectiveFlows.remove(flowId);
+                        PendingFlowKey pfk = new PendingFlowKey(flowId, objective.id());
+                        PendingInstallObjective pio = pendingInstallObjectiveFlows.remove(pfk);
 
                         if (pio != null) {
                             pio.flowInstalled(flowId);
@@ -265,7 +267,7 @@
                 log.warn("Failed to install flow rules: {}", flowRules);
                 PendingInstallObjective pio = pendingInstallObjectives.getIfPresent(objective);
                 if (pio != null) {
-                    pio.failed(ObjectiveError.FLOWINSTALLATIONFAILED);
+                    pio.failed(objective, ObjectiveError.FLOWINSTALLATIONFAILED);
                 }
             }
         };
@@ -275,7 +277,10 @@
             flowRuleService.apply(ops);
         } else {
             // remove pendings
-            flowRules.forEach(flowRule -> pendingInstallObjectiveFlows.remove(flowRule.id()));
+            flowRules.forEach(flowRule -> {
+                PendingFlowKey pfk = new PendingFlowKey(flowRule.id(), objective.id());
+                pendingInstallObjectiveFlows.remove(pfk);
+            });
         }
     }
 
@@ -375,7 +380,7 @@
             PendingGroupKey pendingGroupKey = new PendingGroupKey(groupId, event.type());
             PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(pendingGroupKey);
             if (GROUP_FAILED_TYPES.contains(event.type())) {
-                pio.failed(ObjectiveError.GROUPINSTALLATIONFAILED);
+                pio.failed(pio.objective, ObjectiveError.GROUPINSTALLATIONFAILED);
             }
             pio.groupInstalled(pendingGroupKey);
         }
@@ -403,13 +408,17 @@
         }
 
         void flowInstalled(FlowId flowId) {
-            flowIds.remove(flowId);
-            checkIfFinished();
+            synchronized (this) {
+                flowIds.remove(flowId);
+                checkIfFinished();
+            }
         }
 
         void groupInstalled(PendingGroupKey pendingGroupKey) {
-            pendingGroupKeys.remove(pendingGroupKey);
-            checkIfFinished();
+            synchronized (this) {
+                pendingGroupKeys.remove(pendingGroupKey);
+                checkIfFinished();
+            }
         }
 
         private void checkIfFinished() {
@@ -419,12 +428,81 @@
             }
         }
 
-        void failed(ObjectiveError error) {
-            flowIds.forEach(pendingInstallObjectiveFlows::remove);
+        void failed(Objective obj, ObjectiveError error) {
+            flowIds.forEach(flowId -> {
+                PendingFlowKey pfk = new PendingFlowKey(flowId, obj.id());
+                pendingInstallObjectiveFlows.remove(pfk);
+            });
             pendingGroupKeys.forEach(pendingInstallObjectiveGroups::remove);
             pendingInstallObjectives.invalidate(objective);
             callback.accept(error);
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PendingInstallObjective pio = (PendingInstallObjective) o;
+            return Objects.equal(objective, pio.objective) &&
+                    Objects.equal(flowIds, pio.flowIds) &&
+                    Objects.equal(pendingGroupKeys, pio.pendingGroupKeys) &&
+                    Objects.equal(callback, pio.callback);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(objective, flowIds, pendingGroupKeys, callback);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                    .add("obj", objective)
+                    .add("flowIds", flowIds)
+                    .add("pendingGroupKeys", pendingGroupKeys)
+                    .add("callback", callback)
+                    .toString();
+        }
+    }
+
+    class PendingFlowKey {
+        private FlowId flowId;
+        private int objId;
+
+        PendingFlowKey(FlowId flowId, int objId) {
+            this.flowId = flowId;
+            this.objId = objId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PendingFlowKey pendingFlowKey = (PendingFlowKey) o;
+            return Objects.equal(flowId, pendingFlowKey.flowId) &&
+                    objId == pendingFlowKey.objId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(flowId, objId);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                    .add("flowId", flowId)
+                    .add("objId", objId)
+                    .toString();
+        }
     }
 
     class PendingGroupKey {