Improves VERIFY operations

Changes:
- Avoids to sends duplicate next when there are multiple sources
- Introduces a backpressure mechanism to not flood the pipeliners
- Guarantees there are at least 30s between each mcast corrector
execution
- Introduce a pool of 4 verifiers in FlowObjectiveManager to
separate the thread used for the installation/removal of the
FlowObjectives
- Improves logging in verifyGroup

Change-Id: I45aac0f80c9eb6afd763f21977d62df4a98f686e
diff --git a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index 6d2b8cc..d08f111 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -1014,21 +1014,11 @@
     }
 
     @Override
-    public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
-        return mcastHandler.getMcastRoles(mcastIp);
-    }
-
-    @Override
     public Map<McastRoleStoreKey, McastRole> getMcastRoles(IpAddress mcastIp, ConnectPoint sourcecp) {
         return mcastHandler.getMcastRoles(mcastIp, sourcecp);
     }
 
     @Override
-    public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
-        return mcastHandler.getMcastPaths(mcastIp);
-    }
-
-    @Override
     public Multimap<ConnectPoint, List<ConnectPoint>> getMcastTrees(IpAddress mcastIp,
                                                                     ConnectPoint sourcecp) {
         return mcastHandler.getMcastTrees(mcastIp, sourcecp);
diff --git a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
index 74db140..dd84eeb 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
@@ -296,18 +296,6 @@
     ImmutableMap<DeviceId, Set<PortNumber>> getDownedPortState();
 
     /**
-     * Returns the associated roles to the mcast groups or to the single
-     * group if mcastIp is present.
-     *
-     * @param mcastIp the group ip
-     * @return the mapping mcastIp-device to mcast role
-     *
-     * @deprecated in 1.12 ("Magpie") release.
-     */
-    @Deprecated
-    Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp);
-
-    /**
      * Returns the associated roles to the mcast groups.
      *
      * @param mcastIp the group ip
@@ -318,17 +306,6 @@
                                                     ConnectPoint sourcecp);
 
     /**
-     * Returns the associated paths to the mcast group.
-     *
-     * @param mcastIp the group ip
-     * @return the mapping egress point to mcast path
-     *
-     * @deprecated in 1.12 ("Magpie") release.
-     */
-    @Deprecated
-    Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp);
-
-    /**
      * Returns the associated trees to the mcast group.
      *
      * @param mcastIp the group ip
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index 9db890d..f7e9cb3 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -71,6 +71,7 @@
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
@@ -212,6 +213,8 @@
     private static final long MCAST_STABLITY_THRESHOLD = 5;
     // Last change done
     private Instant lastMcastChange = Instant.now();
+    // Last bucker corrector execution
+    private Instant lastBktCorrExecution = Instant.now();
 
     /**
      * Determines if mcast in the network has been stable in the last
@@ -223,10 +226,23 @@
     private boolean isMcastStable() {
         long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
         long now = (long) (Instant.now().toEpochMilli() / 1000.0);
-        log.trace("Mcast stable since {}s", now - last);
+        log.trace("Multicast stable since {}s", now - last);
         return (now - last) > MCAST_STABLITY_THRESHOLD;
     }
 
+    /**
+     * Assures there are always MCAST_VERIFY_INTERVAL seconds between each execution,
+     * by comparing the current time with the last corrector execution.
+     *
+     * @return true if stable
+     */
+    private boolean wasBktCorrRunning() {
+        long last = (long) (lastBktCorrExecution.toEpochMilli() / 1000.0);
+        long now = (long) (Instant.now().toEpochMilli() / 1000.0);
+        log.trace("McastBucketCorrector executed {}s ago", now - last);
+        return (now - last) < MCAST_VERIFY_INTERVAL;
+    }
+
     // Verify interval for Mcast bucket corrector
     private static final long MCAST_VERIFY_INTERVAL = 30;
     // Executor for mcast bucket corrector and for cache
@@ -1767,89 +1783,121 @@
      */
     private final class McastBucketCorrector implements Runnable {
 
+        private static final int MAX_VERIFY_ON_FLIGHT = 10;
+        private final AtomicInteger verifyOnFlight = new AtomicInteger(0);
+        // Define the context used for the back pressure mechanism
+        private final ObjectiveContext context = new DefaultObjectiveContext(
+                (objective) -> {
+                    synchronized (verifyOnFlight) {
+                        verifyOnFlight.decrementAndGet();
+                        verifyOnFlight.notify();
+                    }
+                },
+                (objective, error) -> {
+                    synchronized (verifyOnFlight) {
+                        verifyOnFlight.decrementAndGet();
+                        verifyOnFlight.notify();
+                    }
+                });
+
         @Override
         public void run() {
-            if (!isMcastStable()) {
+            if (!isMcastStable() || wasBktCorrRunning()) {
                 return;
             }
             mcastLock();
             try {
                 // Iterates over the routes and verify the related next objectives
-                srManager.multicastRouteService.getRoutes()
-                    .stream().map(McastRoute::group)
-                    .forEach(mcastIp -> {
-                        log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
-                        // Verify leadership on the operation
-                        if (!mcastUtils.isLeader(mcastIp)) {
-                            log.trace("Skip {} due to lack of leadership", mcastIp);
-                            return;
+                for (McastRoute mcastRoute : srManager.multicastRouteService.getRoutes()) {
+                    IpAddress mcastIp = mcastRoute.group();
+                    log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
+                    // Verify leadership on the operation
+                    if (!mcastUtils.isLeader(mcastIp)) {
+                        log.trace("Skip {} due to lack of leadership", mcastIp);
+                        continue;
+                    }
+                    // Get sources and sinks from Mcast Route Service and warn about errors
+                    Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
+                    Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
+                            .flatMap(Collection::stream).collect(Collectors.toSet());
+                    // Do not proceed if sources of this group are missing
+                    if (sources.isEmpty()) {
+                        if (!sinks.isEmpty()) {
+                            log.warn("Unable to run buckets corrector. " +
+                                     "Missing source {} for group {}", sources, mcastIp);
                         }
-                        // Get sources and sinks from Mcast Route Service and warn about errors
-                        Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
-                        Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
-                                .flatMap(Collection::stream).collect(Collectors.toSet());
-                        // Do not proceed if sources of this group are missing
-                        if (sources.isEmpty()) {
+                        continue;
+                    }
+                    // For each group we get current information in the store
+                    // and issue a check of the next objectives in place
+                    Set<McastStoreKey> processedKeys = Sets.newHashSet();
+                    for (ConnectPoint source : sources) {
+                        Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS, source);
+                        Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
+                        Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
+                        // Do not proceed if ingress devices are missing
+                        if (ingressDevices.isEmpty()) {
                             if (!sinks.isEmpty()) {
                                 log.warn("Unable to run buckets corrector. " +
-                                                 "Missing source {} for group {}", sources, mcastIp);
+                                "Missing ingress {} for source {} and for group {}",
+                                         ingressDevices, source, mcastIp);
                             }
-                            return;
+                            continue;
                         }
-                        sources.forEach(source -> {
-                            // For each group we get current information in the store
-                            // and issue a check of the next objectives in place
-                            Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS, source);
-                            Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
-                            Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
-                            // Do not proceed if ingress devices are missing
-                            if (ingressDevices.isEmpty()) {
-                                if (!sinks.isEmpty()) {
-                                    log.warn("Unable to run buckets corrector. " +
-                                                 "Missing ingress {} for source {} and for group {}",
-                                             ingressDevices, source, mcastIp);
-                                }
+                        // Create the set of the devices to be processed
+                        ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
+                        if (!ingressDevices.isEmpty()) {
+                            devicesBuilder.addAll(ingressDevices);
+                        }
+                        if (!transitDevices.isEmpty()) {
+                            devicesBuilder.addAll(transitDevices);
+                        }
+                        if (!egressDevices.isEmpty()) {
+                            devicesBuilder.addAll(egressDevices);
+                        }
+                        Set<DeviceId> devicesToProcess = devicesBuilder.build();
+                        for (DeviceId deviceId : devicesToProcess) {
+                            if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
+                                log.trace("Skipping Bucket corrector for unconfigured device {}", deviceId);
                                 return;
                             }
-                            // Create the set of the devices to be processed
-                            ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
-                            if (!ingressDevices.isEmpty()) {
-                                devicesBuilder.addAll(ingressDevices);
-                            }
-                            if (!transitDevices.isEmpty()) {
-                                devicesBuilder.addAll(transitDevices);
-                            }
-                            if (!egressDevices.isEmpty()) {
-                                devicesBuilder.addAll(egressDevices);
-                            }
-                            Set<DeviceId> devicesToProcess = devicesBuilder.build();
-                            devicesToProcess.forEach(deviceId -> {
-                                if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
-                                    log.trace("Skipping Bucket corrector for unconfigured device {}", deviceId);
-                                    return;
+                            synchronized (verifyOnFlight) {
+                                while (verifyOnFlight.get() == MAX_VERIFY_ON_FLIGHT) {
+                                    verifyOnFlight.wait();
                                 }
-                                VlanId assignedVlan = mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
-                                                                                      source : null);
-                                McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
-                                if (mcastNextObjStore.containsKey(currentKey)) {
-                                    NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
-                                    // Rebuild the next objective using assigned vlan
-                                    currentNext = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
-                                                mcastUtils.getPorts(currentNext.next()), currentNext.id()).verify();
-                                    // Send to the flowobjective service
-                                    srManager.flowObjectiveService.next(deviceId, currentNext);
-                                } else {
-                                    log.warn("Unable to run buckets corrector. " +
-                                             "Missing next for {}, for source {} and for group {}",
-                                             deviceId, source, mcastIp);
-                                }
-                            });
-                        });
-                    });
+                            }
+                            VlanId assignedVlan = mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
+                                                                                  source : null);
+                            McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
+                            // Check if we already processed this next - trees merge at some point
+                            if (processedKeys.contains(currentKey)) {
+                                continue;
+                            }
+                            // Verify the nextobjective or skip to next device
+                            if (mcastNextObjStore.containsKey(currentKey)) {
+                                NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
+                                // Rebuild the next objective using assigned vlan
+                                currentNext = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
+                                            mcastUtils.getPorts(currentNext.next()), currentNext.id()).verify(context);
+                                // Send to the flowobjective service
+                                srManager.flowObjectiveService.next(deviceId, currentNext);
+                                verifyOnFlight.incrementAndGet();
+                                log.trace("Verify on flight {}", verifyOnFlight);
+                                processedKeys.add(currentKey);
+                            } else {
+                                log.warn("Unable to run buckets corrector. " +
+                                         "Missing next for {}, for source {} and for group {}",
+                                         deviceId, source, mcastIp);
+                            }
+                        }
+                    }
+                }
+            } catch (InterruptedException e) {
+                log.warn("BktCorr has been interrupted");
             } finally {
+                lastBktCorrExecution = Instant.now();
                 mcastUnlock();
             }
-
         }
     }
 
@@ -1884,28 +1932,6 @@
     }
 
     /**
-     * Returns the associated roles to the mcast groups or to the single
-     * group if mcastIp is present.
-     *
-     * @param mcastIp the group ip
-     * @return the mapping mcastIp-device to mcast role
-     *
-     * @deprecated in 1.12 ("Magpie") release.
-     */
-    @Deprecated
-    public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
-        if (mcastIp != null) {
-            return mcastRoleStore.entrySet().stream()
-                    .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
-                    .collect(Collectors.toMap(entry -> new McastStoreKey(entry.getKey().mcastIp(),
-                     entry.getKey().deviceId(), null), entry -> entry.getValue().value()));
-        }
-        return mcastRoleStore.entrySet().stream()
-                .collect(Collectors.toMap(entry -> new McastStoreKey(entry.getKey().mcastIp(),
-                 entry.getKey().deviceId(), null), entry -> entry.getValue().value()));
-    }
-
-    /**
      * Returns the associated roles to the mcast groups.
      *
      * @param mcastIp the group ip
@@ -1932,28 +1958,6 @@
                  entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
     }
 
-
-    /**
-     * Returns the associated paths to the mcast group.
-     *
-     * @param mcastIp the group ip
-     * @return the mapping egress point to mcast path
-     *
-     * @deprecated in 1.12 ("Magpie") release.
-     */
-    @Deprecated
-    public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
-        Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
-        ConnectPoint source = mcastUtils.getSource(mcastIp);
-        if (source != null) {
-            Set<DeviceId> visited = Sets.newHashSet();
-            List<ConnectPoint> currentPath = Lists.newArrayList(source);
-            mcastUtils.buildMcastPaths(mcastNextObjStore.asJavaMap(), source.deviceId(), visited, mcastPaths,
-                    currentPath, mcastIp, source);
-        }
-        return mcastPaths;
-    }
-
     /**
      * Returns the associated trees to the mcast group.
      *
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
index 6d3f938..78f7cb3 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
@@ -250,24 +250,6 @@
     }
 
     /**
-     * Gets source connect point of given multicast group.
-     *
-     * @param mcastIp multicast IP
-     * @return source connect point or null if not found
-     *
-     * @deprecated in 1.12 ("Magpie") release.
-     */
-    @Deprecated
-    ConnectPoint getSource(IpAddress mcastIp) {
-        McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
-                .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
-                .findFirst().orElse(null);
-        return mcastRoute == null ? null : srManager.multicastRouteService.sources(mcastRoute)
-                .stream()
-                .findFirst().orElse(null);
-    }
-
-    /**
      * Gets sources connect points of given multicast group.
      *
      * @param mcastIp multicast IP