[CORD-2607] Mcast buckets correction

Change-Id: Ib47b2d8e40babdbb2ccdba61b48365a141752016
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/McastHandler.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/McastHandler.java
index 94b3a4c..e917b30 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/McastHandler.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/McastHandler.java
@@ -50,6 +50,7 @@
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.mcast.McastEvent;
+import org.onosproject.net.mcast.McastRoute;
 import org.onosproject.net.mcast.McastRouteInfo;
 import org.onosproject.net.topology.TopologyService;
 import org.onosproject.segmentrouting.config.SegmentRoutingAppConfig;
@@ -62,6 +63,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Instant;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -69,9 +71,15 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkState;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
 
 /**
@@ -87,6 +95,49 @@
     private final KryoNamespace.Builder mcastKryo;
     private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
 
+    // Mcast lock to serialize local operations
+    private final Lock mcastLock = new ReentrantLock();
+
+    /**
+     * Acquires the lock used when making mcast changes.
+     */
+    private void mcastLock() {
+        mcastLock.lock();
+    }
+
+    /**
+     * Releases the lock used when making mcast changes.
+     */
+    private void mcastUnlock() {
+        mcastLock.unlock();
+    }
+
+    // Stability threshold for Mcast. Seconds
+    private static final long MCAST_STABLITY_THRESHOLD = 5;
+    // Last change done
+    private Instant lastMcastChange = Instant.now();
+
+    /**
+     * Determines if mcast in the network has been stable in the last
+     * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
+     * to the last mcast change timestamp.
+     *
+     * @return true if stable
+     */
+    private boolean isMcastStable() {
+        long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
+        long now = (long) (Instant.now().toEpochMilli() / 1000.0);
+        log.debug("Mcast stable since {}s", now - last);
+        return (now - last) > MCAST_STABLITY_THRESHOLD;
+    }
+
+    // Verify interval for Mcast
+    private static final long MCAST_VERIFY_INTERVAL = 30;
+
+    // Executor for mcast bucket corrector
+    private ScheduledExecutorService executorService
+            = newScheduledThreadPool(1, groupedThreads("mcastBktCorrector", "mcastbktC-%d", log));
+
     /**
      * Role in the multicast tree.
      */
@@ -129,6 +180,10 @@
                 .withName("onos-mcast-role-store")
                 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
                 .build();
+        // Init the executor service and the buckets corrector
+        executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
+                                               MCAST_VERIFY_INTERVAL,
+                                               TimeUnit.SECONDS);
     }
 
     /**
@@ -145,6 +200,13 @@
     }
 
     /**
+     * Clean up when deactivating the application.
+     */
+    protected void terminate() {
+        executorService.shutdown();
+    }
+
+    /**
      * Processes the SOURCE_ADDED event.
      *
      * @param event McastEvent with SOURCE_ADDED type
@@ -160,9 +222,7 @@
         Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
         IpAddress mcastIp = mcastRouteInfo.route().group();
 
-        sinks.forEach(sink -> {
-            processSinkAddedInternal(source, sink, mcastIp);
-        });
+        sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp));
     }
 
     /**
@@ -200,43 +260,65 @@
         ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
         IpAddress mcastIp = mcastRouteInfo.route().group();
 
-        // Continue only when this instance is the master of source device
-        if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
-            log.info("Skip {} due to lack of mastership of the source device {}",
-                    mcastIp, source.deviceId());
-            return;
-        }
+        processSinkRemovedInternal(source, sink, mcastIp);
+    }
 
-        // When source and sink are on the same device
-        if (source.deviceId().equals(sink.deviceId())) {
-            // Source and sink are on even the same port. There must be something wrong.
-            if (source.port().equals(sink.port())) {
-                log.warn("Sink is on the same port of source. Abort");
+    /**
+     * Removes a path from source to sink for given multicast group.
+     *
+     * @param source connect point of the multicast source
+     * @param sink connection point of the multicast sink
+     * @param mcastIp multicast group IP address
+     */
+    private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
+                                          IpAddress mcastIp) {
+        lastMcastChange = Instant.now();
+        mcastLock();
+        try {
+            // Continue only when this instance is the master of source device
+            if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+                log.debug("Skip {} due to lack of mastership of the source device {}",
+                         mcastIp, source.deviceId());
                 return;
             }
-            removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
-            return;
-        }
 
-        // Process the egress device
-        boolean isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
-        if (isLast) {
-            mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
-        }
+            // When source and sink are on the same device
+            if (source.deviceId().equals(sink.deviceId())) {
+                // Source and sink are on even the same port. There must be something wrong.
+                if (source.port().equals(sink.port())) {
+                    log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
+                             mcastIp, sink, source);
+                    return;
+                }
+                removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
+                return;
+            }
 
-        // If this is the last sink on the device, also update upstream
-        Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
-        if (mcastPath.isPresent()) {
-            List<Link> links = Lists.newArrayList(mcastPath.get().links());
-            Collections.reverse(links);
-            for (Link link : links) {
-                if (isLast) {
-                    isLast = removePortFromDevice(link.src().deviceId(), link.src().port(),
-                            mcastIp,
-                            assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
-                    mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
+            // Process the egress device
+            boolean isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
+            if (isLast) {
+                mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
+            }
+
+            // If this is the last sink on the device, also update upstream
+            Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
+            if (mcastPath.isPresent()) {
+                List<Link> links = Lists.newArrayList(mcastPath.get().links());
+                Collections.reverse(links);
+                for (Link link : links) {
+                    if (isLast) {
+                        isLast = removePortFromDevice(
+                                link.src().deviceId(),
+                                link.src().port(),
+                                mcastIp,
+                                assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null)
+                        );
+                        mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
+                    }
                 }
             }
+        } finally {
+            mcastUnlock();
         }
     }
 
@@ -249,54 +331,61 @@
      */
     private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
             IpAddress mcastIp) {
-        // Continue only when this instance is the master of source device
-        if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
-            log.info("Skip {} due to lack of mastership of the source device {}",
-                    source.deviceId());
-            return;
-        }
-
-        // Process the ingress device
-        addFilterToDevice(source.deviceId(), source.port(), assignedVlan(source), mcastIp);
-
-        // When source and sink are on the same device
-        if (source.deviceId().equals(sink.deviceId())) {
-            // Source and sink are on even the same port. There must be something wrong.
-            if (source.port().equals(sink.port())) {
-                log.warn("Sink is on the same port of source. Abort");
+        lastMcastChange = Instant.now();
+        mcastLock();
+        try {
+            // Continue only when this instance is the master of source device
+            if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+                log.debug("Skip {} due to lack of mastership of the source device {}",
+                          mcastIp, source.deviceId());
                 return;
             }
-            addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
-            mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), McastRole.INGRESS);
-            return;
-        }
 
-        // Find a path. If present, create/update groups and flows for each hop
-        Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
-        if (mcastPath.isPresent()) {
-            List<Link> links = mcastPath.get().links();
-            checkState(links.size() == 2,
-                    "Path in leaf-spine topology should always be two hops: ", links);
+            // Process the ingress device
+            addFilterToDevice(source.deviceId(), source.port(), assignedVlan(source), mcastIp);
 
-            links.forEach(link -> {
-                addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
-                        assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
-                addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null), mcastIp);
-            });
+            // When source and sink are on the same device
+            if (source.deviceId().equals(sink.deviceId())) {
+                // Source and sink are on even the same port. There must be something wrong.
+                if (source.port().equals(sink.port())) {
+                    log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
+                             mcastIp, sink, source);
+                    return;
+                }
+                addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
+                mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), McastRole.INGRESS);
+                return;
+            }
 
-            // Process the egress device
-            addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
+            // Find a path. If present, create/update groups and flows for each hop
+            Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
+            if (mcastPath.isPresent()) {
+                List<Link> links = mcastPath.get().links();
+                checkState(links.size() == 2,
+                           "Path in leaf-spine topology should always be two hops: ", links);
 
-            // Setup mcast roles
-            mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()),
-                    McastRole.INGRESS);
-            mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
-                    McastRole.TRANSIT);
-            mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
-                    McastRole.EGRESS);
-        } else {
-            log.warn("Unable to find a path from {} to {}. Abort sinkAdded",
-                    source.deviceId(), sink.deviceId());
+                links.forEach(link -> {
+                    addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
+                                    assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
+                    addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null), mcastIp);
+                });
+
+                // Process the egress device
+                addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
+
+                // Setup mcast roles
+                mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()),
+                                   McastRole.INGRESS);
+                mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
+                                   McastRole.TRANSIT);
+                mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
+                                   McastRole.EGRESS);
+            } else {
+                log.warn("Unable to find a path from {} to {}. Abort sinkAdded",
+                         source.deviceId(), sink.deviceId());
+            }
+        } finally {
+            mcastUnlock();
         }
     }
 
@@ -306,56 +395,63 @@
      * @param affectedLink Link that is going down
      */
     protected void processLinkDown(Link affectedLink) {
-        getAffectedGroups(affectedLink).forEach(mcastIp -> {
-            // TODO Optimize when the group editing is in place
-            log.debug("Processing link down {} for group {}",
-                      affectedLink, mcastIp);
+        lastMcastChange = Instant.now();
+        mcastLock();
+        try {
+            // Get groups affected by the link down event
+            getAffectedGroups(affectedLink).forEach(mcastIp -> {
+                // TODO Optimize when the group editing is in place
+                log.debug("Processing link down {} for group {}",
+                          affectedLink, mcastIp);
 
-            // Find out the ingress, transit and egress device of affected group
-            DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
-                    .stream().findAny().orElse(null);
-            DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
-                    .stream().findAny().orElse(null);
-            Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
-            ConnectPoint source = getSource(mcastIp);
+                // Find out the ingress, transit and egress device of affected group
+                DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
+                        .stream().findAny().orElse(null);
+                DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
+                        .stream().findAny().orElse(null);
+                Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
+                ConnectPoint source = getSource(mcastIp);
 
-            // Do not proceed if any of these info is missing
-            if (ingressDevice == null || transitDevice == null
-                    || egressDevices == null || source == null) {
-                log.warn("Missing ingress {}, transit {}, egress {} devices or source {}",
-                        ingressDevice, transitDevice, egressDevices, source);
-                return;
-            }
-
-            // Continue only when this instance is the master of source device
-            if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
-                log.info("Skip {} due to lack of mastership of the source device {}",
-                        source.deviceId());
-                return;
-            }
-
-            // Remove entire transit
-            removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
-
-            // Remove transit-facing port on ingress device
-            PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
-            if (ingressTransitPort != null) {
-                removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
-                mcastRoleStore.remove(new McastStoreKey(mcastIp, transitDevice));
-            }
-
-            // Construct a new path for each egress device
-            egressDevices.forEach(egressDevice -> {
-                Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
-                if (mcastPath.isPresent()) {
-                    installPath(mcastIp, source, mcastPath.get());
-                } else {
-                    log.warn("Fail to recover egress device {} from link failure {}",
-                            egressDevice, affectedLink);
-                    removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
+                // Do not proceed if any of these info is missing
+                if (ingressDevice == null || transitDevice == null
+                        || egressDevices == null || source == null) {
+                    log.warn("Missing ingress {}, transit {}, egress {} devices or source {}",
+                             ingressDevice, transitDevice, egressDevices, source);
+                    return;
                 }
+
+                // Continue only when this instance is the master of source device
+                if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+                    log.debug("Skip {} due to lack of mastership of the source device {}",
+                             source.deviceId());
+                    return;
+                }
+
+                // Remove entire transit
+                removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
+
+                // Remove transit-facing port on ingress device
+                PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
+                if (ingressTransitPort != null) {
+                    removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
+                    mcastRoleStore.remove(new McastStoreKey(mcastIp, transitDevice));
+                }
+
+                // Construct a new path for each egress device
+                egressDevices.forEach(egressDevice -> {
+                    Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
+                    if (mcastPath.isPresent()) {
+                        installPath(mcastIp, source, mcastPath.get());
+                    } else {
+                        log.warn("Fail to recover egress device {} from link failure {}",
+                                 egressDevice, affectedLink);
+                        removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
+                    }
+                });
             });
-        });
+        } finally {
+            mcastUnlock();
+        }
     }
 
     /**
@@ -364,103 +460,109 @@
      * @param deviceDown device going down
      */
     protected void processDeviceDown(DeviceId deviceDown) {
-        // Get the mcast groups affected by the device going down
-        getAffectedGroups(deviceDown).forEach(mcastIp -> {
-            // TODO Optimize when the group editing is in place
-            log.debug("Processing device down {} for group {}",
-                      deviceDown, mcastIp);
+        lastMcastChange = Instant.now();
+        mcastLock();
+        try {
+            // Get the mcast groups affected by the device going down
+            getAffectedGroups(deviceDown).forEach(mcastIp -> {
+                // TODO Optimize when the group editing is in place
+                log.debug("Processing device down {} for group {}",
+                          deviceDown, mcastIp);
 
-            // Find out the ingress, transit and egress device of affected group
-            DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
-                    .stream().findAny().orElse(null);
-            DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
-                    .stream().findAny().orElse(null);
-            Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
-            ConnectPoint source = getSource(mcastIp);
+                // Find out the ingress, transit and egress device of affected group
+                DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
+                        .stream().findAny().orElse(null);
+                DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
+                        .stream().findAny().orElse(null);
+                Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
+                ConnectPoint source = getSource(mcastIp);
 
-            // Do not proceed if ingress device or source of this group are missing
-            // If sinks are in other leafs, we have ingress, transit, egress, and source
-            // If sinks are in the same leaf, we have just ingress and source
-            if (ingressDevice == null || source == null) {
-                log.warn("Missing ingress {} or source {} for group {}",
-                         ingressDevice, source, mcastIp);
-                return;
-            }
-
-            // Continue only when we have the mastership on the operation
-            if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
-                // When the source is available we just check the mastership
-                if (srManager.deviceService.isAvailable(source.deviceId())) {
-                    log.info("Skip {} due to lack of mastership of the source device {}",
-                             mcastIp, source.deviceId());
+                // Do not proceed if ingress device or source of this group are missing
+                // If sinks are in other leafs, we have ingress, transit, egress, and source
+                // If sinks are in the same leaf, we have just ingress and source
+                if (ingressDevice == null || source == null) {
+                    log.warn("Missing ingress {} or source {} for group {}",
+                             ingressDevice, source, mcastIp);
                     return;
                 }
-                // Fallback with Leadership service
-                // source id is used a topic
-                NodeId leader = srManager.leadershipService.runForLeadership(
-                        source.deviceId().toString()).leaderNodeId();
-                // Verify if this node is the leader
-                if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
-                    log.info("Skip {} due to lack of leadership on the topic {}",
-                             mcastIp, source.deviceId());
-                    return;
-                }
-            }
 
-            // If it exists, we have to remove it in any case
-            if (transitDevice != null) {
-                // Remove entire transit
-                removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
-            }
-            // If the ingress is down
-            if (ingressDevice.equals(deviceDown)) {
-                // Remove entire ingress
-                removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
-                // If other sinks different from the ingress exist
-                if (!egressDevices.isEmpty()) {
-                    // Remove all the remaining egress
-                    egressDevices.forEach(
-                            egressDevice -> removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null))
-                    );
-                }
-            } else {
-                // Egress or transit could be down at this point
-                // Get the ingress-transit port if it exists
-                PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
-                if (ingressTransitPort != null) {
-                    // Remove transit-facing port on ingress device
-                    removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
-                }
-                // One of the egress device is down
-                if (egressDevices.contains(deviceDown)) {
-                    // Remove entire device down
-                    removeGroupFromDevice(deviceDown, mcastIp, assignedVlan(null));
-                    // Remove the device down from egress
-                    egressDevices.remove(deviceDown);
-                    // If there are no more egress and ingress does not have sinks
-                    if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) {
-                        // Remove entire ingress
-                        mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
-                        // We have done
+                // Continue only when we have the mastership on the operation
+                if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+                    // When the source is available we just check the mastership
+                    if (srManager.deviceService.isAvailable(source.deviceId())) {
+                        log.debug("Skip {} due to lack of mastership of the source device {}",
+                                 mcastIp, source.deviceId());
+                        return;
+                    }
+                    // Fallback with Leadership service
+                    // source id is used a topic
+                    NodeId leader = srManager.leadershipService.runForLeadership(
+                            source.deviceId().toString()).leaderNodeId();
+                    // Verify if this node is the leader
+                    if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
+                        log.debug("Skip {} due to lack of leadership on the topic {}",
+                                 mcastIp, source.deviceId());
                         return;
                     }
                 }
-                // Construct a new path for each egress device
-                egressDevices.forEach(egressDevice -> {
-                    Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
-                    // If there is a new path
-                    if (mcastPath.isPresent()) {
-                        // Let's install the new mcast path for this egress
-                        installPath(mcastIp, source, mcastPath.get());
-                    } else {
-                        // We were not able to find an alternative path for this egress
-                        log.warn("Fail to recover egress device {} from device down {}",
-                                 egressDevice, deviceDown);
-                        removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
+
+                // If it exists, we have to remove it in any case
+                if (transitDevice != null) {
+                    // Remove entire transit
+                    removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
+                }
+                // If the ingress is down
+                if (ingressDevice.equals(deviceDown)) {
+                    // Remove entire ingress
+                    removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
+                    // If other sinks different from the ingress exist
+                    if (!egressDevices.isEmpty()) {
+                        // Remove all the remaining egress
+                        egressDevices.forEach(
+                                egressDevice -> removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null))
+                        );
                     }
-                });
-            }
-        });
+                } else {
+                    // Egress or transit could be down at this point
+                    // Get the ingress-transit port if it exists
+                    PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
+                    if (ingressTransitPort != null) {
+                        // Remove transit-facing port on ingress device
+                        removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
+                    }
+                    // One of the egress device is down
+                    if (egressDevices.contains(deviceDown)) {
+                        // Remove entire device down
+                        removeGroupFromDevice(deviceDown, mcastIp, assignedVlan(null));
+                        // Remove the device down from egress
+                        egressDevices.remove(deviceDown);
+                        // If there are no more egress and ingress does not have sinks
+                        if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) {
+                            // Remove entire ingress
+                            mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
+                            // We have done
+                            return;
+                        }
+                    }
+                    // Construct a new path for each egress device
+                    egressDevices.forEach(egressDevice -> {
+                        Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
+                        // If there is a new path
+                        if (mcastPath.isPresent()) {
+                            // Let's install the new mcast path for this egress
+                            installPath(mcastIp, source, mcastPath.get());
+                        } else {
+                            // We were not able to find an alternative path for this egress
+                            log.warn("Fail to recover egress device {} from device down {}",
+                                     egressDevice, deviceDown);
+                            removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
+                        }
+                    });
+                }
+            });
+        } finally {
+            mcastUnlock();
+        }
     }
 
     /**
@@ -1056,7 +1158,9 @@
     }
 
     /**
-     * Adds or removes filtering objective for given device and port.
+     * Updates filtering objective for given device and port.
+     * It is called in general when the mcast config has been
+     * changed.
      *
      * @param deviceId device ID
      * @param portNum ingress port number
@@ -1065,15 +1169,118 @@
      */
     protected void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
                                         VlanId vlanId, boolean install) {
-        srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
-            ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
-            if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
-                if (install) {
-                    addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
-                } else {
-                    removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
+        lastMcastChange = Instant.now();
+        mcastLock();
+        try {
+            // Iterates over the route and updates properly the filtering objective
+            // on the source device.
+            srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
+                ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
+                if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
+                    if (install) {
+                        addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
+                    } else {
+                        removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
+                    }
                 }
+            });
+        } finally {
+            mcastUnlock();
+        }
+    }
+
+    /**
+     * Performs bucket verification operation for all mcast groups in the devices.
+     * Firstly, it verifies that mcast is stable before trying verification operation.
+     * Verification consists in creating new nexts with VERIFY operation. Actually,
+     * the operation is totally delegated to the driver.
+     */
+     private final class McastBucketCorrector implements Runnable {
+
+        @Override
+        public void run() {
+            // Verify if the Mcast has been stable for MCAST_STABLITY_THRESHOLD
+            if (!isMcastStable()) {
+                return;
             }
-        });
+            // Acquires lock
+            mcastLock();
+            try {
+                // Iterates over the routes and verify the related next objectives
+                srManager.multicastRouteService.getRoutes()
+                    .stream()
+                    .map(McastRoute::group)
+                    .forEach(mcastIp -> {
+                        log.trace("Running mcast buckets corrector for mcast group: {}",
+                                  mcastIp);
+
+                        // For each group we get current information in the store
+                        // and issue a check of the next objectives in place
+                        DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
+                                .stream().findAny().orElse(null);
+                        DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
+                                .stream().findAny().orElse(null);
+                        Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
+                        ConnectPoint source = getSource(mcastIp);
+
+                        // Do not proceed if ingress device or source of this group are missing
+                        if (ingressDevice == null || source == null) {
+                            log.warn("Unable to run buckets corrector. " +
+                                             "Missing ingress {} or source {} for group {}",
+                                     ingressDevice, source, mcastIp);
+                            return;
+                        }
+
+                        // Continue only when this instance is the master of source device
+                        if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+                            log.trace("Unable to run buckets corrector. " +
+                                             "Skip {} due to lack of mastership " +
+                                             "of the source device {}",
+                                     mcastIp, source.deviceId());
+                            return;
+                        }
+
+                        // Create the set of the devices to be processed
+                        ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
+                        devicesBuilder.add(ingressDevice);
+                        if (transitDevice != null) {
+                            devicesBuilder.add(transitDevice);
+                        }
+                        if (!egressDevices.isEmpty()) {
+                            devicesBuilder.addAll(egressDevices);
+                        }
+                        Set<DeviceId> devicesToProcess = devicesBuilder.build();
+
+                        // Iterate over the devices
+                        devicesToProcess.forEach(deviceId -> {
+                            McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId);
+                            // If next exists in our store verify related next objective
+                            if (mcastNextObjStore.containsKey(currentKey)) {
+                                NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
+                                // Get current ports
+                                Set<PortNumber> currentPorts = getPorts(currentNext.next());
+                                // Rebuild the next objective
+                                currentNext = nextObjBuilder(
+                                        mcastIp,
+                                        assignedVlan(deviceId.equals(source.deviceId()) ? source : null),
+                                        currentPorts,
+                                        currentNext.id()
+                                ).verify();
+                                // Send to the flowobjective service
+                                srManager.flowObjectiveService.next(deviceId, currentNext);
+                            } else {
+                                log.warn("Unable to run buckets corrector." +
+                                                 "Missing next for {} and group {}",
+                                         deviceId, mcastIp);
+                            }
+                        });
+
+                    });
+            } finally {
+                // Finally, it releases the lock
+                mcastUnlock();
+            }
+
+        }
     }
 }
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index c3980b2..2b607af 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -490,6 +490,8 @@
         portNextObjStore.destroy();
         tunnelStore.destroy();
         policyStore.destroy();
+
+        mcastHandler.terminate();
         log.info("Stopped");
     }
 
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
index a3670e6..e41ca4c 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
@@ -1588,8 +1588,8 @@
      *             modified to match the given next objective
      */
     protected void verifyGroup(NextObjective nextObjective, NextGroup next) {
-        if (nextObjective.type() != NextObjective.Type.HASHED) {
-            log.warn("verification not supported for {} group", nextObjective.type());
+        if (nextObjective.type() == NextObjective.Type.SIMPLE) {
+            log.warn("verification not supported for indirect group");
             fail(nextObjective, ObjectiveError.UNSUPPORTED);
             return;
         }
@@ -1640,17 +1640,25 @@
             indicesToRemove.addAll(otherIndices);
         }
 
+        log.debug("Buckets to create {}", bucketsToCreate);
+        log.debug("Indices to remove {}", indicesToRemove);
+
         if (!bucketsToCreate.isEmpty()) {
             log.info("creating {} buckets as part of nextId: {} verification",
                      bucketsToCreate.size(), nextObjective.id());
             //create a nextObjective only with these buckets
             NextObjective.Builder nextObjBuilder = DefaultNextObjective.builder()
                     .withId(nextObjective.id())
-                    .withType(NextObjective.Type.HASHED)
+                    .withType(nextObjective.type())
                     .withMeta(nextObjective.meta())
                     .fromApp(nextObjective.appId());
-            bucketsToCreate.forEach(bucket -> nextObjBuilder.addTreatment(bucket));
-            addBucketToHashGroup(nextObjBuilder.addToExisting(), allActiveKeys);
+            bucketsToCreate.forEach(nextObjBuilder::addTreatment);
+            // According to the next type we call the proper add function
+            if (nextObjective.type() == NextObjective.Type.HASHED) {
+                addBucketToHashGroup(nextObjBuilder.addToExisting(), allActiveKeys);
+            } else {
+                addBucketToBroadcastGroup(nextObjBuilder.addToExisting(), allActiveKeys);
+            }
         }
 
         if (!indicesToRemove.isEmpty()) {
@@ -1667,9 +1675,9 @@
             // Nevertheless groupStore may not be in sync due to bug in the store
             // - see CORD-1844. XXX When this bug is fixed, the rest of this verify
             // method will not be required.
-            GroupKey hashGroupKey = allActiveKeys.get(0).peekFirst();
-            Group hashGroup = groupService.getGroup(deviceId, hashGroupKey);
-            int actualGroupSize = hashGroup.buckets().buckets().size();
+            GroupKey topGroupKey = allActiveKeys.get(0).peekFirst();
+            Group topGroup = groupService.getGroup(deviceId, topGroupKey);
+            int actualGroupSize = topGroup.buckets().buckets().size();
             int objGroupSize = nextObjective.next().size();
             if (actualGroupSize != objGroupSize) {
                 log.warn("Mismatch detected in device:{}, nextId:{}, nextObjective-size"
@@ -1677,9 +1685,10 @@
                         objGroupSize, actualGroupSize);
             }
             if (actualGroupSize > objGroupSize) {
+                // Group in the device has more chains
                 List<GroupBucket> bucketsToRemove = Lists.newArrayList();
                 //check every bucket in the actual group
-                for (GroupBucket bucket : hashGroup.buckets().buckets()) {
+                for (GroupBucket bucket : topGroup.buckets().buckets()) {
                     GroupInstruction g = (GroupInstruction) bucket.treatment()
                                             .allInstructions().iterator().next();
                     GroupId gidToCheck = g.groupId(); // the group pointed to
@@ -1707,11 +1716,12 @@
                             + "buckets to remove");
                 } else {
                     GroupBuckets removeBuckets = new GroupBuckets(bucketsToRemove);
-                    groupService.removeBucketsFromGroup(deviceId, hashGroupKey,
-                                                        removeBuckets, hashGroupKey,
+                    groupService.removeBucketsFromGroup(deviceId, topGroupKey,
+                                                        removeBuckets, topGroupKey,
                                                         nextObjective.appId());
                 }
             } else if (actualGroupSize < objGroupSize) {
+                // Group in the device has less chains
                 // should also add buckets not in group-store but in obj-store
                 List<GroupBucket> bucketsToAdd = Lists.newArrayList();
                 //check every bucket in the obj
@@ -1727,7 +1737,7 @@
                         continue;
                     }
                     boolean matches = false;
-                    for (GroupBucket bucket : hashGroup.buckets().buckets()) {
+                    for (GroupBucket bucket : topGroup.buckets().buckets()) {
                         GroupInstruction g = (GroupInstruction) bucket.treatment()
                                                 .allInstructions().iterator().next();
                         GroupId gidToCheck = g.groupId(); // the group pointed to
@@ -1741,7 +1751,12 @@
                         TrafficTreatment t = DefaultTrafficTreatment.builder()
                                                 .group(pointedGroup.id())
                                                 .build();
-                        bucketsToAdd.add(DefaultGroupBucket.createSelectGroupBucket(t));
+                        // Create the proper bucket according to the next type
+                        if (nextObjective.type() == NextObjective.Type.HASHED) {
+                            bucketsToAdd.add(DefaultGroupBucket.createSelectGroupBucket(t));
+                        } else {
+                            bucketsToAdd.add(DefaultGroupBucket.createAllGroupBucket(t));
+                        }
                     }
                 }
                 if (bucketsToAdd.isEmpty()) {
@@ -1749,8 +1764,8 @@
                             + "buckets to add");
                 } else {
                     GroupBuckets addBuckets = new GroupBuckets(bucketsToAdd);
-                    groupService.addBucketsToGroup(deviceId, hashGroupKey,
-                                                   addBuckets, hashGroupKey,
+                    groupService.addBucketsToGroup(deviceId, topGroupKey,
+                                                   addBuckets, topGroupKey,
                                                    nextObjective.appId());
                 }
             }