CORD-1583 Bug fixes for dual ToRs

Two things:
  - In dual (paired) ToR scenarios it is possible to have the same outport
    in multiple buckets in a hash group, as long as they have different labels.
    When adding buckets this was taken into account. But when removing buckets,
    only the outport was being checked. This bug fix ensures that labels are
    checked as well when removing buckets.
  - In dual ToR scenarios, getting the right set of hash buckets proved difficult
    with the existing 'retryHash' mechanism. It has been repealed and replaced with
    a bucket corrector mechanism that periodically corrects the hash group buckets
    when the topology has been stable for the last 10 seconds. This required the
    introduction of a VERIFY operation in Next Objectives. Also added a CLI command
    to trigger this operation manually.

Change-Id: Ib0d2734060fadc6e7a4bd0d75f3409e194413a97
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
index b4af52e..4f16cff 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
@@ -66,6 +66,7 @@
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
@@ -1031,9 +1032,10 @@
                 if (label == -1) {
                     duplicateBuckets.add(trafficTreatment);
                 } else {
-                    boolean exists = existingPortAndLabel(allActiveKeys, groupService,
-                                                          deviceId, portNumber, label);
-                    if (exists) {
+                    List<Integer> existing = existingPortAndLabel(allActiveKeys,
+                                                 groupService, deviceId,
+                                                 portNumber, label);
+                    if (!existing.isEmpty()) {
                         duplicateBuckets.add(trafficTreatment);
                     } else {
                         nonDuplicateBuckets.add(trafficTreatment);
@@ -1316,12 +1318,14 @@
     }
 
     /**
-     * Removes the bucket in the top level group of a possible group-chain. Does
+     * Removes buckets in the top level group of a possible group-chain. Does
      * not remove the groups in the group-chain pointed to by this bucket, as they
      * may be in use (referenced by other groups) elsewhere.
      *
-     * @param nextObjective the bucket information for a next group
-     * @param next the representation of the existing group-chain for this next objective
+     * @param nextObjective a next objective that contains information for the
+     *                          buckets to be removed from the group
+     * @param next the representation of the existing group-chains for this next
+     *          objective, from which the top-level buckets to remove are determined
      */
     protected void removeBucketFromGroup(NextObjective nextObjective, NextGroup next) {
         if (nextObjective.type() != NextObjective.Type.HASHED &&
@@ -1331,55 +1335,47 @@
             fail(nextObjective, ObjectiveError.UNSUPPORTED);
             return;
         }
-        Set<PortNumber> portsToRemove = Sets.newHashSet();
-        Collection<TrafficTreatment> treatments = nextObjective.next();
-        for (TrafficTreatment treatment : treatments) {
-            // find the bucket to remove by noting the outport, and figuring out the
-            // top-level group in the group-chain that indirectly references the port
-            PortNumber portToRemove = readOutPortFromTreatment(treatment);
-            if (portToRemove == null) {
-                log.warn("treatment {} of next objective {} has no outport.. cannot remove bucket"
-                       + "from group in dev: {}", treatment, nextObjective.id(), deviceId);
-            } else {
-                portsToRemove.add(portToRemove);
-            }
-        }
-
-        if (portsToRemove.isEmpty()) {
-            log.warn("next objective {} has no outport.. cannot remove bucket"
-                             + "from group in dev: {}", nextObjective.id(), deviceId);
-            fail(nextObjective, ObjectiveError.BADPARAMS);
-        }
-
         List<Deque<GroupKey>> allActiveKeys = appKryo.deserialize(next.data());
-        List<Deque<GroupKey>> chainsToRemove = Lists.newArrayList();
-        for (Deque<GroupKey> gkeys : allActiveKeys) {
-            // last group in group chain should have a single bucket pointing to port
-            GroupKey groupWithPort = gkeys.peekLast();
-            Group group = groupService.getGroup(deviceId, groupWithPort);
-            if (group == null) {
-                log.warn("Inconsistent group chain found when removing bucket"
-                        + "for next:{} in dev:{}", nextObjective.id(), deviceId);
+        List<Integer> indicesToRemove = Lists.newArrayList();
+        for (TrafficTreatment treatment : nextObjective.next()) {
+            // find the top-level bucket in the group-chain by matching the
+            // outport and label from different groups in the chain
+            PortNumber portToRemove = readOutPortFromTreatment(treatment);
+            int labelToRemove = readLabelFromTreatment(treatment);
+            if (portToRemove == null) {
+                log.warn("treatment {} of next objective {} has no outport.. "
+                        + "cannot remove bucket from group in dev: {}", treatment,
+                        nextObjective.id(), deviceId);
                 continue;
             }
-            if (group.buckets().buckets().isEmpty()) {
-                log.warn("Can't get output port information from group {} " +
-                                 "because there is no bucket in the group.",
-                         group.id().toString());
-                continue;
-            }
-            PortNumber pout = readOutPortFromTreatment(
-                                  group.buckets().buckets().get(0).treatment());
-            if (portsToRemove.contains(pout)) {
-                chainsToRemove.add(gkeys);
-            }
+            List<Integer> existing = existingPortAndLabel(allActiveKeys,
+                                                          groupService, deviceId,
+                                                          portToRemove, labelToRemove);
+            indicesToRemove.addAll(existing);
+
         }
+
+        List<Deque<GroupKey>> chainsToRemove = Lists.newArrayList();
+        indicesToRemove.forEach(index -> chainsToRemove
+                                .add(allActiveKeys.get(index)));
         if (chainsToRemove.isEmpty()) {
             log.warn("Could not find appropriate group-chain for removing bucket"
                     + " for next id {} in dev:{}", nextObjective.id(), deviceId);
             fail(nextObjective, ObjectiveError.BADPARAMS);
             return;
         }
+        removeBucket(chainsToRemove, nextObjective);
+    }
+
+    /**
+     * Removes top-level buckets from a group that represents the given next objective.
+     *
+     * @param chainsToRemove a list of group bucket chains to remove
+     * @param nextObjective the next objective that contains information for the
+     *                  buckets to be removed from the group
+     */
+    protected void removeBucket(List<Deque<GroupKey>> chainsToRemove,
+                                NextObjective nextObjective) {
         List<GroupBucket> bucketsToRemove = Lists.newArrayList();
         //first group key is the one we want to modify
         GroupKey modGroupKey = chainsToRemove.get(0).peekFirst();
@@ -1387,15 +1383,15 @@
         for (Deque<GroupKey> foundChain : chainsToRemove) {
             //second group key is the one we wish to remove the reference to
             if (foundChain.size() < 2) {
-                // additional check to make sure second group key exist in
+                // additional check to make sure second group key exists in
                 // the chain.
                 log.warn("Can't find second group key from chain {}",
                          foundChain);
                 continue;
             }
-            GroupKey pointedGroupKey = foundChain.stream().collect(Collectors.toList()).get(1);
+            GroupKey pointedGroupKey = foundChain.stream()
+                                           .collect(Collectors.toList()).get(1);
             Group pointedGroup = groupService.getGroup(deviceId, pointedGroupKey);
-
             if (pointedGroup == null) {
                 continue;
             }
@@ -1412,7 +1408,6 @@
                                 .group(pointedGroup.id())
                                 .build());
             }
-
             bucketsToRemove.add(bucket);
         }
 
@@ -1437,15 +1432,20 @@
         groupService.removeBucketsFromGroup(deviceId, modGroupKey,
                                             removeBuckets, modGroupKey,
                                             nextObjective.appId());
-        // update store - synchronize access
+        // update store - synchronize access as there may be multiple threads
+        // trying to remove buckets from the same group, each with its own
+        // potentially stale copy of allActiveKeys
         synchronized (flowObjectiveStore) {
-            // get fresh copy of what the store holds
-            next = flowObjectiveStore.getNextGroup(nextObjective.id());
-            allActiveKeys = appKryo.deserialize(next.data());
+            // get a fresh copy of what the store holds
+            NextGroup next = flowObjectiveStore.getNextGroup(nextObjective.id());
+            List<Deque<GroupKey>> allActiveKeys = appKryo.deserialize(next.data());
             // Note that since we got a new object, and ArrayDeque does not implement
-            // Object.equals(), we have to check the deque last elems one by one
-            allActiveKeys.removeIf(active -> chainsToRemove.stream().anyMatch(remove ->
-                                       remove.peekLast().equals(active.peekLast())));
+            // Object.equals(), we have to check the deque elems one by one
+            allActiveKeys
+                .removeIf(active ->
+                    chainsToRemove.stream().anyMatch(remove ->
+                        Arrays.equals(remove.toArray(new GroupKey[0]),
+                                      active.toArray(new GroupKey[0]))));
             // If no buckets in the group, then retain an entry for the
             // top level group which still exists.
             if (allActiveKeys.isEmpty()) {
@@ -1454,7 +1454,8 @@
                 allActiveKeys.add(top);
             }
             flowObjectiveStore.putNextGroup(nextObjective.id(),
-                                            new OfdpaNextGroup(allActiveKeys, nextObjective));
+                                            new OfdpaNextGroup(allActiveKeys,
+                                                               nextObjective));
         }
     }
 
@@ -1477,6 +1478,79 @@
         flowObjectiveStore.removeNextGroup(nextObjective.id());
     }
 
+    /**
+     * Checks the existing buckets in the {@link NextGroup} to verify that they
+     * match the buckets in the given {@link NextObjective}. Buckets that are
+     * missing are added and duplicate buckets are removed.
+     *
+     * @param nextObjective the next objective to verify
+     * @param next the representation of the existing group which has to be
+     *             modified to match the given next objective
+     */
+    protected void verifyGroup(NextObjective nextObjective, NextGroup next) {
+        if (nextObjective.type() != NextObjective.Type.HASHED) {
+            log.warn("verification not supported for {} group", nextObjective.type());
+            fail(nextObjective, ObjectiveError.UNSUPPORTED);
+            return;
+        }
+        log.debug("Call to verify device:{} nextId:{}", deviceId, nextObjective.id());
+        List<Deque<GroupKey>> allActiveKeys = appKryo.deserialize(next.data());
+        List<TrafficTreatment> bucketsToCreate = Lists.newArrayList();
+        List<Integer> indicesToRemove = Lists.newArrayList();
+        // XXX verify empty group
+        for (TrafficTreatment bkt : nextObjective.next()) {
+            PortNumber portNumber = readOutPortFromTreatment(bkt);
+            int label = readLabelFromTreatment(bkt);
+            if (portNumber == null) {
+                log.warn("treatment {} of next objective {} has no outport.. "
+                        + "cannot verify bucket in group in dev: {}", bkt,
+                        nextObjective.id(), deviceId);
+                fail(nextObjective, ObjectiveError.BADPARAMS);
+                return;
+            }
+            List<Integer> existing = existingPortAndLabel(allActiveKeys,
+                                                          groupService, deviceId,
+                                                          portNumber, label);
+            if (existing.isEmpty()) {
+                // if it doesn't exist, mark this bucket for creation
+                bucketsToCreate.add(bkt);
+            }
+            if (existing.size() > 1) {
+                // if it exists but there are duplicates, keep the first match
+                // and mark the others for removal
+                indicesToRemove.addAll(existing.subList(1, existing.size()));
+            }
+        }
+
+        if (!bucketsToCreate.isEmpty()) {
+            log.info("creating {} buckets as part of nextId: {} verification",
+                     bucketsToCreate.size(), nextObjective.id());
+            //create a nextObjective only with these buckets
+            NextObjective.Builder nextObjBuilder = DefaultNextObjective.builder()
+                    .withId(nextObjective.id())
+                    .withType(NextObjective.Type.HASHED)
+                    .withMeta(nextObjective.meta())
+                    .fromApp(nextObjective.appId());
+            bucketsToCreate.forEach(bucket -> nextObjBuilder.addTreatment(bucket));
+            addBucketToHashGroup(nextObjBuilder.addToExisting(), allActiveKeys);
+        }
+
+        if (!indicesToRemove.isEmpty()) {
+            log.info("removing {} buckets as part of nextId: {} verification",
+                     indicesToRemove.size(), nextObjective.id());
+            List<Deque<GroupKey>> chainsToRemove = Lists.newArrayList();
+            indicesToRemove.forEach(index -> chainsToRemove
+                                                 .add(allActiveKeys.get(index)));
+            removeBucket(chainsToRemove, nextObjective);
+        }
+
+        pass(nextObjective);
+    }
+
+    //////////////////////////////////////
+    //  Helper methods and classes
+    //////////////////////////////////////
+
     protected void updatePendingNextObjective(GroupKey groupKey, OfdpaNextGroup nextGrp) {
         pendingAddNextObjectives.asMap().compute(groupKey, (k, val) -> {
             if (val == null) {