Support multiple pending add buckets

Change-Id: Ia528a9b52ad9cb935b4a5d0bc41263baabbdb3d3
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2GroupHandler.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2GroupHandler.java
index fe605e9..8637e0b 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2GroupHandler.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2GroupHandler.java
@@ -43,8 +43,10 @@
 import org.onosproject.net.flow.instructions.Instruction;
 import org.onosproject.net.flow.instructions.Instructions;
 import org.onosproject.net.flow.instructions.L2ModificationInstruction;
+import org.onosproject.net.flowobjective.DefaultNextObjective;
 import org.onosproject.net.flowobjective.FlowObjectiveStore;
 import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.group.DefaultGroupBucket;
 import org.onosproject.net.group.DefaultGroupDescription;
@@ -140,9 +142,9 @@
     // index number for group creation
     private AtomicCounter nextIndex;
 
-    // local store for pending bucketAdds - by design there can only be one
+    // local store for pending bucketAdds - by design there can be multiple
     // pending bucket for a group
-    protected ConcurrentHashMap<Integer, NextObjective> pendingBuckets =
+    protected ConcurrentHashMap<Integer, Set<NextObjective>> pendingBuckets =
             new ConcurrentHashMap<>();
 
     /**
@@ -1054,6 +1056,8 @@
         Set<TrafficTreatment> duplicateBuckets = Sets.newHashSet();
         List<Deque<GroupKey>> allActiveKeys = Ofdpa2Pipeline.appKryo.deserialize(next.data());
         Set<PortNumber> existingPorts = getExistingOutputPorts(allActiveKeys);
+        Set<TrafficTreatment> nonDuplicateBuckets = Sets.newHashSet();
+        NextObjective objectiveToAdd;
 
         nextObjective.next().forEach(trafficTreatment -> {
             PortNumber portNumber = readOutPortFromTreatment(trafficTreatment);
@@ -1064,18 +1068,33 @@
 
             if (existingPorts.contains(portNumber)) {
                 duplicateBuckets.add(trafficTreatment);
+            } else {
+                nonDuplicateBuckets.add(trafficTreatment);
             }
         });
 
         if (!duplicateBuckets.isEmpty()) {
-            log.warn("Some buckets {} already exists in next id {}, abort.",
+            log.debug("Some buckets {} already exists in next id {}, duplicate buckets will be ignored.",
                      duplicateBuckets, nextObjective.id());
+
+            // new next objective with new treatments
+            NextObjective.Builder builder = DefaultNextObjective.builder()
+                    .withType(nextObjective.type())
+                    .withId(nextObjective.id())
+                    .withMeta(nextObjective.meta())
+                    .fromApp(nextObjective.appId());
+            nonDuplicateBuckets.forEach(builder::addTreatment);
+
+            ObjectiveContext context = nextObjective.context().orElse(null);
+            objectiveToAdd = builder.addToExisting(context);
+        } else {
+            objectiveToAdd = nextObjective;
         }
 
         if (nextObjective.type() == NextObjective.Type.HASHED) {
-            addBucketToHashGroup(nextObjective, allActiveKeys);
+            addBucketToHashGroup(objectiveToAdd, allActiveKeys);
         } else if (nextObjective.type() == NextObjective.Type.BROADCAST) {
-            addBucketToBroadcastGroup(nextObjective, allActiveKeys);
+            addBucketToBroadcastGroup(objectiveToAdd, allActiveKeys);
         }
     }
 
@@ -1703,12 +1722,14 @@
                                     .givenGroupId()));
                     Ofdpa2Pipeline.pass(nextGrp.nextObjective());
                     flowObjectiveStore.putNextGroup(nextGrp.nextObjective().id(), nextGrp);
+
                     // check if addBuckets waiting for this completion
-                    NextObjective pendBkt = pendingBuckets
-                            .remove(nextGrp.nextObjective().id());
-                    if (pendBkt != null) {
-                        addBucketToGroup(pendBkt, nextGrp);
-                    }
+                    pendingBuckets.compute(nextGrp.nextObjective().id(), (nextId, pendBkts) -> {
+                        if (pendBkts != null) {
+                            pendBkts.forEach(pendBkt -> addBucketToGroup(pendBkt, nextGrp));
+                        }
+                        return null;
+                    });
                 });
             }
         }
@@ -1917,23 +1938,15 @@
      * types.
      */
     protected enum OfdpaMplsGroupSubType {
-
         MPLS_INTF((short) 0),
-
         L2_VPN((short) 1),
-
         L3_VPN((short) 2),
-
         MPLS_TUNNEL_LABEL_1((short) 3),
-
         MPLS_TUNNEL_LABEL_2((short) 4),
-
         MPLS_SWAP_LABEL((short) 5),
-
         MPLS_ECMP((short) 8);
 
         private short value;
-
         public static final int OFDPA_GROUP_TYPE_SHIFT = 28;
         public static final int OFDPA_MPLS_SUBTYPE_SHIFT = 24;
 
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
index a698a81..4b2800c 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
@@ -16,6 +16,7 @@
 package org.onosproject.driver.pipeline;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
 import org.onlab.osgi.ServiceDirectory;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IpPrefix;
@@ -290,8 +291,15 @@
                 // it is possible that group-chain has not been fully created yet
                 log.debug("Waiting to add bucket to group for next-id:{} in dev:{}",
                           nextObjective.id(), deviceId);
-                // by design only one pending bucket is allowed for the group
-                groupHandler.pendingBuckets.put(nextObjective.id(), nextObjective);
+
+                // by design multiple pending bucket is allowed for the group
+                groupHandler.pendingBuckets.compute(nextObjective.id(), (nextId, pendBkts) -> {
+                    if (pendBkts == null) {
+                        pendBkts = Sets.newHashSet();
+                    }
+                    pendBkts.add(nextObjective);
+                    return pendBkts;
+                });
             }
             break;
         case REMOVE: