[CORD-2937] Improve work partition on Multicast

Change-Id: Ia8761245e7f199721c1228bfd500e0392a20de05
(cherry picked from commit 901851ef9f2a53d6fdd08d0cc1232b125f1e35bf)
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
index 37f273b..924794b 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
@@ -16,7 +16,11 @@
 
 package org.onosproject.segmentrouting.mcast;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.MacAddress;
@@ -70,6 +74,10 @@
     private SegmentRoutingManager srManager;
     // Internal reference to the app id
     private ApplicationId coreAppId;
+    // Hashing function for the multicast hasher
+    private static final HashFunction HASH_FN = Hashing.md5();
+    // Read only cache of the Mcast leader
+    private Map<IpAddress, NodeId> mcastLeaderCache;
 
     /**
      * Builds a new McastUtils object.
@@ -82,32 +90,7 @@
         this.srManager = srManager;
         this.coreAppId = coreAppId;
         this.log = log;
-    }
-
-    /**
-     * Given a connect point define a leader for it.
-     *
-     * @param source the source connect point
-     * @return true if this instance is the leader, otherwise false
-     */
-    boolean isLeader(ConnectPoint source) {
-        // Continue only when we have the mastership on the operation
-        if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
-            // When the source is available we just check the mastership
-            if (srManager.deviceService.isAvailable(source.deviceId())) {
-                return false;
-            }
-            // Fallback with Leadership service
-            // source id is used a topic
-            NodeId leader = srManager.leadershipService.runForLeadership(
-                    source.deviceId().toString()).leaderNodeId();
-            // Verify if this node is the leader
-            if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
-                return false;
-            }
-        }
-        // Done
-        return true;
+        this.mcastLeaderCache = Maps.newConcurrentMap();
     }
 
     /**
@@ -425,4 +408,60 @@
                     .forEach(instr -> builder.add(((Instructions.OutputInstruction) instr).port())));
         return builder.build();
     }
+
+    /**
+     * Hashes the octets of a group address with HASH_FN.
+     *
+     * @param ipAddress the group address to hash
+     * @return the hash of the address octets, as a long
+     */
+    private Long hasher(IpAddress ipAddress) {
+        // Raw bytes of the address are the only input of the hash
+        final byte[] octets = ipAddress.toOctets();
+        // One-shot hashing of the whole byte array
+        return HASH_FN.hashBytes(octets).asLong();
+    }
+
+    /**
+     * Tells whether this instance leads the given multicast group.
+     *
+     * @param mcastIp the group address
+     * @return true if the local node is the leader of the group, false otherwise
+     */
+    boolean isLeader(IpAddress mcastIp) {
+        // Identity of the local node
+        final NodeId localId = srManager.clusterService.getLocalNode().id();
+        // The group address is the key handed to the work partition service
+        final NodeId groupLeader = srManager.workPartitionService.getLeader(mcastIp, this::hasher);
+        if (groupLeader != null) {
+            // Remember the leader and compare it against ourselves
+            mcastLeaderCache.put(mcastIp, groupLeader);
+            return localId.equals(groupLeader);
+        }
+        // Without a leader report the failure; the cache is left untouched
+        log.error("Fail to elect a leader for {}.", mcastIp);
+        return false;
+    }
+
+    /**
+     * Withdraws the leader of a multicast group by dropping its cached entry.
+     *
+     * @param mcastIp the group address whose cached leader is removed
+     */
+    void withdrawLeader(IpAddress mcastIp) {
+        // Only the local cache is touched; no call to the partition service is made here
+        mcastLeaderCache.remove(mcastIp);
+    }
+
+    /**
+     * Returns a read-only map of the cached leaders, using a keyed lookup for a single group.
+     *
+     * @param mcastIp the group address, or null for all the cached groups
+     * @return the cached leader of each requested group, empty if none is cached
+     */
+    Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
+        final NodeId leader = mcastIp == null ? null : mcastLeaderCache.get(mcastIp);
+        final Map<IpAddress, NodeId> single = leader == null ? ImmutableMap.of() : ImmutableMap.of(mcastIp, leader);
+        return mcastIp == null ? ImmutableMap.copyOf(mcastLeaderCache) : single;
+    }
 }