[CORD-2937] Improve work partition on Multicast

Change-Id: Ia8761245e7f199721c1228bfd500e0392a20de05
(cherry picked from commit 901851ef9f2a53d6fdd08d0cc1232b125f1e35bf)
diff --git a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index 1e322f6..eff78e5 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -39,7 +39,7 @@
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.event.Event;
@@ -75,6 +75,7 @@
 import org.onosproject.net.host.HostLocationProbingService;
 import org.onosproject.net.host.HostService;
 import org.onosproject.net.host.InterfaceIpAddress;
+import org.onosproject.net.intent.WorkPartitionService;
 import org.onosproject.net.intf.Interface;
 import org.onosproject.net.intf.InterfaceService;
 import org.onosproject.net.link.LinkEvent;
@@ -212,7 +213,7 @@
     public ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    public LeadershipService leadershipService;
+    public WorkPartitionService workPartitionService;
 
     @Property(name = "activeProbing", boolValue = true,
             label = "Enable active probing to discover dual-homed hosts.")
@@ -673,6 +674,11 @@
         return mcastHandler.getMcastPaths(mcastIp);
     }
 
+    @Override
+    public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
+        return mcastHandler.getMcastLeaders(mcastIp);
+    }
+
     /**
      * Extracts the application ID from the manager.
      *
@@ -1491,10 +1497,10 @@
                 case SINKS_ADDED:
                 case SINKS_REMOVED:
                 case ROUTE_REMOVED:
+                case ROUTE_ADDED:
                     log.trace("Schedule Mcast event {}", event);
                     mcastEventExecutor.execute(new InternalEventHandler(event));
                     break;
-                case ROUTE_ADDED:
                 default:
                     log.warn("Unsupported mcast event type: {}", event.type());
                     break;
diff --git a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
index b688726..aabed47 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
@@ -17,6 +17,7 @@
 
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Link;
@@ -247,4 +248,11 @@
      */
     Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp);
 
+    /**
+     * Return the leaders of the mcast groups.
+     *
+     * @param mcastIp the group ip
+     * @return the mapping group-node
+     */
+    Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp);
 }
diff --git a/app/src/main/java/org/onosproject/segmentrouting/cli/McastLeaderListCommand.java b/app/src/main/java/org/onosproject/segmentrouting/cli/McastLeaderListCommand.java
new file mode 100644
index 0000000..33a9fc9
--- /dev/null
+++ b/app/src/main/java/org/onosproject/segmentrouting/cli/McastLeaderListCommand.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.segmentrouting.cli;
+
+import org.apache.karaf.shell.commands.Command;
+import org.apache.karaf.shell.commands.Option;
+import org.onlab.packet.IpAddress;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mcast.cli.McastGroupCompleter;
+import org.onosproject.segmentrouting.SegmentRoutingService;
+
+import java.util.Map;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+/**
+ * Command to show the mcast leaders of the groups.
+ */
+@Command(scope = "onos", name = "sr-mcast-leader",
+        description = "Lists all mcast leaders")
+public class McastLeaderListCommand extends AbstractShellCommand {
+
+    // OSGi workaround to introduce package dependency
+    McastGroupCompleter completer;
+
+    // Format for group line
+    private static final String G_FORMAT_MAPPING = "group=%s, leader=%s";
+
+    @Option(name = "-gAddr", aliases = "--groupAddress",
+            description = "IP Address of the multicast group",
+            valueToShowInHelp = "224.0.0.0",
+            required = false, multiValued = false)
+    String gAddr = null;
+
+    @Override
+    protected void execute() {
+        // Verify mcast group
+        IpAddress mcastGroup = null;
+        if (!isNullOrEmpty(gAddr)) {
+            mcastGroup = IpAddress.valueOf(gAddr);
+        }
+        // Get SR service
+        SegmentRoutingService srService = get(SegmentRoutingService.class);
+        // Get the mapping
+        Map<IpAddress, NodeId> keyToRole = srService.getMcastLeaders(mcastGroup);
+        // And print local cache
+        keyToRole.forEach(this::printMcastLeder);
+    }
+
+    private void printMcastLeder(IpAddress mcastGroup,
+                                 NodeId nodeId) {
+        print(G_FORMAT_MAPPING, mcastGroup, nodeId);
+    }
+
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index 2c82cf5..9a53e13 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -28,6 +28,7 @@
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.VlanId;
 import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.mcast.api.McastEvent;
@@ -75,11 +76,13 @@
 import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
 
-import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
-import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
-import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
-import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
+import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_ADDED;
 import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
+import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
+import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
+import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
+import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
+
 import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
 import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
 import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
@@ -93,19 +96,15 @@
     // Reference to srManager and most used internal objects
     private final SegmentRoutingManager srManager;
     private final TopologyService topologyService;
+    private final McastUtils mcastUtils;
     // Internal store of the Mcast nextobjectives
     private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
     // Internal store of the Mcast roles
     private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
-    // McastUtils
-    private final McastUtils mcastUtils;
 
     // Wait time for the cache
     private static final int WAIT_TIME_MS = 1000;
 
-    // Wait time for the removal of the old location
-    private static final int HOST_MOVED_DELAY_MS = 1000;
-
     /**
      * The mcastEventCache is implemented to avoid race condition by giving more time to the
      * underlying subsystems to process previous calls.
@@ -313,6 +312,11 @@
         mcastLock();
         try {
             srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
+                // Verify leadership on the operation
+                if (!mcastUtils.isLeader(mcastRoute.group())) {
+                    log.debug("Skip {} due to lack of leadership", mcastRoute.group());
+                    return;
+                }
                 // FIXME To be addressed with multiple sources support
                 ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
                         .stream()
@@ -356,56 +360,29 @@
      */
     public void processMcastEvent(McastEvent event) {
         log.info("process {}", event);
-        // Just enqueue for now
-        enqueueMcastEvent(event);
+        // If it is a route added, we do not enqueue
+        if (event.type() == ROUTE_ADDED) {
+            // We need just to elect a leader
+            processRouteAddedInternal(event.subject().route().group());
+        } else {
+            // Just enqueue for now
+            enqueueMcastEvent(event);
+        }
     }
 
+
     /**
-     * Process the SOURCE_UPDATED event.
+     * Process the ROUTE_ADDED event.
      *
-     * @param newSource the updated srouce info
-     * @param oldSource the outdated source info
+     * @param mcastIp the group address
      */
-    private void processSourceUpdatedInternal(IpAddress mcastIp,
-                                              ConnectPoint newSource,
-                                              ConnectPoint oldSource) {
+    private void processRouteAddedInternal(IpAddress mcastIp) {
         lastMcastChange = Instant.now();
         mcastLock();
         try {
-            log.debug("Processing source updated for group {}", mcastIp);
-
-            // Build key for the store and retrieve old data
-            McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, oldSource.deviceId());
-
-            // Verify leadership on the operation
-            if (!mcastUtils.isLeader(oldSource)) {
-                log.debug("Skip {} due to lack of leadership", mcastIp);
-                return;
-            }
-
-            // This device is not serving this multicast group
-            if (!mcastRoleStore.containsKey(mcastStoreKey) ||
-                    !mcastNextObjStore.containsKey(mcastStoreKey)) {
-                log.warn("{} is not serving {}. Abort.", oldSource.deviceId(), mcastIp);
-                return;
-            }
-            NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
-            Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
-
-            // This an optimization to avoid unnecessary removal and add
-            if (!mcastUtils.assignedVlanFromNext(nextObjective)
-                    .equals(mcastUtils.assignedVlan(newSource))) {
-                // Let's remove old flows and groups
-                removeGroupFromDevice(oldSource.deviceId(), mcastIp, mcastUtils.assignedVlan(oldSource));
-                // Push new flows and group
-                outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
-                                                                  mcastIp, mcastUtils.assignedVlan(newSource)));
-            }
-            mcastUtils.addFilterToDevice(newSource.deviceId(), newSource.port(),
-                              mcastUtils.assignedVlan(newSource), mcastIp, INGRESS);
-            // Setup mcast roles
-            mcastRoleStore.put(new McastStoreKey(mcastIp, newSource.deviceId()),
-                               INGRESS);
+            log.debug("Processing route added for group {}", mcastIp);
+            // Just elect a new leader
+            mcastUtils.isLeader(mcastIp);
         } finally {
             mcastUnlock();
         }
@@ -421,6 +398,12 @@
         mcastLock();
         try {
             log.debug("Processing route removed for group {}", mcastIp);
+            // Verify leadership on the operation
+            if (!mcastUtils.isLeader(mcastIp)) {
+                log.debug("Skip {} due to lack of leadership", mcastIp);
+                mcastUtils.withdrawLeader(mcastIp);
+                return;
+            }
 
             // Find out the ingress, transit and egress device of the affected group
             DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
@@ -428,12 +411,6 @@
             Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
             Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
 
-            // Verify leadership on the operation
-            if (!mcastUtils.isLeader(source)) {
-                log.debug("Skip {} due to lack of leadership", mcastIp);
-                return;
-            }
-
             // If there are no egress devices, sinks could be only on the ingress
             if (!egressDevices.isEmpty()) {
                 egressDevices.forEach(
@@ -470,6 +447,11 @@
         lastMcastChange = Instant.now();
         mcastLock();
         try {
+            // Verify leadership on the operation
+            if (!mcastUtils.isLeader(mcastIp)) {
+                log.debug("Skip {} due to lack of leadership", mcastIp);
+                return;
+            }
             // Remove the previous ones
             Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
                                                                          newSinks);
@@ -495,12 +477,6 @@
         lastMcastChange = Instant.now();
         mcastLock();
         try {
-            // Verify leadership on the operation
-            if (!mcastUtils.isLeader(source)) {
-                log.debug("Skip {} due to lack of leadership", mcastIp);
-                return;
-            }
-
             boolean isLast;
             // When source and sink are on the same device
             if (source.deviceId().equals(sink.deviceId())) {
@@ -564,6 +540,11 @@
         lastMcastChange = Instant.now();
         mcastLock();
         try {
+            // Verify leadership on the operation
+            if (!mcastUtils.isLeader(mcastIp)) {
+                log.debug("Skip {} due to lack of leadership", mcastIp);
+                return;
+            }
             // Get the only sinks to be processed (new ones)
             Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
             // Install new sinks
@@ -586,13 +567,6 @@
         lastMcastChange = Instant.now();
         mcastLock();
         try {
-            // Continue only when this instance is the master of source device
-            if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
-                log.debug("Skip {} due to lack of mastership of the source device {}",
-                          mcastIp, source.deviceId());
-                return;
-            }
-
             // Process the ingress device
             mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
                               mcastUtils.assignedVlan(source), mcastIp, INGRESS);
@@ -663,6 +637,11 @@
                 // TODO Optimize when the group editing is in place
                 log.debug("Processing link down {} for group {}",
                           affectedLink, mcastIp);
+                // Verify leadership on the operation
+                if (!mcastUtils.isLeader(mcastIp)) {
+                    log.debug("Skip {} due to lack of leadership", mcastIp);
+                    return;
+                }
 
                 // Find out the ingress, transit and egress device of affected group
                 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
@@ -680,13 +659,6 @@
                     return;
                 }
 
-                // Continue only when this instance is the master of source device
-                if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
-                    log.debug("Skip {} due to lack of mastership of the source device {}",
-                             mcastIp, source.deviceId());
-                    return;
-                }
-
                 // Remove entire transit
                 transitDevices.forEach(transitDevice ->
                                 removeGroupFromDevice(transitDevice, mcastIp,
@@ -753,6 +725,11 @@
                 // TODO Optimize when the group editing is in place
                 log.debug("Processing device down {} for group {}",
                           deviceDown, mcastIp);
+                // Verify leadership on the operation
+                if (!mcastUtils.isLeader(mcastIp)) {
+                    log.debug("Skip {} due to lack of leadership", mcastIp);
+                    return;
+                }
 
                 // Find out the ingress, transit and egress device of affected group
                 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
@@ -770,12 +747,6 @@
                     return;
                 }
 
-                // Verify leadership on the operation
-                if (!mcastUtils.isLeader(source)) {
-                    log.debug("Skip {} due to lack of leadership", mcastIp);
-                    return;
-                }
-
                 // If it exists, we have to remove it in any case
                 if (!transitDevices.isEmpty()) {
                     // Remove entire transit
@@ -1660,6 +1631,12 @@
             // Iterates over the route and updates properly the filtering objective
             // on the source device.
             srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
+                log.debug("Update filter for {}", mcastRoute.group());
+                // Verify leadership on the operation
+                if (!mcastUtils.isLeader(mcastRoute.group())) {
+                    log.debug("Skip {} due to lack of leadership", mcastRoute.group());
+                    return;
+                }
                 // FIXME To be addressed with multiple sources support
                 ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
                         .stream()
@@ -1724,12 +1701,10 @@
                             return;
                         }
 
-                        // Continue only when this instance is the master of source device
-                        if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+                        // Continue only when this instance is the leader of the group
+                        if (!mcastUtils.isLeader(mcastIp)) {
                             log.trace("Unable to run buckets corrector. " +
-                                             "Skip {} due to lack of mastership " +
-                                             "of the source device {}",
-                                     mcastIp, source.deviceId());
+                                             "Skip {} due to lack of leadership", mcastIp);
                             return;
                         }
 
@@ -1876,4 +1851,13 @@
         }
     }
 
+    /**
+     * Return the leaders of the mcast groups.
+     *
+     * @param mcastIp the group ip
+     * @return the mapping group-node
+     */
+    public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
+        return mcastUtils.getMcastLeaders(mcastIp);
+    }
 }
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
index 37f273b..924794b 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
@@ -16,7 +16,11 @@
 
 package org.onosproject.segmentrouting.mcast;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.MacAddress;
@@ -70,6 +74,10 @@
     private SegmentRoutingManager srManager;
     // Internal reference to the app id
     private ApplicationId coreAppId;
+    // Hashing function for the multicast hasher
+    private static final HashFunction HASH_FN = Hashing.md5();
+    // Read only cache of the Mcast leader
+    private Map<IpAddress, NodeId> mcastLeaderCache;
 
     /**
      * Builds a new McastUtils object.
@@ -82,32 +90,7 @@
         this.srManager = srManager;
         this.coreAppId = coreAppId;
         this.log = log;
-    }
-
-    /**
-     * Given a connect point define a leader for it.
-     *
-     * @param source the source connect point
-     * @return true if this instance is the leader, otherwise false
-     */
-    boolean isLeader(ConnectPoint source) {
-        // Continue only when we have the mastership on the operation
-        if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
-            // When the source is available we just check the mastership
-            if (srManager.deviceService.isAvailable(source.deviceId())) {
-                return false;
-            }
-            // Fallback with Leadership service
-            // source id is used a topic
-            NodeId leader = srManager.leadershipService.runForLeadership(
-                    source.deviceId().toString()).leaderNodeId();
-            // Verify if this node is the leader
-            if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
-                return false;
-            }
-        }
-        // Done
-        return true;
+        this.mcastLeaderCache = Maps.newConcurrentMap();
     }
 
     /**
@@ -425,4 +408,60 @@
                     .forEach(instr -> builder.add(((Instructions.OutputInstruction) instr).port())));
         return builder.build();
     }
+
+    /**
+     * Returns the hash of the group address.
+     *
+     * @param ipAddress the ip address
+     * @return the hash of the address
+     */
+    private Long hasher(IpAddress ipAddress) {
+        return HASH_FN.newHasher()
+                .putBytes(ipAddress.toOctets())
+                .hash()
+                .asLong();
+    }
+
+    /**
+     * Given a multicast group define a leader for it.
+     *
+     * @param mcastIp the group address
+     * @return true if the instance is the leader of the group
+     */
+    boolean isLeader(IpAddress mcastIp) {
+        // Get our id
+        final NodeId currentNodeId = srManager.clusterService.getLocalNode().id();
+        // Get the leader for this group using the ip address as key
+        final NodeId leader = srManager.workPartitionService.getLeader(mcastIp, this::hasher);
+        // If there is not a leader, let's send an error
+        if (leader == null) {
+            log.error("Fail to elect a leader for {}.", mcastIp);
+            return false;
+        }
+        // Update cache and return operation result
+        mcastLeaderCache.put(mcastIp, leader);
+        return currentNodeId.equals(leader);
+    }
+
+    /**
+     * Given a multicast group withdraw its leader.
+     *
+     * @param mcastIp the group address
+     */
+    void withdrawLeader(IpAddress mcastIp) {
+        // For now just update the cache
+        mcastLeaderCache.remove(mcastIp);
+    }
+
+    Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
+        // If mcast ip is present
+        if (mcastIp != null) {
+            return mcastLeaderCache.entrySet().stream()
+                    .filter(entry -> entry.getKey().equals(mcastIp))
+                    .collect(Collectors.toMap(Map.Entry::getKey,
+                                              Map.Entry::getValue));
+        }
+        // Otherwise take all the groups
+        return ImmutableMap.copyOf(mcastLeaderCache);
+    }
 }
diff --git a/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 647eb49..4a3329a 100644
--- a/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -81,6 +81,12 @@
                 <entry key="-gAddr" value-ref="mcastGroupCompleter"/>
             </optional-completers>
         </command>
+        <command>
+            <action class="org.onosproject.segmentrouting.cli.McastLeaderListCommand"/>
+            <optional-completers>
+                <entry key="-gAddr" value-ref="mcastGroupCompleter"/>
+            </optional-completers>
+        </command>
     </command-bundle>
 
     <bean id="nullCompleter" class="org.apache.karaf.shell.console.completer.NullCompleter"/>