Improves VERIFY operations

Changes:
- Avoids to sends duplicate next when there are multiple sources
- Introduces a backpressure mechanism to not flood the pipeliners
- Guarantees there are at least 30s between each mcast corrector
execution
- Introduce a pool of 4 verifiers in FlowObjectiveManager to
separate the thread used for the installation/removal of the
FlowObjectives
- Improves logging in verifyGroup

Change-Id: I45aac0f80c9eb6afd763f21977d62df4a98f686e
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index 6d2b8cc..d08f111 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -1014,21 +1014,11 @@
     }
 
     @Override
-    public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
-        return mcastHandler.getMcastRoles(mcastIp);
-    }
-
-    @Override
     public Map<McastRoleStoreKey, McastRole> getMcastRoles(IpAddress mcastIp, ConnectPoint sourcecp) {
         return mcastHandler.getMcastRoles(mcastIp, sourcecp);
     }
 
     @Override
-    public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
-        return mcastHandler.getMcastPaths(mcastIp);
-    }
-
-    @Override
     public Multimap<ConnectPoint, List<ConnectPoint>> getMcastTrees(IpAddress mcastIp,
                                                                     ConnectPoint sourcecp) {
         return mcastHandler.getMcastTrees(mcastIp, sourcecp);
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
index 74db140..dd84eeb 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
@@ -296,18 +296,6 @@
     ImmutableMap<DeviceId, Set<PortNumber>> getDownedPortState();
 
     /**
-     * Returns the associated roles to the mcast groups or to the single
-     * group if mcastIp is present.
-     *
-     * @param mcastIp the group ip
-     * @return the mapping mcastIp-device to mcast role
-     *
-     * @deprecated in 1.12 ("Magpie") release.
-     */
-    @Deprecated
-    Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp);
-
-    /**
      * Returns the associated roles to the mcast groups.
      *
      * @param mcastIp the group ip
@@ -318,17 +306,6 @@
                                                     ConnectPoint sourcecp);
 
     /**
-     * Returns the associated paths to the mcast group.
-     *
-     * @param mcastIp the group ip
-     * @return the mapping egress point to mcast path
-     *
-     * @deprecated in 1.12 ("Magpie") release.
-     */
-    @Deprecated
-    Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp);
-
-    /**
      * Returns the associated trees to the mcast group.
      *
      * @param mcastIp the group ip
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index 9db890d..f7e9cb3 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -71,6 +71,7 @@
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
@@ -212,6 +213,8 @@
     private static final long MCAST_STABLITY_THRESHOLD = 5;
     // Last change done
     private Instant lastMcastChange = Instant.now();
+    // Last bucker corrector execution
+    private Instant lastBktCorrExecution = Instant.now();
 
     /**
      * Determines if mcast in the network has been stable in the last
@@ -223,10 +226,23 @@
     private boolean isMcastStable() {
         long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
         long now = (long) (Instant.now().toEpochMilli() / 1000.0);
-        log.trace("Mcast stable since {}s", now - last);
+        log.trace("Multicast stable since {}s", now - last);
         return (now - last) > MCAST_STABLITY_THRESHOLD;
     }
 
+    /**
+     * Assures there are always MCAST_VERIFY_INTERVAL seconds between each execution,
+     * by comparing the current time with the last corrector execution.
+     *
+     * @return true if stable
+     */
+    private boolean wasBktCorrRunning() {
+        long last = (long) (lastBktCorrExecution.toEpochMilli() / 1000.0);
+        long now = (long) (Instant.now().toEpochMilli() / 1000.0);
+        log.trace("McastBucketCorrector executed {}s ago", now - last);
+        return (now - last) < MCAST_VERIFY_INTERVAL;
+    }
+
     // Verify interval for Mcast bucket corrector
     private static final long MCAST_VERIFY_INTERVAL = 30;
     // Executor for mcast bucket corrector and for cache
@@ -1767,89 +1783,121 @@
      */
     private final class McastBucketCorrector implements Runnable {
 
+        private static final int MAX_VERIFY_ON_FLIGHT = 10;
+        private final AtomicInteger verifyOnFlight = new AtomicInteger(0);
+        // Define the context used for the back pressure mechanism
+        private final ObjectiveContext context = new DefaultObjectiveContext(
+                (objective) -> {
+                    synchronized (verifyOnFlight) {
+                        verifyOnFlight.decrementAndGet();
+                        verifyOnFlight.notify();
+                    }
+                },
+                (objective, error) -> {
+                    synchronized (verifyOnFlight) {
+                        verifyOnFlight.decrementAndGet();
+                        verifyOnFlight.notify();
+                    }
+                });
+
         @Override
         public void run() {
-            if (!isMcastStable()) {
+            if (!isMcastStable() || wasBktCorrRunning()) {
                 return;
             }
             mcastLock();
             try {
                 // Iterates over the routes and verify the related next objectives
-                srManager.multicastRouteService.getRoutes()
-                    .stream().map(McastRoute::group)
-                    .forEach(mcastIp -> {
-                        log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
-                        // Verify leadership on the operation
-                        if (!mcastUtils.isLeader(mcastIp)) {
-                            log.trace("Skip {} due to lack of leadership", mcastIp);
-                            return;
+                for (McastRoute mcastRoute : srManager.multicastRouteService.getRoutes()) {
+                    IpAddress mcastIp = mcastRoute.group();
+                    log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
+                    // Verify leadership on the operation
+                    if (!mcastUtils.isLeader(mcastIp)) {
+                        log.trace("Skip {} due to lack of leadership", mcastIp);
+                        continue;
+                    }
+                    // Get sources and sinks from Mcast Route Service and warn about errors
+                    Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
+                    Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
+                            .flatMap(Collection::stream).collect(Collectors.toSet());
+                    // Do not proceed if sources of this group are missing
+                    if (sources.isEmpty()) {
+                        if (!sinks.isEmpty()) {
+                            log.warn("Unable to run buckets corrector. " +
+                                     "Missing source {} for group {}", sources, mcastIp);
                         }
-                        // Get sources and sinks from Mcast Route Service and warn about errors
-                        Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
-                        Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
-                                .flatMap(Collection::stream).collect(Collectors.toSet());
-                        // Do not proceed if sources of this group are missing
-                        if (sources.isEmpty()) {
+                        continue;
+                    }
+                    // For each group we get current information in the store
+                    // and issue a check of the next objectives in place
+                    Set<McastStoreKey> processedKeys = Sets.newHashSet();
+                    for (ConnectPoint source : sources) {
+                        Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS, source);
+                        Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
+                        Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
+                        // Do not proceed if ingress devices are missing
+                        if (ingressDevices.isEmpty()) {
                             if (!sinks.isEmpty()) {
                                 log.warn("Unable to run buckets corrector. " +
-                                                 "Missing source {} for group {}", sources, mcastIp);
+                                "Missing ingress {} for source {} and for group {}",
+                                         ingressDevices, source, mcastIp);
                             }
-                            return;
+                            continue;
                         }
-                        sources.forEach(source -> {
-                            // For each group we get current information in the store
-                            // and issue a check of the next objectives in place
-                            Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS, source);
-                            Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
-                            Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
-                            // Do not proceed if ingress devices are missing
-                            if (ingressDevices.isEmpty()) {
-                                if (!sinks.isEmpty()) {
-                                    log.warn("Unable to run buckets corrector. " +
-                                                 "Missing ingress {} for source {} and for group {}",
-                                             ingressDevices, source, mcastIp);
-                                }
+                        // Create the set of the devices to be processed
+                        ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
+                        if (!ingressDevices.isEmpty()) {
+                            devicesBuilder.addAll(ingressDevices);
+                        }
+                        if (!transitDevices.isEmpty()) {
+                            devicesBuilder.addAll(transitDevices);
+                        }
+                        if (!egressDevices.isEmpty()) {
+                            devicesBuilder.addAll(egressDevices);
+                        }
+                        Set<DeviceId> devicesToProcess = devicesBuilder.build();
+                        for (DeviceId deviceId : devicesToProcess) {
+                            if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
+                                log.trace("Skipping Bucket corrector for unconfigured device {}", deviceId);
                                 return;
                             }
-                            // Create the set of the devices to be processed
-                            ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
-                            if (!ingressDevices.isEmpty()) {
-                                devicesBuilder.addAll(ingressDevices);
-                            }
-                            if (!transitDevices.isEmpty()) {
-                                devicesBuilder.addAll(transitDevices);
-                            }
-                            if (!egressDevices.isEmpty()) {
-                                devicesBuilder.addAll(egressDevices);
-                            }
-                            Set<DeviceId> devicesToProcess = devicesBuilder.build();
-                            devicesToProcess.forEach(deviceId -> {
-                                if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
-                                    log.trace("Skipping Bucket corrector for unconfigured device {}", deviceId);
-                                    return;
+                            synchronized (verifyOnFlight) {
+                                while (verifyOnFlight.get() == MAX_VERIFY_ON_FLIGHT) {
+                                    verifyOnFlight.wait();
                                 }
-                                VlanId assignedVlan = mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
-                                                                                      source : null);
-                                McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
-                                if (mcastNextObjStore.containsKey(currentKey)) {
-                                    NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
-                                    // Rebuild the next objective using assigned vlan
-                                    currentNext = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
-                                                mcastUtils.getPorts(currentNext.next()), currentNext.id()).verify();
-                                    // Send to the flowobjective service
-                                    srManager.flowObjectiveService.next(deviceId, currentNext);
-                                } else {
-                                    log.warn("Unable to run buckets corrector. " +
-                                             "Missing next for {}, for source {} and for group {}",
-                                             deviceId, source, mcastIp);
-                                }
-                            });
-                        });
-                    });
+                            }
+                            VlanId assignedVlan = mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
+                                                                                  source : null);
+                            McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
+                            // Check if we already processed this next - trees merge at some point
+                            if (processedKeys.contains(currentKey)) {
+                                continue;
+                            }
+                            // Verify the nextobjective or skip to next device
+                            if (mcastNextObjStore.containsKey(currentKey)) {
+                                NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
+                                // Rebuild the next objective using assigned vlan
+                                currentNext = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
+                                            mcastUtils.getPorts(currentNext.next()), currentNext.id()).verify(context);
+                                // Send to the flowobjective service
+                                srManager.flowObjectiveService.next(deviceId, currentNext);
+                                verifyOnFlight.incrementAndGet();
+                                log.trace("Verify on flight {}", verifyOnFlight);
+                                processedKeys.add(currentKey);
+                            } else {
+                                log.warn("Unable to run buckets corrector. " +
+                                         "Missing next for {}, for source {} and for group {}",
+                                         deviceId, source, mcastIp);
+                            }
+                        }
+                    }
+                }
+            } catch (InterruptedException e) {
+                log.warn("BktCorr has been interrupted");
             } finally {
+                lastBktCorrExecution = Instant.now();
                 mcastUnlock();
             }
-
         }
     }
 
@@ -1884,28 +1932,6 @@
     }
 
     /**
-     * Returns the associated roles to the mcast groups or to the single
-     * group if mcastIp is present.
-     *
-     * @param mcastIp the group ip
-     * @return the mapping mcastIp-device to mcast role
-     *
-     * @deprecated in 1.12 ("Magpie") release.
-     */
-    @Deprecated
-    public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
-        if (mcastIp != null) {
-            return mcastRoleStore.entrySet().stream()
-                    .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
-                    .collect(Collectors.toMap(entry -> new McastStoreKey(entry.getKey().mcastIp(),
-                     entry.getKey().deviceId(), null), entry -> entry.getValue().value()));
-        }
-        return mcastRoleStore.entrySet().stream()
-                .collect(Collectors.toMap(entry -> new McastStoreKey(entry.getKey().mcastIp(),
-                 entry.getKey().deviceId(), null), entry -> entry.getValue().value()));
-    }
-
-    /**
      * Returns the associated roles to the mcast groups.
      *
      * @param mcastIp the group ip
@@ -1932,28 +1958,6 @@
                  entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
     }
 
-
-    /**
-     * Returns the associated paths to the mcast group.
-     *
-     * @param mcastIp the group ip
-     * @return the mapping egress point to mcast path
-     *
-     * @deprecated in 1.12 ("Magpie") release.
-     */
-    @Deprecated
-    public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
-        Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
-        ConnectPoint source = mcastUtils.getSource(mcastIp);
-        if (source != null) {
-            Set<DeviceId> visited = Sets.newHashSet();
-            List<ConnectPoint> currentPath = Lists.newArrayList(source);
-            mcastUtils.buildMcastPaths(mcastNextObjStore.asJavaMap(), source.deviceId(), visited, mcastPaths,
-                    currentPath, mcastIp, source);
-        }
-        return mcastPaths;
-    }
-
     /**
      * Returns the associated trees to the mcast group.
      *
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
index 6d3f938..78f7cb3 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
@@ -250,24 +250,6 @@
     }
 
     /**
-     * Gets source connect point of given multicast group.
-     *
-     * @param mcastIp multicast IP
-     * @return source connect point or null if not found
-     *
-     * @deprecated in 1.12 ("Magpie") release.
-     */
-    @Deprecated
-    ConnectPoint getSource(IpAddress mcastIp) {
-        McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
-                .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
-                .findFirst().orElse(null);
-        return mcastRoute == null ? null : srManager.multicastRouteService.sources(mcastRoute)
-                .stream()
-                .findFirst().orElse(null);
-    }
-
-    /**
      * Gets sources connect points of given multicast group.
      *
      * @param mcastIp multicast IP
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 3c5c571..4ba0d6e 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -105,8 +105,9 @@
     private static final int INSTALL_RETRY_ATTEMPTS = 5;
     private static final long INSTALL_RETRY_INTERVAL = 1000; // ms
 
-    private static final String WORKER_PATTERN = "objective-installer-%d";
-    private static final String GROUP_THREAD_NAME = "onos/objective-installer";
+    private static final String INSTALLER_PATTERN = "installer-%d";
+    private static final String VERIFIER_PATTERN = "verifier-%d";
+    private static final String GROUP_THREAD_NAME = "onos/objective";
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -174,14 +175,18 @@
     // for debugging purposes
     private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
 
-    ExecutorService executorService;
+    ExecutorService installerExecutor;
+    ExecutorService verifierExecutor;
     protected ExecutorService devEventExecutor;
 
     @Activate
     protected void activate(ComponentContext context) {
         cfgService.registerProperties(FlowObjectiveManager.class);
-        executorService = newFixedThreadPool(numThreads,
-                                             groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+        installerExecutor = newFixedThreadPool(numThreads,
+                                           groupedThreads(GROUP_THREAD_NAME, INSTALLER_PATTERN, log));
+        verifierExecutor = newFixedThreadPool(numThreads,
+                                           groupedThreads(GROUP_THREAD_NAME, VERIFIER_PATTERN, log));
+
         modified(context);
         devEventExecutor = newSingleThreadScheduledExecutor(
                                        groupedThreads("onos/flowobj-dev-events", "events-%d", log));
@@ -197,7 +202,8 @@
         flowObjectiveStore.unsetDelegate(delegate);
         deviceService.removeListener(deviceListener);
         driverService.removeListener(driverListener);
-        executorService.shutdown();
+        installerExecutor.shutdown();
+        verifierExecutor.shutdown();
         devEventExecutor.shutdownNow();
         devEventExecutor = null;
         pipeliners.clear();
@@ -224,9 +230,15 @@
 
         if (newNumThreads != numThreads && newNumThreads > 0) {
             numThreads = newNumThreads;
-            ExecutorService oldWorkerExecutor = executorService;
-            executorService = newFixedThreadPool(numThreads,
-                                                 groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+            ExecutorService oldWorkerExecutor = installerExecutor;
+            installerExecutor = newFixedThreadPool(numThreads,
+                                         groupedThreads(GROUP_THREAD_NAME, INSTALLER_PATTERN, log));
+            if (oldWorkerExecutor != null) {
+                oldWorkerExecutor.shutdown();
+            }
+            oldWorkerExecutor = verifierExecutor;
+            verifierExecutor = newFixedThreadPool(numThreads,
+                                         groupedThreads(GROUP_THREAD_NAME, VERIFIER_PATTERN, log));
             if (oldWorkerExecutor != null) {
                 oldWorkerExecutor.shutdown();
             }
@@ -269,20 +281,24 @@
      * make a few attempts to find the appropriate driver, then eventually give
      * up and report an error if no suitable driver could be found.
      */
-    class ObjectiveInstaller implements Runnable {
+    class ObjectiveProcessor implements Runnable {
         final DeviceId deviceId;
         final Objective objective;
+        final ExecutorService executor;
 
         private final int numAttempts;
 
-        ObjectiveInstaller(DeviceId deviceId, Objective objective) {
-            this(deviceId, objective, 1);
+        ObjectiveProcessor(DeviceId deviceId, Objective objective,
+                           ExecutorService executorService) {
+            this(deviceId, objective, 1, executorService);
         }
 
-        ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
+        ObjectiveProcessor(DeviceId deviceId, Objective objective, int attempts,
+                           ExecutorService executorService) {
             this.deviceId = checkNotNull(deviceId);
             this.objective = checkNotNull(objective);
-            this.numAttempts = attemps;
+            this.executor = checkNotNull(executorService);
+            this.numAttempts = attempts;
         }
 
         @Override
@@ -302,7 +318,8 @@
                     //Attempts to check if pipeliner is null for retry attempts
                 } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
                     Thread.sleep(INSTALL_RETRY_INTERVAL);
-                    executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
+                    executor.execute(new ObjectiveProcessor(deviceId, objective,
+                                                            numAttempts + 1, executor));
                 } else {
                     // Otherwise we've tried a few times and failed, report an
                     // error back to the user.
@@ -311,7 +328,7 @@
                 }
                 //Exception thrown
             } catch (Exception e) {
-                log.warn("Exception while installing flow objective", e);
+                log.warn("Exception while processing flow objective", e);
             }
         }
     }
@@ -319,7 +336,7 @@
     @Override
     public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
         checkPermission(FLOWRULE_WRITE);
-        executorService.execute(new ObjectiveInstaller(deviceId, filteringObjective));
+        installerExecutor.execute(new ObjectiveProcessor(deviceId, filteringObjective, installerExecutor));
     }
 
     @Override
@@ -329,19 +346,21 @@
                 flowObjectiveStore.getNextGroup(forwardingObjective.nextId()) != null ||
                 !queueFwdObjective(deviceId, forwardingObjective)) {
             // fast path
-            executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
+            installerExecutor.execute(new ObjectiveProcessor(deviceId, forwardingObjective, installerExecutor));
         }
     }
 
     @Override
     public void next(DeviceId deviceId, NextObjective nextObjective) {
         checkPermission(FLOWRULE_WRITE);
-        if (nextObjective.op() == Operation.ADD ||
-                nextObjective.op() == Operation.VERIFY ||
+        if (nextObjective.op() == Operation.VERIFY) {
+            // Verify does not need to wait
+            verifierExecutor.execute(new ObjectiveProcessor(deviceId, nextObjective, verifierExecutor));
+        } else if (nextObjective.op() == Operation.ADD ||
                 flowObjectiveStore.getNextGroup(nextObjective.id()) != null ||
                 !queueNextObjective(deviceId, nextObjective)) {
             // either group exists or we are trying to create it - let it through
-            executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
+            installerExecutor.execute(new ObjectiveProcessor(deviceId, nextObjective, installerExecutor));
         }
     }
 
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
index 93b6337..8e88e77 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
@@ -193,7 +193,7 @@
         mgr = new InOrderFlowObjectiveManager();
         mgr.objTimeoutMs = objTimeoutMs;
         mgr.pipeliners.put(DEV1, pipeliner);
-        mgr.executorService = newFixedThreadPool(4, groupedThreads("foo", "bar"));
+        mgr.installerExecutor = newFixedThreadPool(4, groupedThreads("foo", "bar"));
         mgr.cfgService = createMock(ComponentConfigService.class);
         mgr.deviceService = createMock(DeviceService.class);
         mgr.driverService = createMock(DriverService.class);
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
index 851715d..0c68a7a 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
@@ -2102,9 +2102,8 @@
                     "nextId:{}, nextObjective-size:{} next-size:{} .. correcting",
                     deviceId, nextObjective.id(), nextObjective.next().size(),
                     allActiveKeys.size());
-            List<Integer> otherIndices =
-                    indicesToRemoveFromNextGroup(allActiveKeys, nextObjective,
-                                                 groupService, deviceId);
+            List<Integer> otherIndices = indicesToRemoveFromNextGroup(allActiveKeys, nextObjective,
+                    groupService, deviceId);
             // Filter out the indices not present
             otherIndices = otherIndices.stream()
                     .filter(index -> !indicesToRemove.contains(index))
@@ -2142,11 +2141,12 @@
             log.info("removing {} buckets as part of nextId: {} verification",
                      indicesToRemove.size(), nextObjective.id());
             List<Deque<GroupKey>> chainsToRemove = Lists.newArrayList();
-            indicesToRemove.forEach(index -> chainsToRemove
-                                                 .add(allActiveKeys.get(index)));
+            indicesToRemove.forEach(index -> chainsToRemove.add(allActiveKeys.get(index)));
             removeBucket(chainsToRemove, nextObjective);
         }
 
+        log.trace("Checking mismatch with GroupStore device:{} nextId:{}",
+                  deviceId, nextObjective.id());
         if (bucketsToCreate.isEmpty() && indicesToRemove.isEmpty()) {
             // flowObjective store record is in-sync with nextObjective passed-in
             // Nevertheless groupStore may not be in sync due to bug in the store
@@ -2181,8 +2181,7 @@
                         if (validChain.size() < 2) {
                             continue;
                         }
-                        GroupKey pointedGroupKey = validChain.stream()
-                                                       .collect(Collectors.toList()).get(1);
+                        GroupKey pointedGroupKey = validChain.stream().collect(Collectors.toList()).get(1);
                         Group pointedGroup = groupService.getGroup(deviceId, pointedGroupKey);
                         if (pointedGroup != null && gidToCheck.equals(pointedGroup.id())) {
                             matches = true;
@@ -2200,9 +2199,8 @@
                             + "buckets to remove");
                 } else {
                     GroupBuckets removeBuckets = new GroupBuckets(bucketsToRemove);
-                    groupService.removeBucketsFromGroup(deviceId, topGroupKey,
-                                                        removeBuckets, topGroupKey,
-                                                        nextObjective.appId());
+                    groupService.removeBucketsFromGroup(deviceId, topGroupKey, removeBuckets, topGroupKey,
+                            nextObjective.appId());
                 }
             } else if (actualGroupSize < objGroupSize) {
                 // Group in the device has less chains
@@ -2213,8 +2211,7 @@
                     if (validChain.size() < 2) {
                         continue;
                     }
-                    GroupKey pointedGroupKey = validChain.stream()
-                                                   .collect(Collectors.toList()).get(1);
+                    GroupKey pointedGroupKey = validChain.stream().collect(Collectors.toList()).get(1);
                     Group pointedGroup = groupService.getGroup(deviceId, pointedGroupKey);
                     if (pointedGroup == null) {
                         // group should exist, otherwise cannot be added as bucket
@@ -2254,7 +2251,7 @@
                 }
             }
         }
-
+        log.trace("Verify done for device:{} nextId:{}", deviceId, nextObjective.id());
         pass(nextObjective);
     }