[CORD-2834] Handling of dual-homed sinks
Includes also McastUtils to move some code out of the McastHandler
Change-Id: I101637ee600c9d524f17e9f3fc29d63256844956
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java
index b0875ca..930432d 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java
@@ -18,6 +18,7 @@
import org.onlab.packet.IpAddress;
import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.HostId;
import java.util.Objects;
@@ -31,6 +32,8 @@
class McastCacheKey {
// The group ip
private final IpAddress mcastIp;
+ // The sink id
+ private final HostId sinkHost;
// The sink connect point
private final ConnectPoint sink;
@@ -39,6 +42,8 @@
*
* @param mcastIp multicast group IP address
* @param sink connect point of the sink
+ *
+ * @deprecated in 1.12 ("Magpie") release.
*/
public McastCacheKey(IpAddress mcastIp, ConnectPoint sink) {
checkNotNull(mcastIp, "mcastIp cannot be null");
@@ -46,6 +51,22 @@
checkArgument(mcastIp.isMulticast(), "mcastIp must be a multicast address");
this.mcastIp = mcastIp;
this.sink = sink;
+ this.sinkHost = null;
+ }
+
+ /**
+ * Constructs a key for multicast event cache.
+ *
+ * @param mcastIp multicast group IP address
+ * @param hostId id of the sink
+ */
+ public McastCacheKey(IpAddress mcastIp, HostId hostId) {
+ checkNotNull(mcastIp, "mcastIp cannot be null");
+ checkNotNull(hostId, "sink cannot be null");
+ checkArgument(mcastIp.isMulticast(), "mcastIp must be a multicast address");
+ this.mcastIp = mcastIp;
+ this.sinkHost = hostId;
+ this.sink = null;
}
/**
@@ -61,11 +82,22 @@
* Returns the sink of this key.
*
* @return connect point of the sink
+ *
+ * @deprecated in 1.12 ("Magpie") release.
*/
public ConnectPoint sink() {
return sink;
}
+ /**
+ * Returns the sink of this key.
+ *
+ * @return host id of the sink
+ */
+ public HostId sinkHost() {
+ return sinkHost;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -77,7 +109,8 @@
McastCacheKey that =
(McastCacheKey) o;
return (Objects.equals(this.mcastIp, that.mcastIp) &&
- Objects.equals(this.sink, that.sink));
+ Objects.equals(this.sink, that.sink) &&
+ Objects.equals(this.sinkHost, that.sinkHost));
}
@Override
@@ -90,6 +123,7 @@
return toStringHelper(getClass())
.add("mcastIp", mcastIp)
.add("sink", sink)
+ .add("sinkHost", sinkHost)
.toString();
}
}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index 968ad3b..4c83a38 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -20,40 +20,27 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.onlab.packet.Ethernet;
import org.onlab.packet.IpAddress;
-import org.onlab.packet.IpPrefix;
-import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mcast.api.McastEvent;
import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.mcast.api.McastRouteData;
import org.onosproject.mcast.api.McastRouteUpdate;
-import org.onosproject.net.config.basics.McastConfig;
+import org.onosproject.net.HostId;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
import org.onosproject.net.Path;
import org.onosproject.net.PortNumber;
-import org.onosproject.net.flow.DefaultTrafficSelector;
-import org.onosproject.net.flow.DefaultTrafficTreatment;
-import org.onosproject.net.flow.TrafficSelector;
-import org.onosproject.net.flow.TrafficTreatment;
-import org.onosproject.net.flow.criteria.Criteria;
-import org.onosproject.net.flow.criteria.VlanIdCriterion;
-import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
-import org.onosproject.net.flowobjective.DefaultFilteringObjective;
-import org.onosproject.net.flowobjective.DefaultForwardingObjective;
-import org.onosproject.net.flowobjective.DefaultNextObjective;
import org.onosproject.net.flowobjective.DefaultObjectiveContext;
-import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.ObjectiveContext;
@@ -62,14 +49,10 @@
import org.onosproject.net.topology.TopologyService;
import org.onosproject.segmentrouting.SRLinkWeigher;
import org.onosproject.segmentrouting.SegmentRoutingManager;
-import org.onosproject.segmentrouting.SegmentRoutingService;
-import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
-import org.onosproject.segmentrouting.config.SegmentRoutingAppConfig;
import org.onosproject.segmentrouting.storekey.McastStoreKey;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,9 +75,11 @@
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.mcast.api.McastEvent.Type.*;
-import static org.onosproject.net.flow.criteria.Criterion.Type.VLAN_VID;
-import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
+import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
+import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
+import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
+import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
+import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
@@ -103,17 +88,24 @@
* Handles Multicast related events.
*/
public class McastHandler {
+ // Logger instance
private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
+ // Reference to srManager and most used internal objects
private final SegmentRoutingManager srManager;
- private final ApplicationId coreAppId;
- private final StorageService storageService;
private final TopologyService topologyService;
+ // Internal store of the Mcast nextobjectives
private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
- private final KryoNamespace.Builder mcastKryo;
+ // Internal store of the Mcast roles
private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
+ // McastUtils
+ private final McastUtils mcastUtils;
// Wait time for the cache
private static final int WAIT_TIME_MS = 1000;
+
+ // Wait time for the removal of the old location
+ private static final int HOST_MOVED_DELAY_MS = 1000;
+
/**
* The mcastEventCache is implemented to avoid race condition by giving more time to the
* underlying subsystems to process previous calls.
@@ -123,7 +115,7 @@
.removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
// Get group ip, sink and related event
IpAddress mcastIp = notification.getKey().mcastIp();
- ConnectPoint sink = notification.getKey().sink();
+ HostId sink = notification.getKey().sinkHost();
McastEvent mcastEvent = notification.getValue();
RemovalCause cause = notification.getCause();
log.debug("mcastEventCache removal event. group={}, sink={}, mcastEvent={}, cause={}",
@@ -140,50 +132,42 @@
}).build();
private void enqueueMcastEvent(McastEvent mcastEvent) {
- log.debug("Enqueue mcastEvent {}", mcastEvent);
+ // Retrieve, currentData, prevData and the group
final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
- final McastRouteUpdate prevUpdate = mcastEvent.prevSubject();
- final IpAddress group = prevUpdate.route().group();
+ final McastRouteUpdate mcastRoutePrevUpdate = mcastEvent.prevSubject();
+ final IpAddress group = mcastRoutePrevUpdate.route().group();
// Let's create the keys of the cache
- ImmutableSet.Builder<ConnectPoint> sinksBuilder = ImmutableSet.builder();
- Set<ConnectPoint> sinks;
- // For this event we will have a set of sinks
+ ImmutableSet.Builder<HostId> sinksBuilder = ImmutableSet.builder();
if (mcastEvent.type() == SOURCES_ADDED ||
mcastEvent.type() == SOURCES_REMOVED) {
// FIXME To be addressed with multiple sources support
- sinks = mcastRouteUpdate.sinks()
- .values()
- .stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet());
- } else {
- Set<ConnectPoint> prevSinks = prevUpdate.sinks()
- .values()
- .stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet());
- if (mcastEvent.type() == ROUTE_REMOVED) {
- // Get the old sinks, since current subject is null
- sinks = prevSinks;
- } else {
- // Get new sinks
- Set<ConnectPoint> newsinks = mcastRouteUpdate.sinks()
- .values()
- .stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet());
- // If it is a SINKS_ADDED event
- if (mcastEvent.type() == SINKS_ADDED) {
- // Let's do the difference between current and prev subjects
- sinks = Sets.difference(newsinks, prevSinks);
- } else {
- // Let's do the difference between prev and current subjects
- sinks = Sets.difference(prevSinks, newsinks);
+ sinksBuilder.addAll(Collections.emptySet());
+ } else if (mcastEvent.type() == SINKS_ADDED) {
+ // We need to process the host id one by one
+ mcastRouteUpdate.sinks().forEach(((hostId, connectPoints) -> {
+ // Get the previous locations and verify if there are changes
+ Set<ConnectPoint> prevConnectPoints = mcastRoutePrevUpdate.sinks().get(hostId);
+ Set<ConnectPoint> changes = Sets.difference(connectPoints, prevConnectPoints != null ?
+ prevConnectPoints : Collections.emptySet());
+ if (!changes.isEmpty()) {
+ sinksBuilder.add(hostId);
}
- }
+ }));
+ } else if (mcastEvent.type() == SINKS_REMOVED) {
+ // We need to process the host id one by one
+ mcastRoutePrevUpdate.sinks().forEach(((hostId, connectPoints) -> {
+ // Get the current locations and verify if there are changes
+ Set<ConnectPoint> currentConnectPoints = mcastRouteUpdate.sinks().get(hostId);
+ Set<ConnectPoint> changes = Sets.difference(connectPoints, currentConnectPoints != null ?
+ currentConnectPoints : Collections.emptySet());
+ if (!changes.isEmpty()) {
+ sinksBuilder.add(hostId);
+ }
+ }));
+ } else if (mcastEvent.type() == ROUTE_REMOVED) {
+ // Current subject is null, just take the previous host ids
+ sinksBuilder.addAll(mcastRoutePrevUpdate.sinks().keySet());
}
- // Add all the sinks
- sinksBuilder.addAll(sinks);
// Push the elements in the cache
sinksBuilder.build().forEach(sink -> {
McastCacheKey cacheKey = new McastCacheKey(group, sink);
@@ -192,24 +176,22 @@
}
private void dequeueMcastEvent(McastEvent mcastEvent) {
- log.debug("Dequeue mcastEvent {}", mcastEvent);
- final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
- final McastRouteUpdate prevUpdate = mcastEvent.prevSubject();
+ // Get new and old data
+ final McastRouteUpdate mcastUpdate = mcastEvent.subject();
+ final McastRouteUpdate mcastPrevUpdate = mcastEvent.prevSubject();
// Get source, mcast group
// FIXME To be addressed with multiple sources support
- ConnectPoint prevSource = prevUpdate.sources()
+ final ConnectPoint source = mcastPrevUpdate.sources()
.stream()
.findFirst()
.orElse(null);
- IpAddress mcastIp = prevUpdate.route().group();
- Set<ConnectPoint> prevSinks = prevUpdate.sinks()
+ IpAddress mcastIp = mcastPrevUpdate.route().group();
+ // Get all the previous sinks
+ Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toSet());
- Set<ConnectPoint> newSinks;
- // Sinks to handled by SINKS_ADDED and SINKS_REMOVED procedures
- Set<ConnectPoint> sinks;
// According to the event type let's call the proper method
switch (mcastEvent.type()) {
case SOURCES_ADDED:
@@ -230,27 +212,17 @@
break;
case ROUTE_REMOVED:
// Process the route removed, just the first cached element will be processed
- processRouteRemovedInternal(prevSource, mcastIp);
+ processRouteRemovedInternal(source, mcastIp);
break;
case SINKS_ADDED:
- // Get the only sinks to be processed (new ones)
- newSinks = mcastRouteUpdate.sinks()
- .values()
- .stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet());
- sinks = Sets.difference(newSinks, prevSinks);
- sinks.forEach(sink -> processSinkAddedInternal(prevSource, sink, mcastIp, null));
+ // FIXME To be addressed with multiple sources support
+ processSinksAddedInternal(source, mcastIp,
+ mcastUpdate.sinks(), prevSinks);
break;
case SINKS_REMOVED:
- // Get the only sinks to be processed (old ones)
- newSinks = mcastRouteUpdate.sinks()
- .values()
- .stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet());
- sinks = Sets.difference(prevSinks, newSinks);
- sinks.forEach(sink -> processSinkRemovedInternal(prevSource, sink, mcastIp));
+ // FIXME To be addressed with multiple sources support
+ processSinksRemovedInternal(source, mcastIp,
+ mcastUpdate.sinks(), prevSinks);
break;
default:
break;
@@ -306,28 +278,28 @@
* @param srManager Segment Routing manager
*/
public McastHandler(SegmentRoutingManager srManager) {
- coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
+ ApplicationId coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
this.srManager = srManager;
- this.storageService = srManager.storageService;
this.topologyService = srManager.topologyService;
- mcastKryo = new KryoNamespace.Builder()
+ KryoNamespace.Builder mcastKryo = new KryoNamespace.Builder()
.register(KryoNamespaces.API)
.register(McastStoreKey.class)
.register(McastRole.class);
- mcastNextObjStore = storageService
+ mcastNextObjStore = srManager.storageService
.<McastStoreKey, NextObjective>consistentMapBuilder()
.withName("onos-mcast-nextobj-store")
.withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
.build();
- mcastRoleStore = storageService
+ mcastRoleStore = srManager.storageService
.<McastStoreKey, McastRole>consistentMapBuilder()
.withName("onos-mcast-role-store")
.withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
.build();
+ // Let's create McastUtils object
+ mcastUtils = new McastUtils(srManager, coreAppId, log);
// Init the executor service and the buckets corrector
executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
- MCAST_VERIFY_INTERVAL,
- TimeUnit.SECONDS);
+ MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
// Schedule the clean up, this will allow the processing of the expired events
executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
WAIT_TIME_MS, TimeUnit.MILLISECONDS);
@@ -337,28 +309,36 @@
* Read initial multicast from mcast store.
*/
public void init() {
- srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
- // FIXME To be addressed with multiple sources support
- ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
- .stream()
- .findFirst()
- .orElse(null);
- Set<ConnectPoint> sinks = srManager.multicastRouteService.sinks(mcastRoute);
- // Filter out all the working sinks, we do not want to move them
- sinks = sinks.stream()
- .filter(sink -> {
- McastStoreKey mcastKey = new McastStoreKey(mcastRoute.group(), sink.deviceId());
- Versioned<NextObjective> verMcastNext = mcastNextObjStore.get(mcastKey);
- return verMcastNext == null ||
- !getPorts(verMcastNext.value().next()).contains(sink.port());
- })
- .collect(Collectors.toSet());
- // Compute the Mcast tree
- Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
- // Process the given sinks using the pre-computed paths
- mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
- mcastRoute.group(), paths));
- });
+ lastMcastChange = Instant.now();
+ mcastLock();
+ try {
+ srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
+ // FIXME To be addressed with multiple sources support
+ ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
+ .stream()
+ .findFirst()
+ .orElse(null);
+ // Get all the sinks and process them
+ McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
+ Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(), mcastRouteData.sinks());
+ // Filter out all the working sinks, we do not want to move them
+ sinks = sinks.stream()
+ .filter(sink -> {
+ McastStoreKey mcastKey = new McastStoreKey(mcastRoute.group(), sink.deviceId());
+ Versioned<NextObjective> verMcastNext = mcastNextObjStore.get(mcastKey);
+ return verMcastNext == null ||
+ !mcastUtils.getPorts(verMcastNext.value().next()).contains(sink.port());
+ })
+ .collect(Collectors.toSet());
+ // Compute the Mcast tree
+ Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
+ // Process the given sinks using the pre-computed paths
+ mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
+ mcastRoute.group(), paths));
+ });
+ } finally {
+ mcastUnlock();
+ }
}
/**
@@ -398,7 +378,7 @@
McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, oldSource.deviceId());
// Verify leadership on the operation
- if (!isLeader(oldSource)) {
+ if (!mcastUtils.isLeader(oldSource)) {
log.debug("Skip {} due to lack of leadership", mcastIp);
return;
}
@@ -410,18 +390,19 @@
return;
}
NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
- Set<PortNumber> outputPorts = getPorts(nextObjective.next());
+ Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
// This an optimization to avoid unnecessary removal and add
- if (!assignedVlanFromNext(nextObjective).equals(assignedVlan(newSource))) {
+ if (!mcastUtils.assignedVlanFromNext(nextObjective)
+ .equals(mcastUtils.assignedVlan(newSource))) {
// Let's remove old flows and groups
- removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource));
+ removeGroupFromDevice(oldSource.deviceId(), mcastIp, mcastUtils.assignedVlan(oldSource));
// Push new flows and group
outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
- mcastIp, assignedVlan(newSource)));
+ mcastIp, mcastUtils.assignedVlan(newSource)));
}
- addFilterToDevice(newSource.deviceId(), newSource.port(),
- assignedVlan(newSource), mcastIp, INGRESS);
+ mcastUtils.addFilterToDevice(newSource.deviceId(), newSource.port(),
+ mcastUtils.assignedVlan(newSource), mcastIp, INGRESS);
// Setup mcast roles
mcastRoleStore.put(new McastStoreKey(mcastIp, newSource.deviceId()),
INGRESS);
@@ -448,7 +429,7 @@
Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
// Verify leadership on the operation
- if (!isLeader(source)) {
+ if (!mcastUtils.isLeader(source)) {
log.debug("Skip {} due to lack of leadership", mcastIp);
return;
}
@@ -456,24 +437,60 @@
// If there are no egress devices, sinks could be only on the ingress
if (!egressDevices.isEmpty()) {
egressDevices.forEach(
- deviceId -> removeGroupFromDevice(deviceId, mcastIp, assignedVlan(null))
+ deviceId -> removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null))
);
}
// Transit could be empty if sinks are on the ingress
if (!transitDevices.isEmpty()) {
transitDevices.forEach(
- deviceId -> removeGroupFromDevice(deviceId, mcastIp, assignedVlan(null))
+ deviceId -> removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null))
);
}
// Ingress device should be not null
if (ingressDevice != null) {
- removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
+ removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
}
} finally {
mcastUnlock();
}
}
+
+ /**
+ * Process sinks to be removed.
+ *
+ * @param source the source connect point
+ * @param mcastIp the ip address of the group
+ * @param newSinks the new sinks to be processed
+ * @param allPrevSinks all previous sinks
+ */
+ private void processSinksRemovedInternal(ConnectPoint source, IpAddress mcastIp,
+ Map<HostId, Set<ConnectPoint>> newSinks,
+ Set<ConnectPoint> allPrevSinks) {
+ lastMcastChange = Instant.now();
+ mcastLock();
+ // Let's instantiate the sinks to be removed
+ Set<ConnectPoint> sinksToBeRemoved = Sets.newHashSet();
+ try {
+ // Recover the dual-homed sinks
+ Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks);
+ sinksToBeRecovered.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
+ // Get the only sinks to be processed (old ones)
+ Set<ConnectPoint> allNewSinks = newSinks.values()
+ .stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
+ // Remove the previous one
+ sinksToBeRemoved.addAll(Sets.difference(allPrevSinks, allNewSinks));
+ } finally {
+ mcastUnlock();
+ // Let's schedule the removal of the previous sinks
+ executorService.schedule(
+ () -> sinksToBeRemoved.forEach(sink -> processSinkRemovedInternal(source, sink, mcastIp)),
+ HOST_MOVED_DELAY_MS, TimeUnit.MILLISECONDS);
+ }
+ }
+
/**
* Removes a path from source to sink for given multicast group.
*
@@ -482,17 +499,17 @@
* @param mcastIp multicast group IP address
*/
private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
- IpAddress mcastIp) {
+ IpAddress mcastIp) {
lastMcastChange = Instant.now();
mcastLock();
try {
// Verify leadership on the operation
- if (!isLeader(source)) {
+ if (!mcastUtils.isLeader(source)) {
log.debug("Skip {} due to lack of leadership", mcastIp);
return;
}
- boolean isLast = false;
+ boolean isLast;
// When source and sink are on the same device
if (source.deviceId().equals(sink.deviceId())) {
// Source and sink are on even the same port. There must be something wrong.
@@ -501,7 +518,7 @@
mcastIp, sink, source);
return;
}
- isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
+ isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
if (isLast) {
mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
}
@@ -509,7 +526,7 @@
}
// Process the egress device
- isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
+ isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
if (isLast) {
mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
}
@@ -526,7 +543,8 @@
link.src().deviceId(),
link.src().port(),
mcastIp,
- assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null)
+ mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ?
+ source : null)
);
if (isLast) {
mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
@@ -539,6 +557,31 @@
}
}
+
+ /**
+ * Process sinks to be added.
+ *
+ * @param source the source connect point
+ * @param mcastIp the group IP
+ * @param newSinks the new sinks to be processed
+ * @param allPrevSinks all previous sinks
+ */
+ private void processSinksAddedInternal(ConnectPoint source, IpAddress mcastIp,
+ Map<HostId, Set<ConnectPoint>> newSinks,
+ Set<ConnectPoint> allPrevSinks) {
+ lastMcastChange = Instant.now();
+ mcastLock();
+ try {
+ // Get the only sinks to be processed (new ones)
+ Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
+ // Install new sinks
+ sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
+ sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
+ } finally {
+ mcastUnlock();
+ }
+ }
+
/**
* Establishes a path from source to sink for given multicast group.
*
@@ -547,7 +590,7 @@
* @param mcastIp multicast group IP address
*/
private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
- IpAddress mcastIp, List<Path> allPaths) {
+ IpAddress mcastIp, List<Path> allPaths) {
lastMcastChange = Instant.now();
mcastLock();
try {
@@ -559,8 +602,8 @@
}
// Process the ingress device
- addFilterToDevice(source.deviceId(), source.port(),
- assignedVlan(source), mcastIp, INGRESS);
+ mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
+ mcastUtils.assignedVlan(source), mcastIp, INGRESS);
// When source and sink are on the same device
if (source.deviceId().equals(sink.deviceId())) {
@@ -570,7 +613,7 @@
mcastIp, sink, source);
return;
}
- addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
+ addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), INGRESS);
return;
}
@@ -588,9 +631,10 @@
// Setup properly the transit
links.forEach(link -> {
addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
- assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
- addFilterToDevice(link.dst().deviceId(), link.dst().port(),
- assignedVlan(null), mcastIp, null);
+ mcastUtils.assignedVlan(link.src().deviceId()
+ .equals(source.deviceId()) ? source : null));
+ mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
+ mcastUtils.assignedVlan(null), mcastIp, null);
});
// Setup mcast role for the transit
@@ -600,7 +644,7 @@
TRANSIT));
// Process the egress device
- addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
+ addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
// Setup mcast role for egress
mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
EGRESS);
@@ -633,7 +677,7 @@
.stream().findAny().orElse(null);
Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
- ConnectPoint source = getSource(mcastIp);
+ ConnectPoint source = mcastUtils.getSource(mcastIp);
// Do not proceed if ingress device or source of this group are missing
// If sinks are in other leafs, we have ingress, transit, egress, and source
@@ -653,28 +697,50 @@
// Remove entire transit
transitDevices.forEach(transitDevice ->
- removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null)));
+ removeGroupFromDevice(transitDevice, mcastIp,
+ mcastUtils.assignedVlan(null)));
// Remove transit-facing ports on the ingress device
removeIngressTransitPorts(mcastIp, ingressDevice, source);
+ // TODO create a shared procedure with DEVICE_DOWN
// Compute mcast tree for the the egress devices
Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
- // Construct a new path for each egress device
+ // We have to verify, if there are egresses without paths
+ Set<DeviceId> notRecovered = Sets.newHashSet();
mcastTree.forEach((egressDevice, paths) -> {
- // We try to enforce the sinks path on the mcast tree
+ // Let's check if there is at least a path
Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
mcastIp, paths);
- // If a path is present, let's install it
- if (mcastPath.isPresent()) {
- installPath(mcastIp, source, mcastPath.get());
- } else {
+ // No paths, we have to try with alternative location
+ if (!mcastPath.isPresent()) {
+ notRecovered.add(egressDevice);
+ // We were not able to find an alternative path for this egress
log.warn("Fail to recover egress device {} from link failure {}",
egressDevice, affectedLink);
- removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
+ removeGroupFromDevice(egressDevice, mcastIp,
+ mcastUtils.assignedVlan(null));
}
});
+
+ // Fast path, we can recover all the locations
+ if (notRecovered.isEmpty()) {
+ // Construct a new path for each egress device
+ mcastTree.forEach((egressDevice, paths) -> {
+ // We try to enforce the sinks path on the mcast tree
+ Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
+ mcastIp, paths);
+ // If a path is present, let's install it
+ if (mcastPath.isPresent()) {
+ installPath(mcastIp, source, mcastPath.get());
+ }
+ });
+ } else {
+ // Let's try to recover using alternate
+ recoverSinks(egressDevices, notRecovered, mcastIp,
+ ingressDevice, source, true);
+ }
});
} finally {
mcastUnlock();
@@ -701,7 +767,7 @@
.stream().findAny().orElse(null);
Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
- ConnectPoint source = getSource(mcastIp);
+ ConnectPoint source = mcastUtils.getSource(mcastIp);
// Do not proceed if ingress device or source of this group are missing
// If sinks are in other leafs, we have ingress, transit, egress, and source
@@ -713,7 +779,7 @@
}
// Verify leadership on the operation
- if (!isLeader(source)) {
+ if (!mcastUtils.isLeader(source)) {
log.debug("Skip {} due to lack of leadership", mcastIp);
return;
}
@@ -722,17 +788,19 @@
if (!transitDevices.isEmpty()) {
// Remove entire transit
transitDevices.forEach(transitDevice ->
- removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null)));
+ removeGroupFromDevice(transitDevice, mcastIp,
+ mcastUtils.assignedVlan(null)));
}
// If the ingress is down
if (ingressDevice.equals(deviceDown)) {
// Remove entire ingress
- removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
+ removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
// If other sinks different from the ingress exist
if (!egressDevices.isEmpty()) {
// Remove all the remaining egress
egressDevices.forEach(
- egressDevice -> removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null))
+ egressDevice -> removeGroupFromDevice(egressDevice, mcastIp,
+ mcastUtils.assignedVlan(null))
);
}
} else {
@@ -743,7 +811,7 @@
// One of the egress device is down
if (egressDevices.contains(deviceDown)) {
// Remove entire device down
- removeGroupFromDevice(deviceDown, mcastIp, assignedVlan(null));
+ removeGroupFromDevice(deviceDown, mcastIp, mcastUtils.assignedVlan(null));
// Remove the device down from egress
egressDevices.remove(deviceDown);
// If there are no more egress and ingress does not have sinks
@@ -756,21 +824,39 @@
// Compute mcast tree for the the egress devices
Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
- // Construct a new path for each egress device
+ // We have to verify, if there are egresses without paths
+ Set<DeviceId> notRecovered = Sets.newHashSet();
mcastTree.forEach((egressDevice, paths) -> {
+ // Let's check if there is at least a path
Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
- mcastIp, null);
- // If there is a new path
- if (mcastPath.isPresent()) {
- // Let's install the new mcast path for this egress
- installPath(mcastIp, source, mcastPath.get());
- } else {
+ mcastIp, paths);
+ // No paths, we have to try with alternative location
+ if (!mcastPath.isPresent()) {
+ notRecovered.add(egressDevice);
// We were not able to find an alternative path for this egress
log.warn("Fail to recover egress device {} from device down {}",
egressDevice, deviceDown);
- removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
+ removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
}
});
+
+ // Fast path, we can recover all the locations
+ if (notRecovered.isEmpty()) {
+ // Construct a new path for each egress device
+ mcastTree.forEach((egressDevice, paths) -> {
+ // We try to enforce the sinks path on the mcast tree
+ Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
+ mcastIp, paths);
+ // If a path is present, let's install it
+ if (mcastPath.isPresent()) {
+ installPath(mcastIp, source, mcastPath.get());
+ }
+ });
+ } else {
+ // Let's try to recover using alternate
+ recoverSinks(egressDevices, notRecovered, mcastIp,
+ ingressDevice, source, false);
+ }
}
});
} finally {
@@ -779,6 +865,187 @@
}
/**
+ * Try to recover sinks using alternate locations.
+ *
+ * @param egressDevices the original egress devices
+ * @param notRecovered the devices not recovered
+ * @param mcastIp the group address
+ * @param ingressDevice the ingress device
+ * @param source the source connect point
+ * @param isLinkFailure true if it is a link failure, otherwise false
+ */
+ private void recoverSinks(Set<DeviceId> egressDevices, Set<DeviceId> notRecovered,
+ IpAddress mcastIp, DeviceId ingressDevice, ConnectPoint source,
+ boolean isLinkFailure) {
+ // Recovered devices
+ Set<DeviceId> recovered = Sets.difference(egressDevices, notRecovered);
+ // Total affected sinks
+ Set<ConnectPoint> totalAffectedSinks = Sets.newHashSet();
+ // Total sinks
+ Set<ConnectPoint> totalSinks = Sets.newHashSet();
+ // Let's compute all the affected sinks and all the sinks
+ notRecovered.forEach(deviceId -> {
+ totalAffectedSinks.addAll(
+ mcastUtils.getAffectedSinks(deviceId, mcastIp)
+ .values()
+ .stream()
+ .flatMap(Collection::stream)
+ .filter(connectPoint -> connectPoint.deviceId().equals(deviceId))
+ .collect(Collectors.toSet())
+ );
+ totalSinks.addAll(
+ mcastUtils.getAffectedSinks(deviceId, mcastIp)
+ .values()
+ .stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet())
+ );
+ });
+
+ // Sinks to be added
+ Set<ConnectPoint> sinksToBeAdded = Sets.difference(totalSinks, totalAffectedSinks);
+ // New egress devices, filtering out the source
+ Set<DeviceId> newEgressDevice = sinksToBeAdded.stream()
+ .map(ConnectPoint::deviceId)
+ .collect(Collectors.toSet());
+ // Let's add the devices recovered from the previous round
+ newEgressDevice.addAll(recovered);
+ // Let's do a copy of the new egresses and filter out the source
+ Set<DeviceId> copyNewEgressDevice = ImmutableSet.copyOf(newEgressDevice);
+ newEgressDevice = newEgressDevice.stream()
+ .filter(deviceId -> !deviceId.equals(ingressDevice))
+ .collect(Collectors.toSet());
+
+ // Re-compute mcast tree for the the egress devices
+ Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, newEgressDevice);
+ // if the source was originally in the new locations, add new sinks
+ if (copyNewEgressDevice.contains(ingressDevice)) {
+ sinksToBeAdded.stream()
+ .filter(connectPoint -> connectPoint.deviceId().equals(ingressDevice))
+ .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, ImmutableList.of()));
+ }
+
+ // Construct a new path for each egress device
+ mcastTree.forEach((egressDevice, paths) -> {
+ // We try to enforce the sinks path on the mcast tree
+ Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
+ mcastIp, paths);
+ // If a path is present, let's install it
+ if (mcastPath.isPresent()) {
+ // Using recovery procedure
+ if (recovered.contains(egressDevice)) {
+ installPath(mcastIp, source, mcastPath.get());
+ } else {
+ // otherwise we need to threat as new sink
+ sinksToBeAdded.stream()
+ .filter(connectPoint -> connectPoint.deviceId().equals(egressDevice))
+ .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, paths));
+ }
+ } else {
+ // We were not able to find an alternative path for this egress
+ log.warn("Fail to recover egress device {} from {} failure",
+ egressDevice, isLinkFailure ? "Link" : "Device");
+ removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
+ }
+ });
+
+ }
+
+ /**
+ * Process new locations and return the set of sinks to be added
+ * in the context of the recovery.
+ *
+ * @param sinks the remaining locations
+ * @return the set of the sinks to be processed
+ */
+ private Set<ConnectPoint> processSinksToBeRecovered(IpAddress mcastIp,
+ Map<HostId, Set<ConnectPoint>> sinks) {
+ // Iterate over the sinks in order to build the set
+ // of the connect points to be served by this group
+ final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
+ sinks.forEach((hostId, connectPoints) -> {
+ // If it has more than 1 locations
+ if (connectPoints.size() > 1 || connectPoints.size() == 0) {
+ log.debug("Skip {} since sink {} has {} locations",
+ mcastIp, hostId, connectPoints.size());
+ return;
+ }
+ sinksToBeProcessed.add(connectPoints.stream()
+ .findFirst().orElseGet(null));
+ });
+ return sinksToBeProcessed;
+ }
+
+ /**
+ * Process all the sinks related to a mcast group and return
+ * the ones to be processed.
+ *
+ * @param source the source connect point
+ * @param mcastIp the group address
+ * @param sinks the sinks to be evaluated
+ * @return the set of the sinks to be processed
+ */
+ private Set<ConnectPoint> processSinksToBeAdded(ConnectPoint source, IpAddress mcastIp,
+ Map<HostId, Set<ConnectPoint>> sinks) {
+ // Iterate over the sinks in order to build the set
+ // of the connect points to be served by this group
+ final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
+ sinks.forEach(((hostId, connectPoints) -> {
+ // If it has more than 2 locations
+ if (connectPoints.size() > 2 || connectPoints.size() == 0) {
+ log.debug("Skip {} since sink {} has {} locations",
+ mcastIp, hostId, connectPoints.size());
+ return;
+ }
+ // If it has one location, just use it
+ if (connectPoints.size() == 1) {
+ sinksToBeProcessed.add(connectPoints.stream()
+ .findFirst().orElseGet(null));
+ return;
+ }
+ // We prefer to reuse existing flows
+ ConnectPoint sinkToBeProcessed = connectPoints.stream()
+ .filter(connectPoint -> {
+ // Let's check if we are already serving that location
+ McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId());
+ if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
+ return false;
+ }
+ // Get next and check with the port
+ NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
+ return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
+ })
+ .findFirst().orElse(null);
+ if (sinkToBeProcessed != null) {
+ sinksToBeProcessed.add(sinkToBeProcessed);
+ return;
+ }
+ // Otherwise we prefer to reuse existing egresses
+ Set<DeviceId> egresses = getDevice(mcastIp, EGRESS);
+ sinkToBeProcessed = connectPoints.stream()
+ .filter(egresses::contains)
+ .findFirst().orElse(null);
+ if (sinkToBeProcessed != null) {
+ sinksToBeProcessed.add(sinkToBeProcessed);
+ return;
+ }
+ // Otherwise we prefer a location co-located with the source (if it exists)
+ sinkToBeProcessed = connectPoints.stream()
+ .filter(connectPoint -> connectPoint.deviceId().equals(source.deviceId()))
+ .findFirst().orElse(null);
+ if (sinkToBeProcessed != null) {
+ sinksToBeProcessed.add(sinkToBeProcessed);
+ return;
+ }
+ // Finally, we randomly pick a new location
+ sinksToBeProcessed.add(connectPoints.stream()
+ .findFirst().orElseGet(null));
+ }));
+ // We have done, return the set
+ return sinksToBeProcessed;
+ }
+
+ /**
* Utility method to remove all the ingress transit ports.
*
* @param mcastIp the group ip
@@ -791,7 +1058,7 @@
ingressTransitPorts.forEach(ingressTransitPort -> {
if (ingressTransitPort != null) {
boolean isLast = removePortFromDevice(ingressDevice, ingressTransitPort,
- mcastIp, assignedVlan(source));
+ mcastIp, mcastUtils.assignedVlan(source));
if (isLast) {
mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
}
@@ -800,43 +1067,6 @@
}
/**
- * Adds filtering objective for given device and port.
- *
- * @param deviceId device ID
- * @param port ingress port number
- * @param assignedVlan assigned VLAN ID
- */
- private void addFilterToDevice(DeviceId deviceId, PortNumber port,
- VlanId assignedVlan, IpAddress mcastIp, McastRole mcastRole) {
- // Do nothing if the port is configured as suppressed
- ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
- SegmentRoutingAppConfig appConfig = srManager.cfgService
- .getConfig(srManager.appId(), SegmentRoutingAppConfig.class);
- if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
- log.info("Ignore suppressed port {}", connectPoint);
- return;
- }
-
- MacAddress routerMac;
- try {
- routerMac = srManager.deviceConfiguration().getDeviceMac(deviceId);
- } catch (DeviceConfigNotFoundException dcnfe) {
- log.warn("Fail to push filtering objective since device is not configured. Abort");
- return;
- }
-
- FilteringObjective.Builder filtObjBuilder =
- filterObjBuilder(port, assignedVlan, mcastIp, routerMac, mcastRole);
- ObjectiveContext context = new DefaultObjectiveContext(
- (objective) -> log.debug("Successfully add filter on {}/{}, vlan {}",
- deviceId, port.toLong(), assignedVlan),
- (objective, error) ->
- log.warn("Failed to add filter on {}/{}, vlan {}: {}",
- deviceId, port.toLong(), assignedVlan, error));
- srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.add(context));
- }
-
- /**
* Adds a port to given multicast group on given device. This involves the
* update of L3 multicast group and multicast routing table entry.
*
@@ -846,7 +1076,7 @@
* @param assignedVlan assigned VLAN ID
*/
private void addPortToDevice(DeviceId deviceId, PortNumber port,
- IpAddress mcastIp, VlanId assignedVlan) {
+ IpAddress mcastIp, VlanId assignedVlan) {
McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
NextObjective newNextObj;
@@ -854,7 +1084,7 @@
// First time someone request this mcast group via this device
portBuilder.add(port);
// New nextObj
- newNextObj = nextObjBuilder(mcastIp, assignedVlan,
+ newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
portBuilder.build(), null).add();
// Store the new port
mcastNextObjStore.put(mcastStoreKey, newNextObj);
@@ -862,7 +1092,7 @@
// This device already serves some subscribers of this mcast group
NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
// Stop if the port is already in the nextobj
- Set<PortNumber> existingPorts = getPorts(nextObj.next());
+ Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
if (existingPorts.contains(port)) {
log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
return;
@@ -870,14 +1100,14 @@
// Let's add the port and reuse the previous one
portBuilder.addAll(existingPorts).add(port);
// Reuse previous nextObj
- newNextObj = nextObjBuilder(mcastIp, assignedVlan,
+ newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
portBuilder.build(), nextObj.id()).addToExisting();
// Store the final next objective and send only the difference to the driver
mcastNextObjStore.put(mcastStoreKey, newNextObj);
// Add just the new port
portBuilder = ImmutableSet.builder();
portBuilder.add(port);
- newNextObj = nextObjBuilder(mcastIp, assignedVlan,
+ newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
portBuilder.build(), nextObj.id()).addToExisting();
}
// Create, store and apply the new nextObj and fwdObj
@@ -887,8 +1117,8 @@
(objective, error) ->
log.warn("Failed to add {} on {}/{}, vlan {}: {}",
mcastIp, deviceId, port.toLong(), assignedVlan, error));
- ForwardingObjective fwdObj =
- fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
+ ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
+ newNextObj.id()).add(context);
srManager.flowObjectiveService.next(deviceId, newNextObj);
srManager.flowObjectiveService.forward(deviceId, fwdObj);
}
@@ -905,7 +1135,7 @@
* @return true if this is the last sink on this device
*/
private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
- IpAddress mcastIp, VlanId assignedVlan) {
+ IpAddress mcastIp, VlanId assignedVlan) {
McastStoreKey mcastStoreKey =
new McastStoreKey(mcastIp, deviceId);
// This device is not serving this multicast group
@@ -915,7 +1145,7 @@
}
NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
- Set<PortNumber> existingPorts = getPorts(nextObj.next());
+ Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
// This port does not serve this multicast group
if (!existingPorts.contains(port)) {
log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
@@ -939,7 +1169,7 @@
(objective, error) ->
log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
mcastIp, deviceId, port.toLong(), assignedVlan, error));
- fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
+ fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
mcastNextObjStore.remove(mcastStoreKey);
} else {
// If this is not the last sink, update flows and groups
@@ -950,13 +1180,13 @@
log.warn("Failed to update {} on {}/{}, vlan {}: {}",
mcastIp, deviceId, port.toLong(), assignedVlan, error));
// Here we store the next objective with the remaining port
- newNextObj = nextObjBuilder(mcastIp, assignedVlan,
+ newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
existingPorts, nextObj.id()).removeFromExisting();
- fwdObj = fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
+ fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
mcastNextObjStore.put(mcastStoreKey, newNextObj);
}
// Let's modify the next objective removing the bucket
- newNextObj = nextObjBuilder(mcastIp, assignedVlan,
+ newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
ImmutableSet.of(port), nextObj.id()).removeFromExisting();
srManager.flowObjectiveService.next(deviceId, newNextObj);
srManager.flowObjectiveService.forward(deviceId, fwdObj);
@@ -971,7 +1201,7 @@
* @param assignedVlan assigned VLAN ID
*/
private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
- VlanId assignedVlan) {
+ VlanId assignedVlan) {
McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
// This device is not serving this multicast group
if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
@@ -988,7 +1218,7 @@
(objective, error) ->
log.warn("Failed to remove {} on {}, vlan {}: {}",
mcastIp, deviceId, assignedVlan, error));
- ForwardingObjective fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
+ ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
srManager.flowObjectiveService.forward(deviceId, fwdObj);
mcastNextObjStore.remove(mcastStoreKey);
mcastRoleStore.remove(mcastStoreKey);
@@ -1006,9 +1236,9 @@
// and a new filter objective on the destination port
links.forEach(link -> {
addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
- assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
- addFilterToDevice(link.dst().deviceId(), link.dst().port(),
- assignedVlan(null), mcastIp, null);
+ mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
+ mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
+ mcastUtils.assignedVlan(null), mcastIp, null);
});
// Setup mcast role for the transit
@@ -1019,137 +1249,6 @@
}
/**
- * Creates a next objective builder for multicast.
- *
- * @param mcastIp multicast group
- * @param assignedVlan assigned VLAN ID
- * @param outPorts set of output port numbers
- * @return next objective builder
- */
- private NextObjective.Builder nextObjBuilder(IpAddress mcastIp,
- VlanId assignedVlan, Set<PortNumber> outPorts, Integer nextId) {
- // If nextId is null allocate a new one
- if (nextId == null) {
- nextId = srManager.flowObjectiveService.allocateNextId();
- }
-
- TrafficSelector metadata =
- DefaultTrafficSelector.builder()
- .matchVlanId(assignedVlan)
- .matchIPDst(mcastIp.toIpPrefix())
- .build();
-
- NextObjective.Builder nextObjBuilder = DefaultNextObjective
- .builder().withId(nextId)
- .withType(NextObjective.Type.BROADCAST).fromApp(srManager.appId())
- .withMeta(metadata);
-
- outPorts.forEach(port -> {
- TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
- if (egressVlan().equals(VlanId.NONE)) {
- tBuilder.popVlan();
- }
- tBuilder.setOutput(port);
- nextObjBuilder.addTreatment(tBuilder.build());
- });
-
- return nextObjBuilder;
- }
-
- /**
- * Creates a forwarding objective builder for multicast.
- *
- * @param mcastIp multicast group
- * @param assignedVlan assigned VLAN ID
- * @param nextId next ID of the L3 multicast group
- * @return forwarding objective builder
- */
- private ForwardingObjective.Builder fwdObjBuilder(IpAddress mcastIp,
- VlanId assignedVlan, int nextId) {
- TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
- IpPrefix mcastPrefix = mcastIp.toIpPrefix();
-
- if (mcastIp.isIp4()) {
- sbuilder.matchEthType(Ethernet.TYPE_IPV4);
- sbuilder.matchIPDst(mcastPrefix);
- } else {
- sbuilder.matchEthType(Ethernet.TYPE_IPV6);
- sbuilder.matchIPv6Dst(mcastPrefix);
- }
-
-
- TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
- metabuilder.matchVlanId(assignedVlan);
-
- ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder();
- fwdBuilder.withSelector(sbuilder.build())
- .withMeta(metabuilder.build())
- .nextStep(nextId)
- .withFlag(ForwardingObjective.Flag.SPECIFIC)
- .fromApp(srManager.appId())
- .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
- return fwdBuilder;
- }
-
- /**
- * Creates a filtering objective builder for multicast.
- *
- * @param ingressPort ingress port of the multicast stream
- * @param assignedVlan assigned VLAN ID
- * @param routerMac router MAC. This is carried in metadata and used from some switches that
- * need to put unicast entry before multicast entry in TMAC table.
- * @return filtering objective builder
- */
- private FilteringObjective.Builder filterObjBuilder(PortNumber ingressPort,
- VlanId assignedVlan, IpAddress mcastIp, MacAddress routerMac, McastRole mcastRole) {
- FilteringObjective.Builder filtBuilder = DefaultFilteringObjective.builder();
- // Let's add the in port matching and the priority
- filtBuilder.withKey(Criteria.matchInPort(ingressPort))
- .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
- // According to the mcast role we match on the proper vlan
- // If the role is null we are on the transit or on the egress
- if (mcastRole == null) {
- filtBuilder.addCondition(Criteria.matchVlanId(egressVlan()));
- } else {
- filtBuilder.addCondition(Criteria.matchVlanId(ingressVlan()));
- }
- // According to the IP type we set the proper match on the mac address
- if (mcastIp.isIp4()) {
- filtBuilder.addCondition(Criteria.matchEthDstMasked(MacAddress.IPV4_MULTICAST,
- MacAddress.IPV4_MULTICAST_MASK));
- } else {
- filtBuilder.addCondition(Criteria.matchEthDstMasked(MacAddress.IPV6_MULTICAST,
- MacAddress.IPV6_MULTICAST_MASK));
- }
- // We finally build the meta treatment
- TrafficTreatment tt = DefaultTrafficTreatment.builder()
- .pushVlan().setVlanId(assignedVlan)
- .setEthDst(routerMac)
- .build();
- filtBuilder.withMeta(tt);
- // Done, we return a permit filtering objective
- return filtBuilder.permit().fromApp(srManager.appId());
- }
-
- /**
- * Gets output ports information from treatments.
- *
- * @param treatments collection of traffic treatments
- * @return set of output port numbers
- */
- private Set<PortNumber> getPorts(Collection<TrafficTreatment> treatments) {
- ImmutableSet.Builder<PortNumber> builder = ImmutableSet.builder();
- treatments.forEach(treatment -> {
- treatment.allInstructions().stream()
- .filter(instr -> instr instanceof OutputInstruction)
- .forEach(instr -> {
- builder.add(((OutputInstruction) instr).port());
- });
- });
- return builder.build();
- }
-
- /**
* Go through all the paths, looking for shared links to be used
* in the final path computation.
*
@@ -1167,13 +1266,13 @@
// Verify the source can still reach all the egresses
for (DeviceId egress : egresses) {
// From the source we cannot reach all the sinks
- // just continue and let's figured out after
+ // just continue and let's figure out after
currentPaths = availablePaths.get(egress);
if (currentPaths.isEmpty()) {
continue;
}
// Get the length of the first one available,
- // update the min length and save the paths
+ // update the min length
length = currentPaths.get(0).links().size();
if (length < minLength) {
minLength = length;
@@ -1189,7 +1288,7 @@
Set<Link> sharedLinks = Sets.newHashSet();
Set<Link> currentSharedLinks;
Set<Link> currentLinks;
- DeviceId deviceToRemove = null;
+ DeviceId egressToRemove = null;
// Let's find out the shared links
while (index < minLength) {
// Initialize the intersection with the paths related to the first egress
@@ -1215,7 +1314,7 @@
// If there are no shared paths exit and record the device to remove
// we have to retry with a subset of sinks
if (currentSharedLinks.isEmpty()) {
- deviceToRemove = egress;
+ egressToRemove = egress;
index = minLength;
break;
}
@@ -1226,8 +1325,8 @@
// If the shared links is empty and there are egress
// let's retry another time with less sinks, we can
// still build optimal subtrees
- if (sharedLinks.isEmpty() && egresses.size() > 1 && deviceToRemove != null) {
- egresses.remove(deviceToRemove);
+ if (sharedLinks.isEmpty() && egresses.size() > 1 && egressToRemove != null) {
+ egresses.remove(egressToRemove);
sharedLinks = exploreMcastTree(egresses, availablePaths);
}
return sharedLinks;
@@ -1241,18 +1340,20 @@
* @return the computed Mcast tree
*/
private Map<ConnectPoint, List<Path>> computeSinkMcastTree(DeviceId source,
- Set<ConnectPoint> sinks) {
+ Set<ConnectPoint> sinks) {
// Get the egress devices, remove source from the egress if present
Set<DeviceId> egresses = sinks.stream()
.map(ConnectPoint::deviceId)
.filter(deviceId -> !deviceId.equals(source))
.collect(Collectors.toSet());
Map<DeviceId, List<Path>> mcastTree = computeMcastTree(source, egresses);
- // Build final tree nad return it as it is
+ // Build final tree and return it as it is
final Map<ConnectPoint, List<Path>> finalTree = Maps.newHashMap();
- mcastTree.forEach((egress, paths) ->
- sinks.stream().filter(sink -> sink.deviceId().equals(egress))
- .forEach(sink -> finalTree.put(sink, mcastTree.get(sink.deviceId()))));
+ // We need to put back the source if it was originally present
+ sinks.forEach(sink -> {
+ List<Path> sinkPaths = mcastTree.get(sink.deviceId());
+ finalTree.put(sink, sinkPaths != null ? sinkPaths : ImmutableList.of());
+ });
return finalTree;
}
@@ -1356,7 +1457,7 @@
}
// Get the output ports on the next
nextObj = mcastNextObjStore.get(mcastStoreKey).value();
- existingPorts = getPorts(nextObj.next());
+ existingPorts = mcastUtils.getPorts(nextObj.next());
// And the src port on the link
srcPort = hop.src().port();
// the src port is not used as output, exit
@@ -1412,37 +1513,6 @@
}
/**
- * Gets source connect point of given multicast group.
- *
- * @param mcastIp multicast IP
- * @return source connect point or null if not found
- */
- // FIXME To be addressed with multiple sources support
- private ConnectPoint getSource(IpAddress mcastIp) {
- // FIXME we should support different types of routes
- McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
- .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
- .findFirst().orElse(null);
- return mcastRoute == null ? null : srManager.multicastRouteService.sources(mcastRoute)
- .stream()
- .findFirst().orElse(null);
- }
- /**
- * Gets sinks of given multicast group.
- *
- * @param mcastIp multicast IP
- * @return set of sinks or empty set if not found
- */
- private Set<ConnectPoint> getSinks(IpAddress mcastIp) {
- // FIXME we should support different types of routes
- McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
- .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
- .findFirst().orElse(null);
- return mcastRoute == null ?
- Collections.emptySet() : srManager.multicastRouteService.sinks(mcastRoute);
- }
-
- /**
* Gets groups which is affected by the link down event.
*
* @param link link going down
@@ -1453,7 +1523,7 @@
PortNumber port = link.src().port();
return mcastNextObjStore.entrySet().stream()
.filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
- getPorts(entry.getValue().value().next()).contains(port))
+ mcastUtils.getPorts(entry.getValue().value().next()).contains(port))
.map(Entry::getKey).map(McastStoreKey::mcastIp)
.collect(Collectors.toSet());
}
@@ -1472,59 +1542,6 @@
}
/**
- * Gets ingress VLAN from McastConfig.
- *
- * @return ingress VLAN or VlanId.NONE if not configured
- */
- private VlanId ingressVlan() {
- McastConfig mcastConfig =
- srManager.cfgService.getConfig(coreAppId, McastConfig.class);
- return (mcastConfig != null) ? mcastConfig.ingressVlan() : VlanId.NONE;
- }
-
- /**
- * Gets egress VLAN from McastConfig.
- *
- * @return egress VLAN or VlanId.NONE if not configured
- */
- private VlanId egressVlan() {
- McastConfig mcastConfig =
- srManager.cfgService.getConfig(coreAppId, McastConfig.class);
- return (mcastConfig != null) ? mcastConfig.egressVlan() : VlanId.NONE;
- }
-
- /**
- * Gets assigned VLAN according to the value of egress VLAN.
- * If connect point is specified, try to reuse the assigned VLAN on the connect point.
- *
- * @param cp connect point; Can be null if not specified
- * @return assigned VLAN ID
- */
- private VlanId assignedVlan(ConnectPoint cp) {
- // Use the egressVlan if it is tagged
- if (!egressVlan().equals(VlanId.NONE)) {
- return egressVlan();
- }
- // Reuse unicast VLAN if the port has subnet configured
- if (cp != null) {
- VlanId untaggedVlan = srManager.getInternalVlanId(cp);
- return (untaggedVlan != null) ? untaggedVlan : INTERNAL_VLAN;
- }
- // Use DEFAULT_VLAN if none of the above matches
- return SegmentRoutingManager.INTERNAL_VLAN;
- }
-
- /**
- * Gets assigned VLAN according to the value in the meta.
- *
- * @param nextObjective nextObjective to analyze
- * @return assigned VLAN ID
- */
- private VlanId assignedVlanFromNext(NextObjective nextObjective) {
- return ((VlanIdCriterion) nextObjective.meta().getCriterion(VLAN_VID)).vlanId();
- }
-
- /**
* Gets the spine-facing port on ingress device of given multicast group.
*
* @param mcastIp multicast IP
@@ -1537,7 +1554,7 @@
if (ingressDevice != null) {
NextObjective nextObj = mcastNextObjStore
.get(new McastStoreKey(mcastIp, ingressDevice)).value();
- Set<PortNumber> ports = getPorts(nextObj.next());
+ Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
// Let's find out all the ingress-transit ports
for (PortNumber port : ports) {
// Spine-facing port should have no subnet and no xconnect
@@ -1570,7 +1587,7 @@
if (versionedNextObj != null) {
NextObjective nextObj = versionedNextObj.value();
// Retrieves all the output ports
- Set<PortNumber> ports = getPorts(nextObj.next());
+ Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
// Tries to find at least one port that is not spine-facing
for (PortNumber port : ports) {
// Spine-facing port should have no subnet and no xconnect
@@ -1586,44 +1603,6 @@
}
/**
- * Removes filtering objective for given device and port.
- *
- * @param deviceId device ID
- * @param port ingress port number
- * @param assignedVlan assigned VLAN ID
- * @param mcastIp multicast IP address
- */
- private void removeFilterToDevice(DeviceId deviceId, PortNumber port,
- VlanId assignedVlan, IpAddress mcastIp, McastRole mcastRole) {
- // Do nothing if the port is configured as suppressed
- ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
- SegmentRoutingAppConfig appConfig = srManager.cfgService
- .getConfig(srManager.appId(), SegmentRoutingAppConfig.class);
- if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
- log.info("Ignore suppressed port {}", connectPoint);
- return;
- }
-
- MacAddress routerMac;
- try {
- routerMac = srManager.deviceConfiguration().getDeviceMac(deviceId);
- } catch (DeviceConfigNotFoundException dcnfe) {
- log.warn("Fail to push filtering objective since device is not configured. Abort");
- return;
- }
-
- FilteringObjective.Builder filtObjBuilder =
- filterObjBuilder(port, assignedVlan, mcastIp, routerMac, mcastRole);
- ObjectiveContext context = new DefaultObjectiveContext(
- (objective) -> log.debug("Successfully removed filter on {}/{}, vlan {}",
- deviceId, port.toLong(), assignedVlan),
- (objective, error) ->
- log.warn("Failed to remove filter on {}/{}, vlan {}: {}",
- deviceId, port.toLong(), assignedVlan, error));
- srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.remove(context));
- }
-
- /**
* Updates filtering objective for given device and port.
* It is called in general when the mcast config has been
* changed.
@@ -1647,9 +1626,9 @@
.findFirst().orElse(null);
if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
if (install) {
- addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
+ mcastUtils.addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
} else {
- removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
+ mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
}
}
});
@@ -1658,26 +1637,6 @@
}
}
- private boolean isLeader(ConnectPoint source) {
- // Continue only when we have the mastership on the operation
- if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
- // When the source is available we just check the mastership
- if (srManager.deviceService.isAvailable(source.deviceId())) {
- return false;
- }
- // Fallback with Leadership service
- // source id is used a topic
- NodeId leader = srManager.leadershipService.runForLeadership(
- source.deviceId().toString()).leaderNodeId();
- // Verify if this node is the leader
- if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
- return false;
- }
- }
- // Done
- return true;
- }
-
/**
* Performs bucket verification operation for all mcast groups in the devices.
* Firstly, it verifies that mcast is stable before trying verification operation.
@@ -1710,8 +1669,10 @@
Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
// Get source and sinks from Mcast Route Service and warn about errors
- ConnectPoint source = getSource(mcastIp);
- Set<ConnectPoint> sinks = getSinks(mcastIp);
+ ConnectPoint source = mcastUtils.getSource(mcastIp);
+ Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
// Do not proceed if ingress device or source of this group are missing
if (ingressDevice == null || source == null) {
@@ -1750,11 +1711,12 @@
if (mcastNextObjStore.containsKey(currentKey)) {
NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
// Get current ports
- Set<PortNumber> currentPorts = getPorts(currentNext.next());
+ Set<PortNumber> currentPorts = mcastUtils.getPorts(currentNext.next());
// Rebuild the next objective
- currentNext = nextObjBuilder(
+ currentNext = mcastUtils.nextObjBuilder(
mcastIp,
- assignedVlan(deviceId.equals(source.deviceId()) ? source : null),
+ mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
+ source : null),
currentPorts,
currentNext.id()
).verify();
@@ -1807,7 +1769,7 @@
public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
// Get the source
- ConnectPoint source = getSource(mcastIp);
+ ConnectPoint source = mcastUtils.getSource(mcastIp);
// Source cannot be null, we don't know the starting point
if (source != null) {
// Init steps
@@ -1837,7 +1799,7 @@
// Build egress connectpoints
NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
// Get Ports
- Set<PortNumber> outputPorts = getPorts(nextObjective.next());
+ Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
// Build relative cps
ImmutableSet.Builder<ConnectPoint> cpBuilder = ImmutableSet.builder();
outputPorts.forEach(portNumber -> cpBuilder.add(new ConnectPoint(toVisit, portNumber)));
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
new file mode 100644
index 0000000..37f273b
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
@@ -0,0 +1,428 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.segmentrouting.mcast;
+
+import com.google.common.collect.ImmutableSet;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.VlanId;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.HostId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.config.basics.McastConfig;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flow.criteria.VlanIdCriterion;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.flowobjective.DefaultFilteringObjective;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.DefaultNextObjective;
+import org.onosproject.net.flowobjective.DefaultObjectiveContext;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.segmentrouting.SegmentRoutingManager;
+import org.onosproject.segmentrouting.SegmentRoutingService;
+import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
+import org.onosproject.segmentrouting.config.SegmentRoutingAppConfig;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.onosproject.net.flow.criteria.Criterion.Type.VLAN_VID;
+import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
+
+/**
+ * Utility class for Multicast Handler.
+ */
+class McastUtils {
+
+ // Internal reference to the log
+ private final Logger log;
+ // Internal reference to SR Manager
+ private SegmentRoutingManager srManager;
+ // Internal reference to the app id
+ private ApplicationId coreAppId;
+
+ /**
+ * Builds a new McastUtils object.
+ *
+ * @param srManager the SR manager
+ * @param coreAppId the core application id
+ * @param log log reference of the McastHandler
+ */
+ McastUtils(SegmentRoutingManager srManager, ApplicationId coreAppId, Logger log) {
+ this.srManager = srManager;
+ this.coreAppId = coreAppId;
+ this.log = log;
+ }
+
+ /**
+ * Given a connect point define a leader for it.
+ *
+ * @param source the source connect point
+ * @return true if this instance is the leader, otherwise false
+ */
+ boolean isLeader(ConnectPoint source) {
+ // Continue only when we have the mastership on the operation
+ if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+ // When the source is available we just check the mastership
+ if (srManager.deviceService.isAvailable(source.deviceId())) {
+ return false;
+ }
+ // Fallback with Leadership service
+ // source id is used a topic
+ NodeId leader = srManager.leadershipService.runForLeadership(
+ source.deviceId().toString()).leaderNodeId();
+ // Verify if this node is the leader
+ if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
+ return false;
+ }
+ }
+ // Done
+ return true;
+ }
+
+ /**
+ * Get router mac using application config and the connect point.
+ *
+ * @param deviceId the device id
+ * @param port the port number
+ * @return the router mac if the port is configured, otherwise null
+ */
+ private MacAddress getRouterMac(DeviceId deviceId, PortNumber port) {
+ // Do nothing if the port is configured as suppressed
+ ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
+ SegmentRoutingAppConfig appConfig = srManager.cfgService
+ .getConfig(srManager.appId(), SegmentRoutingAppConfig.class);
+ if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
+ log.info("Ignore suppressed port {}", connectPoint);
+ return MacAddress.NONE;
+ }
+ // Get the router mac using the device configuration
+ MacAddress routerMac;
+ try {
+ routerMac = srManager.deviceConfiguration().getDeviceMac(deviceId);
+ } catch (DeviceConfigNotFoundException dcnfe) {
+ log.warn("Fail to push filtering objective since device is not configured. Abort");
+ return MacAddress.NONE;
+ }
+ return routerMac;
+ }
+
+ /**
+ * Adds filtering objective for given device and port.
+ *
+ * @param deviceId device ID
+ * @param port ingress port number
+ * @param assignedVlan assigned VLAN ID
+ * @param mcastIp the group address
+ * @param mcastRole the role of the device
+ */
+ void addFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan,
+ IpAddress mcastIp, McastRole mcastRole) {
+
+ MacAddress routerMac = getRouterMac(deviceId, port);
+ if (routerMac.equals(MacAddress.NONE)) {
+ return;
+ }
+
+ FilteringObjective.Builder filtObjBuilder = filterObjBuilder(port, assignedVlan, mcastIp,
+ routerMac, mcastRole);
+ ObjectiveContext context = new DefaultObjectiveContext(
+ (objective) -> log.debug("Successfully add filter on {}/{}, vlan {}",
+ deviceId, port.toLong(), assignedVlan),
+ (objective, error) ->
+ log.warn("Failed to add filter on {}/{}, vlan {}: {}",
+ deviceId, port.toLong(), assignedVlan, error));
+ srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.add(context));
+ }
+
+ /**
+ * Removes filtering objective for given device and port.
+ *
+ * @param deviceId device ID
+ * @param port ingress port number
+ * @param assignedVlan assigned VLAN ID
+ * @param mcastIp multicast IP address
+ * @param mcastRole the multicast role of the device
+ */
+ void removeFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan,
+ IpAddress mcastIp, McastRole mcastRole) {
+
+ MacAddress routerMac = getRouterMac(deviceId, port);
+ if (routerMac.equals(MacAddress.NONE)) {
+ return;
+ }
+
+ FilteringObjective.Builder filtObjBuilder =
+ filterObjBuilder(port, assignedVlan, mcastIp, routerMac, mcastRole);
+ ObjectiveContext context = new DefaultObjectiveContext(
+ (objective) -> log.debug("Successfully removed filter on {}/{}, vlan {}",
+ deviceId, port.toLong(), assignedVlan),
+ (objective, error) ->
+ log.warn("Failed to remove filter on {}/{}, vlan {}: {}",
+ deviceId, port.toLong(), assignedVlan, error));
+ srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.remove(context));
+ }
+
+ /**
+ * Gets assigned VLAN according to the value in the meta.
+ *
+ * @param nextObjective nextObjective to analyze
+ * @return assigned VLAN ID
+ */
+ VlanId assignedVlanFromNext(NextObjective nextObjective) {
+ return ((VlanIdCriterion) nextObjective.meta().getCriterion(VLAN_VID)).vlanId();
+ }
+
+ /**
+ * Gets ingress VLAN from McastConfig.
+ *
+ * @return ingress VLAN or VlanId.NONE if not configured
+ */
+ private VlanId ingressVlan() {
+ McastConfig mcastConfig =
+ srManager.cfgService.getConfig(coreAppId, McastConfig.class);
+ return (mcastConfig != null) ? mcastConfig.ingressVlan() : VlanId.NONE;
+ }
+
+ /**
+ * Gets egress VLAN from McastConfig.
+ *
+ * @return egress VLAN or VlanId.NONE if not configured
+ */
+ private VlanId egressVlan() {
+ McastConfig mcastConfig =
+ srManager.cfgService.getConfig(coreAppId, McastConfig.class);
+ return (mcastConfig != null) ? mcastConfig.egressVlan() : VlanId.NONE;
+ }
+
+ /**
+ * Gets assigned VLAN according to the value of egress VLAN.
+ * If connect point is specified, try to reuse the assigned VLAN on the connect point.
+ *
+ * @param cp connect point; Can be null if not specified
+ * @return assigned VLAN ID
+ */
+ VlanId assignedVlan(ConnectPoint cp) {
+ // Use the egressVlan if it is tagged
+ if (!egressVlan().equals(VlanId.NONE)) {
+ return egressVlan();
+ }
+ // Reuse unicast VLAN if the port has subnet configured
+ if (cp != null) {
+ VlanId untaggedVlan = srManager.getInternalVlanId(cp);
+ return (untaggedVlan != null) ? untaggedVlan : INTERNAL_VLAN;
+ }
+ // Use DEFAULT_VLAN if none of the above matches
+ return SegmentRoutingManager.INTERNAL_VLAN;
+ }
+
+ /**
+ * Gets source connect point of given multicast group.
+ *
+ * @param mcastIp multicast IP
+ * @return source connect point or null if not found
+ */
+ // FIXME To be addressed with multiple sources support
+ ConnectPoint getSource(IpAddress mcastIp) {
+ // FIXME we should support different types of routes
+ McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
+ .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
+ .findFirst().orElse(null);
+ return mcastRoute == null ? null : srManager.multicastRouteService.sources(mcastRoute)
+ .stream()
+ .findFirst().orElse(null);
+ }
+
+ /**
+ * Gets sinks of given multicast group.
+ *
+ * @param mcastIp multicast IP
+ * @return map of sinks or empty map if not found
+ */
+ Map<HostId, Set<ConnectPoint>> getSinks(IpAddress mcastIp) {
+ // FIXME we should support different types of routes
+ McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
+ .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
+ .findFirst().orElse(null);
+ return mcastRoute == null ?
+ Collections.emptyMap() :
+ srManager.multicastRouteService.routeData(mcastRoute).sinks();
+ }
+
+ /**
+ * Get sinks affected by this egress device.
+ *
+ * @param egressDevice the egress device
+ * @param mcastIp the mcast ip address
+ * @return the map of the sinks affected
+ */
+ Map<HostId, Set<ConnectPoint>> getAffectedSinks(DeviceId egressDevice,
+ IpAddress mcastIp) {
+ return getSinks(mcastIp).entrySet()
+ .stream()
+ .filter(hostIdSetEntry -> hostIdSetEntry.getValue().stream()
+ .map(ConnectPoint::deviceId)
+ .anyMatch(deviceId -> deviceId.equals(egressDevice))
+ ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ /**
+ * Creates a next objective builder for multicast.
+ *
+ * @param mcastIp multicast group
+ * @param assignedVlan assigned VLAN ID
+ * @param outPorts set of output port numbers
+ * @param nextId the next id
+ * @return next objective builder
+ */
+ NextObjective.Builder nextObjBuilder(IpAddress mcastIp, VlanId assignedVlan,
+ Set<PortNumber> outPorts, Integer nextId) {
+ // If nextId is null allocate a new one
+ if (nextId == null) {
+ nextId = srManager.flowObjectiveService.allocateNextId();
+ }
+ // Build the meta selector with the fwd objective info
+ TrafficSelector metadata =
+ DefaultTrafficSelector.builder()
+ .matchVlanId(assignedVlan)
+ .matchIPDst(mcastIp.toIpPrefix())
+ .build();
+ // Define the nextobjective type
+ NextObjective.Builder nextObjBuilder = DefaultNextObjective
+ .builder().withId(nextId)
+ .withType(NextObjective.Type.BROADCAST)
+ .fromApp(srManager.appId())
+ .withMeta(metadata);
+ // Add the output ports
+ outPorts.forEach(port -> {
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+ if (egressVlan().equals(VlanId.NONE)) {
+ tBuilder.popVlan();
+ }
+ tBuilder.setOutput(port);
+ nextObjBuilder.addTreatment(tBuilder.build());
+ });
+ // Done return the complete builder
+ return nextObjBuilder;
+ }
+
+ /**
+ * Creates a forwarding objective builder for multicast.
+ *
+ * @param mcastIp multicast group
+ * @param assignedVlan assigned VLAN ID
+ * @param nextId next ID of the L3 multicast group
+ * @return forwarding objective builder
+ */
+ ForwardingObjective.Builder fwdObjBuilder(IpAddress mcastIp,
+ VlanId assignedVlan, int nextId) {
+ TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
+ // Let's the matching on the group address
+ // TODO SSM support in future
+ if (mcastIp.isIp6()) {
+ sbuilder.matchEthType(Ethernet.TYPE_IPV6);
+ sbuilder.matchIPv6Dst(mcastIp.toIpPrefix());
+ } else {
+ sbuilder.matchEthType(Ethernet.TYPE_IPV4);
+ sbuilder.matchIPDst(mcastIp.toIpPrefix());
+ }
+ // Then build the meta selector
+ TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
+ metabuilder.matchVlanId(assignedVlan);
+ // Finally return the completed builder
+ ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder();
+ fwdBuilder.withSelector(sbuilder.build())
+ .withMeta(metabuilder.build())
+ .nextStep(nextId)
+ .withFlag(ForwardingObjective.Flag.SPECIFIC)
+ .fromApp(srManager.appId())
+ .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
+ return fwdBuilder;
+ }
+
+ /**
+ * Creates a filtering objective builder for multicast.
+ *
+ * @param ingressPort ingress port of the multicast stream
+ * @param assignedVlan assigned VLAN ID
+ * @param mcastIp the group address
+ * @param routerMac router MAC. This is carried in metadata and used from some switches that
+ * need to put unicast entry before multicast entry in TMAC table.
+ * @param mcastRole the Multicast role
+ * @return filtering objective builder
+ */
+ private FilteringObjective.Builder filterObjBuilder(PortNumber ingressPort, VlanId assignedVlan,
+ IpAddress mcastIp, MacAddress routerMac, McastRole mcastRole) {
+ FilteringObjective.Builder filtBuilder = DefaultFilteringObjective.builder();
+ // Let's add the in port matching and the priority
+ filtBuilder.withKey(Criteria.matchInPort(ingressPort))
+ .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
+ // According to the mcast role we match on the proper vlan
+ // If the role is null we are on the transit or on the egress
+ if (mcastRole == null) {
+ filtBuilder.addCondition(Criteria.matchVlanId(egressVlan()));
+ } else {
+ filtBuilder.addCondition(Criteria.matchVlanId(ingressVlan()));
+ }
+ // According to the IP type we set the proper match on the mac address
+ if (mcastIp.isIp4()) {
+ filtBuilder.addCondition(Criteria.matchEthDstMasked(MacAddress.IPV4_MULTICAST,
+ MacAddress.IPV4_MULTICAST_MASK));
+ } else {
+ filtBuilder.addCondition(Criteria.matchEthDstMasked(MacAddress.IPV6_MULTICAST,
+ MacAddress.IPV6_MULTICAST_MASK));
+ }
+ // We finally build the meta treatment
+ TrafficTreatment tt = DefaultTrafficTreatment.builder()
+ .pushVlan().setVlanId(assignedVlan)
+ .setEthDst(routerMac)
+ .build();
+ filtBuilder.withMeta(tt);
+ // Done, we return a permit filtering objective
+ return filtBuilder.permit().fromApp(srManager.appId());
+ }
+
+ /**
+ * Gets output ports information from treatments.
+ *
+ * @param treatments collection of traffic treatments
+ * @return set of output port numbers
+ */
+ Set<PortNumber> getPorts(Collection<TrafficTreatment> treatments) {
+ ImmutableSet.Builder<PortNumber> builder = ImmutableSet.builder();
+ treatments.forEach(treatment -> treatment.allInstructions().stream()
+ .filter(instr -> instr instanceof Instructions.OutputInstruction)
+ .forEach(instr -> builder.add(((Instructions.OutputInstruction) instr).port())));
+ return builder.build();
+ }
+}