[CORD-2937] Improve work partition on Multicast
Change-Id: Ia8761245e7f199721c1228bfd500e0392a20de05
(cherry picked from commit 901851ef9f2a53d6fdd08d0cc1232b125f1e35bf)
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
index 37f273b..924794b 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
@@ -16,7 +16,11 @@
package org.onosproject.segmentrouting.mcast;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
@@ -70,6 +74,10 @@
private SegmentRoutingManager srManager;
// Internal reference to the app id
private ApplicationId coreAppId;
+ // Hashing function for the multicast hasher
+ private static final HashFunction HASH_FN = Hashing.md5();
+ // Read only cache of the Mcast leader
+ private Map<IpAddress, NodeId> mcastLeaderCache;
/**
* Builds a new McastUtils object.
@@ -82,32 +90,7 @@
this.srManager = srManager;
this.coreAppId = coreAppId;
this.log = log;
- }
-
- /**
- * Given a connect point define a leader for it.
- *
- * @param source the source connect point
- * @return true if this instance is the leader, otherwise false
- */
- boolean isLeader(ConnectPoint source) {
- // Continue only when we have the mastership on the operation
- if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
- // When the source is available we just check the mastership
- if (srManager.deviceService.isAvailable(source.deviceId())) {
- return false;
- }
- // Fallback with Leadership service
- // source id is used a topic
- NodeId leader = srManager.leadershipService.runForLeadership(
- source.deviceId().toString()).leaderNodeId();
- // Verify if this node is the leader
- if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
- return false;
- }
- }
- // Done
- return true;
+ this.mcastLeaderCache = Maps.newConcurrentMap();
}
/**
@@ -425,4 +408,60 @@
.forEach(instr -> builder.add(((Instructions.OutputInstruction) instr).port())));
return builder.build();
}
+
+ /**
+ * Returns the hash of the group address.
+ *
+ * @param ipAddress the ip address
+ * @return the hash of the address
+ */
+ private Long hasher(IpAddress ipAddress) {
+ return HASH_FN.newHasher()
+ .putBytes(ipAddress.toOctets())
+ .hash()
+ .asLong();
+ }
+
+ /**
+ * Given a multicast group define a leader for it.
+ *
+ * @param mcastIp the group address
+ * @return true if the instance is the leader of the group
+ */
+ boolean isLeader(IpAddress mcastIp) {
+ // Get our id
+ final NodeId currentNodeId = srManager.clusterService.getLocalNode().id();
+ // Get the leader for this group using the ip address as key
+ final NodeId leader = srManager.workPartitionService.getLeader(mcastIp, this::hasher);
+ // If there is not a leader, let's send an error
+ if (leader == null) {
+ log.error("Fail to elect a leader for {}.", mcastIp);
+ return false;
+ }
+ // Update cache and return operation result
+ mcastLeaderCache.put(mcastIp, leader);
+ return currentNodeId.equals(leader);
+ }
+
+ /**
+ * Given a multicast group withdraw its leader.
+ *
+ * @param mcastIp the group address
+ */
+ void withdrawLeader(IpAddress mcastIp) {
+ // For now just update the cache
+ mcastLeaderCache.remove(mcastIp);
+ }
+
+ Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
+ // If mcast ip is present
+ if (mcastIp != null) {
+ return mcastLeaderCache.entrySet().stream()
+ .filter(entry -> entry.getKey().equals(mcastIp))
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ Map.Entry::getValue));
+ }
+ // Otherwise take all the groups
+ return ImmutableMap.copyOf(mcastLeaderCache);
+ }
}