[CORD-2937] Improve work partition on Multicast
Change-Id: Ia8761245e7f199721c1228bfd500e0392a20de05
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index 1e322f6..eff78e5 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -39,7 +39,7 @@
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.Event;
@@ -75,6 +75,7 @@
import org.onosproject.net.host.HostLocationProbingService;
import org.onosproject.net.host.HostService;
import org.onosproject.net.host.InterfaceIpAddress;
+import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.net.intf.Interface;
import org.onosproject.net.intf.InterfaceService;
import org.onosproject.net.link.LinkEvent;
@@ -212,7 +213,7 @@
public ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- public LeadershipService leadershipService;
+ public WorkPartitionService workPartitionService;
@Property(name = "activeProbing", boolValue = true,
label = "Enable active probing to discover dual-homed hosts.")
@@ -673,6 +674,11 @@
return mcastHandler.getMcastPaths(mcastIp);
}
+ @Override
+ public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
+ return mcastHandler.getMcastLeaders(mcastIp);
+ }
+
/**
* Extracts the application ID from the manager.
*
@@ -1491,10 +1497,10 @@
case SINKS_ADDED:
case SINKS_REMOVED:
case ROUTE_REMOVED:
+ case ROUTE_ADDED:
log.trace("Schedule Mcast event {}", event);
mcastEventExecutor.execute(new InternalEventHandler(event));
break;
- case ROUTE_ADDED:
default:
log.warn("Unsupported mcast event type: {}", event.type());
break;
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
index b688726..aabed47 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
@@ -17,6 +17,7 @@
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
+import org.onosproject.cluster.NodeId;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
@@ -247,4 +248,11 @@
*/
Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp);
+ /**
+ * Return the leaders of the mcast groups.
+ *
+ * @param mcastIp the group ip
+ * @return the mapping group-node
+ */
+ Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp);
}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/McastLeaderListCommand.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/McastLeaderListCommand.java
new file mode 100644
index 0000000..33a9fc9
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/McastLeaderListCommand.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.segmentrouting.cli;
+
+import org.apache.karaf.shell.commands.Command;
+import org.apache.karaf.shell.commands.Option;
+import org.onlab.packet.IpAddress;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mcast.cli.McastGroupCompleter;
+import org.onosproject.segmentrouting.SegmentRoutingService;
+
+import java.util.Map;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+/**
+ * Command to show the mcast leaders of the groups.
+ */
+@Command(scope = "onos", name = "sr-mcast-leader",
+ description = "Lists all mcast leaders")
+public class McastLeaderListCommand extends AbstractShellCommand {
+
+ // OSGi workaround to introduce package dependency
+ McastGroupCompleter completer;
+
+ // Format for group line
+ private static final String G_FORMAT_MAPPING = "group=%s, leader=%s";
+
+ @Option(name = "-gAddr", aliases = "--groupAddress",
+ description = "IP Address of the multicast group",
+ valueToShowInHelp = "224.0.0.0",
+ required = false, multiValued = false)
+ String gAddr = null;
+
+ @Override
+ protected void execute() {
+ // Verify mcast group
+ IpAddress mcastGroup = null;
+ if (!isNullOrEmpty(gAddr)) {
+ mcastGroup = IpAddress.valueOf(gAddr);
+ }
+ // Get SR service
+ SegmentRoutingService srService = get(SegmentRoutingService.class);
+ // Get the mapping
+ Map<IpAddress, NodeId> keyToRole = srService.getMcastLeaders(mcastGroup);
+ // And print local cache
+ keyToRole.forEach(this::printMcastLeder);
+ }
+
+ private void printMcastLeder(IpAddress mcastGroup,
+ NodeId nodeId) {
+ print(G_FORMAT_MAPPING, mcastGroup, nodeId);
+ }
+
+}
\ No newline at end of file
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index 2c82cf5..9a53e13 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -28,6 +28,7 @@
import org.onlab.packet.IpAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mcast.api.McastEvent;
@@ -75,11 +76,13 @@
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
-import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
-import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
-import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
+import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_ADDED;
import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
+import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
+import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
+import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
+import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
+
import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
@@ -93,19 +96,15 @@
// Reference to srManager and most used internal objects
private final SegmentRoutingManager srManager;
private final TopologyService topologyService;
+ private final McastUtils mcastUtils;
// Internal store of the Mcast nextobjectives
private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
// Internal store of the Mcast roles
private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
- // McastUtils
- private final McastUtils mcastUtils;
// Wait time for the cache
private static final int WAIT_TIME_MS = 1000;
- // Wait time for the removal of the old location
- private static final int HOST_MOVED_DELAY_MS = 1000;
-
/**
* The mcastEventCache is implemented to avoid race condition by giving more time to the
* underlying subsystems to process previous calls.
@@ -313,6 +312,11 @@
mcastLock();
try {
srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
+ // Verify leadership on the operation
+ if (!mcastUtils.isLeader(mcastRoute.group())) {
+ log.debug("Skip {} due to lack of leadership", mcastRoute.group());
+ return;
+ }
// FIXME To be addressed with multiple sources support
ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
.stream()
@@ -356,56 +360,29 @@
*/
public void processMcastEvent(McastEvent event) {
log.info("process {}", event);
- // Just enqueue for now
- enqueueMcastEvent(event);
+ // If it is a route added, we do not enqueue
+ if (event.type() == ROUTE_ADDED) {
+ // We need just to elect a leader
+ processRouteAddedInternal(event.subject().route().group());
+ } else {
+ // Just enqueue for now
+ enqueueMcastEvent(event);
+ }
}
+
/**
- * Process the SOURCE_UPDATED event.
+ * Process the ROUTE_ADDED event.
*
- * @param newSource the updated srouce info
- * @param oldSource the outdated source info
+ * @param mcastIp the group address
*/
- private void processSourceUpdatedInternal(IpAddress mcastIp,
- ConnectPoint newSource,
- ConnectPoint oldSource) {
+ private void processRouteAddedInternal(IpAddress mcastIp) {
lastMcastChange = Instant.now();
mcastLock();
try {
- log.debug("Processing source updated for group {}", mcastIp);
-
- // Build key for the store and retrieve old data
- McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, oldSource.deviceId());
-
- // Verify leadership on the operation
- if (!mcastUtils.isLeader(oldSource)) {
- log.debug("Skip {} due to lack of leadership", mcastIp);
- return;
- }
-
- // This device is not serving this multicast group
- if (!mcastRoleStore.containsKey(mcastStoreKey) ||
- !mcastNextObjStore.containsKey(mcastStoreKey)) {
- log.warn("{} is not serving {}. Abort.", oldSource.deviceId(), mcastIp);
- return;
- }
- NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
- Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
-
- // This an optimization to avoid unnecessary removal and add
- if (!mcastUtils.assignedVlanFromNext(nextObjective)
- .equals(mcastUtils.assignedVlan(newSource))) {
- // Let's remove old flows and groups
- removeGroupFromDevice(oldSource.deviceId(), mcastIp, mcastUtils.assignedVlan(oldSource));
- // Push new flows and group
- outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
- mcastIp, mcastUtils.assignedVlan(newSource)));
- }
- mcastUtils.addFilterToDevice(newSource.deviceId(), newSource.port(),
- mcastUtils.assignedVlan(newSource), mcastIp, INGRESS);
- // Setup mcast roles
- mcastRoleStore.put(new McastStoreKey(mcastIp, newSource.deviceId()),
- INGRESS);
+ log.debug("Processing route added for group {}", mcastIp);
+ // Just elect a new leader
+ mcastUtils.isLeader(mcastIp);
} finally {
mcastUnlock();
}
@@ -421,6 +398,12 @@
mcastLock();
try {
log.debug("Processing route removed for group {}", mcastIp);
+ // Verify leadership on the operation
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.debug("Skip {} due to lack of leadership", mcastIp);
+ mcastUtils.withdrawLeader(mcastIp);
+ return;
+ }
// Find out the ingress, transit and egress device of the affected group
DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
@@ -428,12 +411,6 @@
Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
- // Verify leadership on the operation
- if (!mcastUtils.isLeader(source)) {
- log.debug("Skip {} due to lack of leadership", mcastIp);
- return;
- }
-
// If there are no egress devices, sinks could be only on the ingress
if (!egressDevices.isEmpty()) {
egressDevices.forEach(
@@ -470,6 +447,11 @@
lastMcastChange = Instant.now();
mcastLock();
try {
+ // Verify leadership on the operation
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.debug("Skip {} due to lack of leadership", mcastIp);
+ return;
+ }
// Remove the previous ones
Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
newSinks);
@@ -495,12 +477,6 @@
lastMcastChange = Instant.now();
mcastLock();
try {
- // Verify leadership on the operation
- if (!mcastUtils.isLeader(source)) {
- log.debug("Skip {} due to lack of leadership", mcastIp);
- return;
- }
-
boolean isLast;
// When source and sink are on the same device
if (source.deviceId().equals(sink.deviceId())) {
@@ -564,6 +540,11 @@
lastMcastChange = Instant.now();
mcastLock();
try {
+ // Verify leadership on the operation
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.debug("Skip {} due to lack of leadership", mcastIp);
+ return;
+ }
// Get the only sinks to be processed (new ones)
Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
// Install new sinks
@@ -586,13 +567,6 @@
lastMcastChange = Instant.now();
mcastLock();
try {
- // Continue only when this instance is the master of source device
- if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
- log.debug("Skip {} due to lack of mastership of the source device {}",
- mcastIp, source.deviceId());
- return;
- }
-
// Process the ingress device
mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
mcastUtils.assignedVlan(source), mcastIp, INGRESS);
@@ -663,6 +637,11 @@
// TODO Optimize when the group editing is in place
log.debug("Processing link down {} for group {}",
affectedLink, mcastIp);
+ // Verify leadership on the operation
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.debug("Skip {} due to lack of leadership", mcastIp);
+ return;
+ }
// Find out the ingress, transit and egress device of affected group
DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
@@ -680,13 +659,6 @@
return;
}
- // Continue only when this instance is the master of source device
- if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
- log.debug("Skip {} due to lack of mastership of the source device {}",
- mcastIp, source.deviceId());
- return;
- }
-
// Remove entire transit
transitDevices.forEach(transitDevice ->
removeGroupFromDevice(transitDevice, mcastIp,
@@ -753,6 +725,11 @@
// TODO Optimize when the group editing is in place
log.debug("Processing device down {} for group {}",
deviceDown, mcastIp);
+ // Verify leadership on the operation
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.debug("Skip {} due to lack of leadership", mcastIp);
+ return;
+ }
// Find out the ingress, transit and egress device of affected group
DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
@@ -770,12 +747,6 @@
return;
}
- // Verify leadership on the operation
- if (!mcastUtils.isLeader(source)) {
- log.debug("Skip {} due to lack of leadership", mcastIp);
- return;
- }
-
// If it exists, we have to remove it in any case
if (!transitDevices.isEmpty()) {
// Remove entire transit
@@ -1660,6 +1631,12 @@
// Iterates over the route and updates properly the filtering objective
// on the source device.
srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
+ log.debug("Update filter for {}", mcastRoute.group());
+ // Verify leadership on the operation
+ if (!mcastUtils.isLeader(mcastRoute.group())) {
+ log.debug("Skip {} due to lack of leadership", mcastRoute.group());
+ return;
+ }
// FIXME To be addressed with multiple sources support
ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
.stream()
@@ -1724,12 +1701,10 @@
return;
}
- // Continue only when this instance is the master of source device
- if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+ // Continue only when this instance is the leader of the group
+ if (!mcastUtils.isLeader(mcastIp)) {
log.trace("Unable to run buckets corrector. " +
- "Skip {} due to lack of mastership " +
- "of the source device {}",
- mcastIp, source.deviceId());
+ "Skip {} due to lack of leadership", mcastIp);
return;
}
@@ -1876,4 +1851,13 @@
}
}
+ /**
+ * Return the leaders of the mcast groups.
+ *
+ * @param mcastIp the group ip
+ * @return the mapping group-node
+ */
+ public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
+ return mcastUtils.getMcastLeaders(mcastIp);
+ }
}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
index 37f273b..924794b 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
@@ -16,7 +16,11 @@
package org.onosproject.segmentrouting.mcast;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
@@ -70,6 +74,10 @@
private SegmentRoutingManager srManager;
// Internal reference to the app id
private ApplicationId coreAppId;
+ // Hashing function for the multicast hasher
+ private static final HashFunction HASH_FN = Hashing.md5();
+ // Read only cache of the Mcast leader
+ private Map<IpAddress, NodeId> mcastLeaderCache;
/**
* Builds a new McastUtils object.
@@ -82,32 +90,7 @@
this.srManager = srManager;
this.coreAppId = coreAppId;
this.log = log;
- }
-
- /**
- * Given a connect point define a leader for it.
- *
- * @param source the source connect point
- * @return true if this instance is the leader, otherwise false
- */
- boolean isLeader(ConnectPoint source) {
- // Continue only when we have the mastership on the operation
- if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
- // When the source is available we just check the mastership
- if (srManager.deviceService.isAvailable(source.deviceId())) {
- return false;
- }
- // Fallback with Leadership service
- // source id is used a topic
- NodeId leader = srManager.leadershipService.runForLeadership(
- source.deviceId().toString()).leaderNodeId();
- // Verify if this node is the leader
- if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
- return false;
- }
- }
- // Done
- return true;
+ this.mcastLeaderCache = Maps.newConcurrentMap();
}
/**
@@ -425,4 +408,60 @@
.forEach(instr -> builder.add(((Instructions.OutputInstruction) instr).port())));
return builder.build();
}
+
+ /**
+ * Returns the hash of the group address.
+ *
+ * @param ipAddress the ip address
+ * @return the hash of the address
+ */
+ private Long hasher(IpAddress ipAddress) {
+ return HASH_FN.newHasher()
+ .putBytes(ipAddress.toOctets())
+ .hash()
+ .asLong();
+ }
+
+ /**
+ * Given a multicast group define a leader for it.
+ *
+ * @param mcastIp the group address
+ * @return true if the instance is the leader of the group
+ */
+ boolean isLeader(IpAddress mcastIp) {
+ // Get our id
+ final NodeId currentNodeId = srManager.clusterService.getLocalNode().id();
+ // Get the leader for this group using the ip address as key
+ final NodeId leader = srManager.workPartitionService.getLeader(mcastIp, this::hasher);
+ // If there is not a leader, let's send an error
+ if (leader == null) {
+ log.error("Fail to elect a leader for {}.", mcastIp);
+ return false;
+ }
+ // Update cache and return operation result
+ mcastLeaderCache.put(mcastIp, leader);
+ return currentNodeId.equals(leader);
+ }
+
+ /**
+ * Given a multicast group withdraw its leader.
+ *
+ * @param mcastIp the group address
+ */
+ void withdrawLeader(IpAddress mcastIp) {
+ // For now just update the cache
+ mcastLeaderCache.remove(mcastIp);
+ }
+
+ Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
+ // If mcast ip is present
+ if (mcastIp != null) {
+ return mcastLeaderCache.entrySet().stream()
+ .filter(entry -> entry.getKey().equals(mcastIp))
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ Map.Entry::getValue));
+ }
+ // Otherwise take all the groups
+ return ImmutableMap.copyOf(mcastLeaderCache);
+ }
}
diff --git a/apps/segmentrouting/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/apps/segmentrouting/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 647eb49..4a3329a 100644
--- a/apps/segmentrouting/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/apps/segmentrouting/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -81,6 +81,12 @@
<entry key="-gAddr" value-ref="mcastGroupCompleter"/>
</optional-completers>
</command>
+ <command>
+ <action class="org.onosproject.segmentrouting.cli.McastLeaderListCommand"/>
+ <optional-completers>
+ <entry key="-gAddr" value-ref="mcastGroupCompleter"/>
+ </optional-completers>
+ </command>
</command-bundle>
<bean id="nullCompleter" class="org.apache.karaf.shell.console.completer.NullCompleter"/>