Improves VERIFY operations

Changes:
- Avoids to sends duplicate next when there are multiple sources
- Introduces a backpressure mechanism to not flood the pipeliners
- Guarantees there are at least 30s between each mcast corrector
execution
- Introduce a pool of 4 verifiers in FlowObjectiveManager to
separate the thread used for the installation/removal of the
FlowObjectives
- Improves logging in verifyGroup

Change-Id: I45aac0f80c9eb6afd763f21977d62df4a98f686e
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
index 851715d..0c68a7a 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
@@ -2102,9 +2102,8 @@
                     "nextId:{}, nextObjective-size:{} next-size:{} .. correcting",
                     deviceId, nextObjective.id(), nextObjective.next().size(),
                     allActiveKeys.size());
-            List<Integer> otherIndices =
-                    indicesToRemoveFromNextGroup(allActiveKeys, nextObjective,
-                                                 groupService, deviceId);
+            List<Integer> otherIndices = indicesToRemoveFromNextGroup(allActiveKeys, nextObjective,
+                    groupService, deviceId);
             // Filter out the indices not present
             otherIndices = otherIndices.stream()
                     .filter(index -> !indicesToRemove.contains(index))
@@ -2142,11 +2141,12 @@
             log.info("removing {} buckets as part of nextId: {} verification",
                      indicesToRemove.size(), nextObjective.id());
             List<Deque<GroupKey>> chainsToRemove = Lists.newArrayList();
-            indicesToRemove.forEach(index -> chainsToRemove
-                                                 .add(allActiveKeys.get(index)));
+            indicesToRemove.forEach(index -> chainsToRemove.add(allActiveKeys.get(index)));
             removeBucket(chainsToRemove, nextObjective);
         }
 
+        log.trace("Checking mismatch with GroupStore device:{} nextId:{}",
+                  deviceId, nextObjective.id());
         if (bucketsToCreate.isEmpty() && indicesToRemove.isEmpty()) {
             // flowObjective store record is in-sync with nextObjective passed-in
             // Nevertheless groupStore may not be in sync due to bug in the store
@@ -2181,8 +2181,7 @@
                         if (validChain.size() < 2) {
                             continue;
                         }
-                        GroupKey pointedGroupKey = validChain.stream()
-                                                       .collect(Collectors.toList()).get(1);
+                        GroupKey pointedGroupKey = validChain.stream().collect(Collectors.toList()).get(1);
                         Group pointedGroup = groupService.getGroup(deviceId, pointedGroupKey);
                         if (pointedGroup != null && gidToCheck.equals(pointedGroup.id())) {
                             matches = true;
@@ -2200,9 +2199,8 @@
                             + "buckets to remove");
                 } else {
                     GroupBuckets removeBuckets = new GroupBuckets(bucketsToRemove);
-                    groupService.removeBucketsFromGroup(deviceId, topGroupKey,
-                                                        removeBuckets, topGroupKey,
-                                                        nextObjective.appId());
+                    groupService.removeBucketsFromGroup(deviceId, topGroupKey, removeBuckets, topGroupKey,
+                            nextObjective.appId());
                 }
             } else if (actualGroupSize < objGroupSize) {
                 // Group in the device has less chains
@@ -2213,8 +2211,7 @@
                     if (validChain.size() < 2) {
                         continue;
                     }
-                    GroupKey pointedGroupKey = validChain.stream()
-                                                   .collect(Collectors.toList()).get(1);
+                    GroupKey pointedGroupKey = validChain.stream().collect(Collectors.toList()).get(1);
                     Group pointedGroup = groupService.getGroup(deviceId, pointedGroupKey);
                     if (pointedGroup == null) {
                         // group should exist, otherwise cannot be added as bucket
@@ -2254,7 +2251,7 @@
                 }
             }
         }
-
+        log.trace("Verify done for device:{} nextId:{}", deviceId, nextObjective.id());
         pass(nextObjective);
     }