[CORD-2937] Improve work partitioning on Multicast
Change-Id: Ia8761245e7f199721c1228bfd500e0392a20de05
(cherry picked from commit 901851ef9f2a53d6fdd08d0cc1232b125f1e35bf)
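Multicast programming is now partitioned across cluster instances by group
leadership instead of by mastership of the source device: every operation on
a group (route removal, sink/source updates, link and device failures, filter
updates, buckets correction) first checks mcastUtils.isLeader(group) and bails
out on non-leader instances. ROUTE_ADDED events are no longer enqueued; they
only trigger leader election for the group, and on ROUTE_REMOVED non-leaders
additionally withdraw from the election via withdrawLeader(). A new accessor,
getMcastLeaders(), exposes the group-to-node leadership mapping.

The isLeader() helper lives in McastUtils.java and is not part of this diff.
Below is a minimal sketch of the intended check, assuming it delegates to the
ONOS LeadershipService with one election topic per group; the class name,
field wiring, and topic naming are illustrative, not the actual McastUtils
implementation:

    import org.onlab.packet.IpAddress;
    import org.onosproject.cluster.ClusterService;
    import org.onosproject.cluster.Leadership;
    import org.onosproject.cluster.LeadershipService;
    import org.onosproject.cluster.NodeId;

    // Illustrative helper; the real logic lives in McastUtils (not shown here).
    class McastLeadershipSketch {
        private final LeadershipService leadershipService;
        private final ClusterService clusterService;

        McastLeadershipSketch(LeadershipService leadershipService,
                              ClusterService clusterService) {
            this.leadershipService = leadershipService;
            this.clusterService = clusterService;
        }

        boolean isLeader(IpAddress mcastIp) {
            // One leadership topic per group: only the instance that wins
            // the election programs flows for that group, so the per-group
            // work is spread across the cluster.
            String topic = "sr-mcast-" + mcastIp.toString(); // assumed naming
            Leadership leadership = leadershipService.runForLeadership(topic);
            NodeId localId = clusterService.getLocalNode().id();
            return localId.equals(leadership.leaderNodeId());
        }
    }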
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index 2c82cf5..9a53e13 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -28,6 +28,7 @@
import org.onlab.packet.IpAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mcast.api.McastEvent;
@@ -75,11 +76,13 @@
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
-import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
-import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
-import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
+import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_ADDED;
import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
+import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
+import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
+import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
+import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
+
import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
@@ -93,19 +96,15 @@
// Reference to srManager and most used internal objects
private final SegmentRoutingManager srManager;
private final TopologyService topologyService;
+ private final McastUtils mcastUtils;
// Internal store of the Mcast nextobjectives
private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
// Internal store of the Mcast roles
private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
- // McastUtils
- private final McastUtils mcastUtils;
// Wait time for the cache
private static final int WAIT_TIME_MS = 1000;
- // Wait time for the removal of the old location
- private static final int HOST_MOVED_DELAY_MS = 1000;
-
/**
* The mcastEventCache is implemented to avoid race conditions by giving more time to the
* underlying subsystems to process previous calls.
@@ -313,6 +312,11 @@
mcastLock();
try {
srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
+ // Verify leadership on the operation
+ if (!mcastUtils.isLeader(mcastRoute.group())) {
+ log.debug("Skip {} due to lack of leadership", mcastRoute.group());
+ return;
+ }
// FIXME To be addressed with multiple sources support
ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
.stream()
@@ -356,56 +360,29 @@
*/
public void processMcastEvent(McastEvent event) {
log.info("process {}", event);
- // Just enqueue for now
- enqueueMcastEvent(event);
+ // If it is a ROUTE_ADDED event, we do not enqueue it
+ if (event.type() == ROUTE_ADDED) {
+ // We just need to elect a leader
+ processRouteAddedInternal(event.subject().route().group());
+ } else {
+ // Just enqueue for now
+ enqueueMcastEvent(event);
+ }
}
+
/**
- * Process the SOURCE_UPDATED event.
+ * Process the ROUTE_ADDED event.
*
- * @param newSource the updated source info
- * @param oldSource the outdated source info
+ * @param mcastIp the group address
*/
- private void processSourceUpdatedInternal(IpAddress mcastIp,
- ConnectPoint newSource,
- ConnectPoint oldSource) {
+ private void processRouteAddedInternal(IpAddress mcastIp) {
lastMcastChange = Instant.now();
mcastLock();
try {
- log.debug("Processing source updated for group {}", mcastIp);
-
- // Build key for the store and retrieve old data
- McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, oldSource.deviceId());
-
- // Verify leadership on the operation
- if (!mcastUtils.isLeader(oldSource)) {
- log.debug("Skip {} due to lack of leadership", mcastIp);
- return;
- }
-
- // This device is not serving this multicast group
- if (!mcastRoleStore.containsKey(mcastStoreKey) ||
- !mcastNextObjStore.containsKey(mcastStoreKey)) {
- log.warn("{} is not serving {}. Abort.", oldSource.deviceId(), mcastIp);
- return;
- }
- NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
- Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
-
- // This an optimization to avoid unnecessary removal and add
- if (!mcastUtils.assignedVlanFromNext(nextObjective)
- .equals(mcastUtils.assignedVlan(newSource))) {
- // Let's remove old flows and groups
- removeGroupFromDevice(oldSource.deviceId(), mcastIp, mcastUtils.assignedVlan(oldSource));
- // Push new flows and group
- outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
- mcastIp, mcastUtils.assignedVlan(newSource)));
- }
- mcastUtils.addFilterToDevice(newSource.deviceId(), newSource.port(),
- mcastUtils.assignedVlan(newSource), mcastIp, INGRESS);
- // Setup mcast roles
- mcastRoleStore.put(new McastStoreKey(mcastIp, newSource.deviceId()),
- INGRESS);
+ log.debug("Processing route added for group {}", mcastIp);
+ // Just trigger leadership election for this group; the return value is ignored
+ mcastUtils.isLeader(mcastIp);
} finally {
mcastUnlock();
}
@@ -421,6 +398,12 @@
mcastLock();
try {
log.debug("Processing route removed for group {}", mcastIp);
+ // Verify leadership on the operation
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.debug("Skip {} due to lack of leadership", mcastIp);
+ mcastUtils.withdrawLeader(mcastIp);
+ return;
+ }
// Find out the ingress, transit and egress device of the affected group
DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
@@ -428,12 +411,6 @@
Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
- // Verify leadership on the operation
- if (!mcastUtils.isLeader(source)) {
- log.debug("Skip {} due to lack of leadership", mcastIp);
- return;
- }
-
// If there are no egress devices, sinks could be only on the ingress
if (!egressDevices.isEmpty()) {
egressDevices.forEach(
@@ -470,6 +447,11 @@
lastMcastChange = Instant.now();
mcastLock();
try {
+ // Verify leadership on the operation
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.debug("Skip {} due to lack of leadership", mcastIp);
+ return;
+ }
// Remove the previous ones
Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
newSinks);
@@ -495,12 +477,6 @@
lastMcastChange = Instant.now();
mcastLock();
try {
- // Verify leadership on the operation
- if (!mcastUtils.isLeader(source)) {
- log.debug("Skip {} due to lack of leadership", mcastIp);
- return;
- }
-
boolean isLast;
// When source and sink are on the same device
if (source.deviceId().equals(sink.deviceId())) {
@@ -564,6 +540,11 @@
lastMcastChange = Instant.now();
mcastLock();
try {
+ // Verify leadership on the operation
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.debug("Skip {} due to lack of leadership", mcastIp);
+ return;
+ }
// Get the only sinks to be processed (new ones)
Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
// Install new sinks
@@ -586,13 +567,6 @@
lastMcastChange = Instant.now();
mcastLock();
try {
- // Continue only when this instance is the master of source device
- if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
- log.debug("Skip {} due to lack of mastership of the source device {}",
- mcastIp, source.deviceId());
- return;
- }
-
// Process the ingress device
mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
mcastUtils.assignedVlan(source), mcastIp, INGRESS);
@@ -663,6 +637,11 @@
// TODO Optimize when the group editing is in place
log.debug("Processing link down {} for group {}",
affectedLink, mcastIp);
+ // Verify leadership on the operation
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.debug("Skip {} due to lack of leadership", mcastIp);
+ return;
+ }
// Find out the ingress, transit and egress device of affected group
DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
@@ -680,13 +659,6 @@
return;
}
- // Continue only when this instance is the master of source device
- if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
- log.debug("Skip {} due to lack of mastership of the source device {}",
- mcastIp, source.deviceId());
- return;
- }
-
// Remove entire transit
transitDevices.forEach(transitDevice ->
removeGroupFromDevice(transitDevice, mcastIp,
@@ -753,6 +725,11 @@
// TODO Optimize when the group editing is in place
log.debug("Processing device down {} for group {}",
deviceDown, mcastIp);
+ // Verify leadership on the operation
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.debug("Skip {} due to lack of leadership", mcastIp);
+ return;
+ }
// Find out the ingress, transit and egress device of affected group
DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
@@ -770,12 +747,6 @@
return;
}
- // Verify leadership on the operation
- if (!mcastUtils.isLeader(source)) {
- log.debug("Skip {} due to lack of leadership", mcastIp);
- return;
- }
-
// If it exists, we have to remove it in any case
if (!transitDevices.isEmpty()) {
// Remove entire transit
@@ -1660,6 +1631,12 @@
// Iterates over the route and updates properly the filtering objective
// on the source device.
srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
+ log.debug("Update filter for {}", mcastRoute.group());
+ // Verify leadership on the operation
+ if (!mcastUtils.isLeader(mcastRoute.group())) {
+ log.debug("Skip {} due to lack of leadership", mcastRoute.group());
+ return;
+ }
// FIXME To be addressed with multiple sources support
ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
.stream()
@@ -1724,12 +1701,10 @@
return;
}
- // Continue only when this instance is the master of source device
- if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+ // Continue only when this instance is the leader of the group
+ if (!mcastUtils.isLeader(mcastIp)) {
log.trace("Unable to run buckets corrector. " +
- "Skip {} due to lack of mastership " +
- "of the source device {}",
- mcastIp, source.deviceId());
+ "Skip {} due to lack of leadership", mcastIp);
return;
}
@@ -1876,4 +1851,13 @@
}
}
+ /**
+ * Returns the leaders of the mcast groups.
+ *
+ * @param mcastIp the group IP
+ * @return the mapping from group to leader node
+ */
+ public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
+ return mcastUtils.getMcastLeaders(mcastIp);
+ }
}