Improves VERIFY operations

Changes:
- Avoids to sends duplicate next when there are multiple sources
- Introduces a backpressure mechanism to not flood the pipeliners
- Guarantees there are at least 30s between each mcast corrector
execution
- Introduce a pool of 4 verifiers in FlowObjectiveManager to
separate the thread used for the installation/removal of the
FlowObjectives
- Improves logging in verifyGroup

Change-Id: I45aac0f80c9eb6afd763f21977d62df4a98f686e
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 3c5c571..4ba0d6e 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -105,8 +105,9 @@
     private static final int INSTALL_RETRY_ATTEMPTS = 5;
     private static final long INSTALL_RETRY_INTERVAL = 1000; // ms
 
-    private static final String WORKER_PATTERN = "objective-installer-%d";
-    private static final String GROUP_THREAD_NAME = "onos/objective-installer";
+    private static final String INSTALLER_PATTERN = "installer-%d";
+    private static final String VERIFIER_PATTERN = "verifier-%d";
+    private static final String GROUP_THREAD_NAME = "onos/objective";
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -174,14 +175,18 @@
     // for debugging purposes
     private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
 
-    ExecutorService executorService;
+    ExecutorService installerExecutor;
+    ExecutorService verifierExecutor;
     protected ExecutorService devEventExecutor;
 
     @Activate
     protected void activate(ComponentContext context) {
         cfgService.registerProperties(FlowObjectiveManager.class);
-        executorService = newFixedThreadPool(numThreads,
-                                             groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+        installerExecutor = newFixedThreadPool(numThreads,
+                                           groupedThreads(GROUP_THREAD_NAME, INSTALLER_PATTERN, log));
+        verifierExecutor = newFixedThreadPool(numThreads,
+                                           groupedThreads(GROUP_THREAD_NAME, VERIFIER_PATTERN, log));
+
         modified(context);
         devEventExecutor = newSingleThreadScheduledExecutor(
                                        groupedThreads("onos/flowobj-dev-events", "events-%d", log));
@@ -197,7 +202,8 @@
         flowObjectiveStore.unsetDelegate(delegate);
         deviceService.removeListener(deviceListener);
         driverService.removeListener(driverListener);
-        executorService.shutdown();
+        installerExecutor.shutdown();
+        verifierExecutor.shutdown();
         devEventExecutor.shutdownNow();
         devEventExecutor = null;
         pipeliners.clear();
@@ -224,9 +230,15 @@
 
         if (newNumThreads != numThreads && newNumThreads > 0) {
             numThreads = newNumThreads;
-            ExecutorService oldWorkerExecutor = executorService;
-            executorService = newFixedThreadPool(numThreads,
-                                                 groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+            ExecutorService oldWorkerExecutor = installerExecutor;
+            installerExecutor = newFixedThreadPool(numThreads,
+                                         groupedThreads(GROUP_THREAD_NAME, INSTALLER_PATTERN, log));
+            if (oldWorkerExecutor != null) {
+                oldWorkerExecutor.shutdown();
+            }
+            oldWorkerExecutor = verifierExecutor;
+            verifierExecutor = newFixedThreadPool(numThreads,
+                                         groupedThreads(GROUP_THREAD_NAME, VERIFIER_PATTERN, log));
             if (oldWorkerExecutor != null) {
                 oldWorkerExecutor.shutdown();
             }
@@ -269,20 +281,24 @@
      * make a few attempts to find the appropriate driver, then eventually give
      * up and report an error if no suitable driver could be found.
      */
-    class ObjectiveInstaller implements Runnable {
+    class ObjectiveProcessor implements Runnable {
         final DeviceId deviceId;
         final Objective objective;
+        final ExecutorService executor;
 
         private final int numAttempts;
 
-        ObjectiveInstaller(DeviceId deviceId, Objective objective) {
-            this(deviceId, objective, 1);
+        ObjectiveProcessor(DeviceId deviceId, Objective objective,
+                           ExecutorService executorService) {
+            this(deviceId, objective, 1, executorService);
         }
 
-        ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
+        ObjectiveProcessor(DeviceId deviceId, Objective objective, int attempts,
+                           ExecutorService executorService) {
             this.deviceId = checkNotNull(deviceId);
             this.objective = checkNotNull(objective);
-            this.numAttempts = attemps;
+            this.executor = checkNotNull(executorService);
+            this.numAttempts = attempts;
         }
 
         @Override
@@ -302,7 +318,8 @@
                     //Attempts to check if pipeliner is null for retry attempts
                 } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
                     Thread.sleep(INSTALL_RETRY_INTERVAL);
-                    executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
+                    executor.execute(new ObjectiveProcessor(deviceId, objective,
+                                                            numAttempts + 1, executor));
                 } else {
                     // Otherwise we've tried a few times and failed, report an
                     // error back to the user.
@@ -311,7 +328,7 @@
                 }
                 //Exception thrown
             } catch (Exception e) {
-                log.warn("Exception while installing flow objective", e);
+                log.warn("Exception while processing flow objective", e);
             }
         }
     }
@@ -319,7 +336,7 @@
     @Override
     public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
         checkPermission(FLOWRULE_WRITE);
-        executorService.execute(new ObjectiveInstaller(deviceId, filteringObjective));
+        installerExecutor.execute(new ObjectiveProcessor(deviceId, filteringObjective, installerExecutor));
     }
 
     @Override
@@ -329,19 +346,21 @@
                 flowObjectiveStore.getNextGroup(forwardingObjective.nextId()) != null ||
                 !queueFwdObjective(deviceId, forwardingObjective)) {
             // fast path
-            executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
+            installerExecutor.execute(new ObjectiveProcessor(deviceId, forwardingObjective, installerExecutor));
         }
     }
 
     @Override
     public void next(DeviceId deviceId, NextObjective nextObjective) {
         checkPermission(FLOWRULE_WRITE);
-        if (nextObjective.op() == Operation.ADD ||
-                nextObjective.op() == Operation.VERIFY ||
+        if (nextObjective.op() == Operation.VERIFY) {
+            // Verify does not need to wait
+            verifierExecutor.execute(new ObjectiveProcessor(deviceId, nextObjective, verifierExecutor));
+        } else if (nextObjective.op() == Operation.ADD ||
                 flowObjectiveStore.getNextGroup(nextObjective.id()) != null ||
                 !queueNextObjective(deviceId, nextObjective)) {
             // either group exists or we are trying to create it - let it through
-            executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
+            installerExecutor.execute(new ObjectiveProcessor(deviceId, nextObjective, installerExecutor));
         }
     }