[CORD-2937] Improve work partition on Multicast

Change-Id: Ia8761245e7f199721c1228bfd500e0392a20de05
(cherry picked from commit 901851ef9f2a53d6fdd08d0cc1232b125f1e35bf)
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index 2c82cf5..9a53e13 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -28,6 +28,7 @@
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.VlanId;
 import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.mcast.api.McastEvent;
@@ -75,11 +76,13 @@
 import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
 
-import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
-import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
-import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
-import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
+import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_ADDED;
 import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
+import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
+import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
+import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
+import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
+
 import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
 import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
 import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
@@ -93,19 +96,15 @@
     // Reference to srManager and most used internal objects
     private final SegmentRoutingManager srManager;
     private final TopologyService topologyService;
+    private final McastUtils mcastUtils;
     // Internal store of the Mcast nextobjectives
     private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
     // Internal store of the Mcast roles
     private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
-    // McastUtils
-    private final McastUtils mcastUtils;
 
     // Wait time for the cache
     private static final int WAIT_TIME_MS = 1000;
 
-    // Wait time for the removal of the old location
-    private static final int HOST_MOVED_DELAY_MS = 1000;
-
     /**
      * The mcastEventCache is implemented to avoid race condition by giving more time to the
      * underlying subsystems to process previous calls.
@@ -313,6 +312,11 @@
         mcastLock();
         try {
             srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
+                // Verify leadership on the operation
+                if (!mcastUtils.isLeader(mcastRoute.group())) {
+                    log.debug("Skip {} due to lack of leadership", mcastRoute.group());
+                    return;
+                }
                 // FIXME To be addressed with multiple sources support
                 ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
                         .stream()
@@ -356,56 +360,29 @@
      */
     public void processMcastEvent(McastEvent event) {
         log.info("process {}", event);
-        // Just enqueue for now
-        enqueueMcastEvent(event);
+        // ROUTE_ADDED events are handled right away instead of being enqueued
+        if (event.type() == ROUTE_ADDED) {
+            // We just need to elect a leader for the group
+            processRouteAddedInternal(event.subject().route().group());
+        } else {
+            // Just enqueue for now
+            enqueueMcastEvent(event);
+        }
     }
 
+
     /**
-     * Process the SOURCE_UPDATED event.
+     * Process the ROUTE_ADDED event.
      *
-     * @param newSource the updated srouce info
-     * @param oldSource the outdated source info
+     * @param mcastIp the group address
      */
-    private void processSourceUpdatedInternal(IpAddress mcastIp,
-                                              ConnectPoint newSource,
-                                              ConnectPoint oldSource) {
+    private void processRouteAddedInternal(IpAddress mcastIp) {
         lastMcastChange = Instant.now();
         mcastLock();
         try {
-            log.debug("Processing source updated for group {}", mcastIp);
-
-            // Build key for the store and retrieve old data
-            McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, oldSource.deviceId());
-
-            // Verify leadership on the operation
-            if (!mcastUtils.isLeader(oldSource)) {
-                log.debug("Skip {} due to lack of leadership", mcastIp);
-                return;
-            }
-
-            // This device is not serving this multicast group
-            if (!mcastRoleStore.containsKey(mcastStoreKey) ||
-                    !mcastNextObjStore.containsKey(mcastStoreKey)) {
-                log.warn("{} is not serving {}. Abort.", oldSource.deviceId(), mcastIp);
-                return;
-            }
-            NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
-            Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
-
-            // This an optimization to avoid unnecessary removal and add
-            if (!mcastUtils.assignedVlanFromNext(nextObjective)
-                    .equals(mcastUtils.assignedVlan(newSource))) {
-                // Let's remove old flows and groups
-                removeGroupFromDevice(oldSource.deviceId(), mcastIp, mcastUtils.assignedVlan(oldSource));
-                // Push new flows and group
-                outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
-                                                                  mcastIp, mcastUtils.assignedVlan(newSource)));
-            }
-            mcastUtils.addFilterToDevice(newSource.deviceId(), newSource.port(),
-                              mcastUtils.assignedVlan(newSource), mcastIp, INGRESS);
-            // Setup mcast roles
-            mcastRoleStore.put(new McastStoreKey(mcastIp, newSource.deviceId()),
-                               INGRESS);
+            log.debug("Processing route added for group {}", mcastIp);
+            // Just elect a new leader; the result of the call is intentionally ignored
+            mcastUtils.isLeader(mcastIp);
         } finally {
             mcastUnlock();
         }
@@ -421,6 +398,12 @@
         mcastLock();
         try {
             log.debug("Processing route removed for group {}", mcastIp);
+            // Verify leadership on the operation
+            if (!mcastUtils.isLeader(mcastIp)) {
+                log.debug("Skip {} due to lack of leadership", mcastIp);
+                mcastUtils.withdrawLeader(mcastIp);
+                return;
+            }
 
             // Find out the ingress, transit and egress device of the affected group
             DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
@@ -428,12 +411,6 @@
             Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
             Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
 
-            // Verify leadership on the operation
-            if (!mcastUtils.isLeader(source)) {
-                log.debug("Skip {} due to lack of leadership", mcastIp);
-                return;
-            }
-
             // If there are no egress devices, sinks could be only on the ingress
             if (!egressDevices.isEmpty()) {
                 egressDevices.forEach(
@@ -470,6 +447,11 @@
         lastMcastChange = Instant.now();
         mcastLock();
         try {
+            // Verify leadership on the operation
+            if (!mcastUtils.isLeader(mcastIp)) {
+                log.debug("Skip {} due to lack of leadership", mcastIp);
+                return;
+            }
             // Remove the previous ones
             Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
                                                                          newSinks);
@@ -495,12 +477,6 @@
         lastMcastChange = Instant.now();
         mcastLock();
         try {
-            // Verify leadership on the operation
-            if (!mcastUtils.isLeader(source)) {
-                log.debug("Skip {} due to lack of leadership", mcastIp);
-                return;
-            }
-
             boolean isLast;
             // When source and sink are on the same device
             if (source.deviceId().equals(sink.deviceId())) {
@@ -564,6 +540,11 @@
         lastMcastChange = Instant.now();
         mcastLock();
         try {
+            // Verify leadership on the operation
+            if (!mcastUtils.isLeader(mcastIp)) {
+                log.debug("Skip {} due to lack of leadership", mcastIp);
+                return;
+            }
             // Get the only sinks to be processed (new ones)
             Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
             // Install new sinks
@@ -586,13 +567,6 @@
         lastMcastChange = Instant.now();
         mcastLock();
         try {
-            // Continue only when this instance is the master of source device
-            if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
-                log.debug("Skip {} due to lack of mastership of the source device {}",
-                          mcastIp, source.deviceId());
-                return;
-            }
-
             // Process the ingress device
             mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
                               mcastUtils.assignedVlan(source), mcastIp, INGRESS);
@@ -663,6 +637,11 @@
                 // TODO Optimize when the group editing is in place
                 log.debug("Processing link down {} for group {}",
                           affectedLink, mcastIp);
+                // Verify leadership on the operation
+                if (!mcastUtils.isLeader(mcastIp)) {
+                    log.debug("Skip {} due to lack of leadership", mcastIp);
+                    return;
+                }
 
                 // Find out the ingress, transit and egress device of affected group
                 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
@@ -680,13 +659,6 @@
                     return;
                 }
 
-                // Continue only when this instance is the master of source device
-                if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
-                    log.debug("Skip {} due to lack of mastership of the source device {}",
-                             mcastIp, source.deviceId());
-                    return;
-                }
-
                 // Remove entire transit
                 transitDevices.forEach(transitDevice ->
                                 removeGroupFromDevice(transitDevice, mcastIp,
@@ -753,6 +725,11 @@
                 // TODO Optimize when the group editing is in place
                 log.debug("Processing device down {} for group {}",
                           deviceDown, mcastIp);
+                // Verify leadership on the operation
+                if (!mcastUtils.isLeader(mcastIp)) {
+                    log.debug("Skip {} due to lack of leadership", mcastIp);
+                    return;
+                }
 
                 // Find out the ingress, transit and egress device of affected group
                 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
@@ -770,12 +747,6 @@
                     return;
                 }
 
-                // Verify leadership on the operation
-                if (!mcastUtils.isLeader(source)) {
-                    log.debug("Skip {} due to lack of leadership", mcastIp);
-                    return;
-                }
-
                 // If it exists, we have to remove it in any case
                 if (!transitDevices.isEmpty()) {
                     // Remove entire transit
@@ -1660,6 +1631,12 @@
             // Iterates over the route and updates properly the filtering objective
             // on the source device.
             srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
+                log.debug("Update filter for {}", mcastRoute.group());
+                // Verify leadership on the operation
+                if (!mcastUtils.isLeader(mcastRoute.group())) {
+                    log.debug("Skip {} due to lack of leadership", mcastRoute.group());
+                    return;
+                }
                 // FIXME To be addressed with multiple sources support
                 ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
                         .stream()
@@ -1724,12 +1701,10 @@
                             return;
                         }
 
-                        // Continue only when this instance is the master of source device
-                        if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+                        // Continue only when this instance is the leader of the group
+                        if (!mcastUtils.isLeader(mcastIp)) {
                             log.trace("Unable to run buckets corrector. " +
-                                             "Skip {} due to lack of mastership " +
-                                             "of the source device {}",
-                                     mcastIp, source.deviceId());
+                                             "Skip {} due to lack of leadership", mcastIp);
                             return;
                         }
 
@@ -1876,4 +1851,13 @@
         }
     }
 
+    /**
+     * Return the leaders of the mcast groups.
+     *
+     * @param mcastIp the group ip
+     * @return the mapping group-node
+     */
+    public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
+        return mcastUtils.getMcastLeaders(mcastIp);
+    }
 }