[CORD-2838][CORD-2833] Revisit McastHandler and handle shortest paths with pair links
Also includes a refactoring of the path computation.
Change-Id: Iff63780a3bb3e895e55c52211290c19d993e1905
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index a791ecb..968ad3b 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -33,6 +33,9 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
+import org.onosproject.mcast.api.McastEvent;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.mcast.api.McastRouteUpdate;
import org.onosproject.net.config.basics.McastConfig;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
@@ -54,11 +57,10 @@
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.ObjectiveContext;
-import org.onosproject.net.mcast.McastEvent;
-import org.onosproject.net.mcast.McastRoute;
-import org.onosproject.net.mcast.McastRouteInfo;
+import org.onosproject.net.topology.LinkWeigher;
import org.onosproject.net.topology.Topology;
import org.onosproject.net.topology.TopologyService;
+import org.onosproject.segmentrouting.SRLinkWeigher;
import org.onosproject.segmentrouting.SegmentRoutingManager;
import org.onosproject.segmentrouting.SegmentRoutingService;
import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
@@ -78,6 +80,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
@@ -88,10 +91,9 @@
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads;
+
+import static org.onosproject.mcast.api.McastEvent.Type.*;
import static org.onosproject.net.flow.criteria.Criterion.Type.VLAN_VID;
-import static org.onosproject.net.mcast.McastEvent.Type.ROUTE_REMOVED;
-import static org.onosproject.net.mcast.McastEvent.Type.SOURCE_ADDED;
-import static org.onosproject.net.mcast.McastEvent.Type.SOURCE_UPDATED;
import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
@@ -139,63 +141,116 @@
private void enqueueMcastEvent(McastEvent mcastEvent) {
+ // Stores the event in mcastEventCache under one (group, sink) key
+ // for each sink selected below according to the event type
log.debug("Enqueue mcastEvent {}", mcastEvent);
- final McastRouteInfo mcastRouteInfo = mcastEvent.subject();
+ final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
+ final McastRouteUpdate prevUpdate = mcastEvent.prevSubject();
+ // NOTE(review): prevUpdate is dereferenced unconditionally - confirm that
+ // prevSubject() is never null for the event types enqueued here
+ final IpAddress group = prevUpdate.route().group();
// Let's create the keys of the cache
ImmutableSet.Builder<ConnectPoint> sinksBuilder = ImmutableSet.builder();
+ Set<ConnectPoint> sinks;
// For this event we will have a set of sinks
- if (mcastEvent.type() == SOURCE_ADDED ||
- mcastEvent.type() == SOURCE_UPDATED ||
- mcastEvent.type() == ROUTE_REMOVED) {
- // Add all the sinks
- sinksBuilder.addAll(mcastRouteInfo.sinks());
+ if (mcastEvent.type() == SOURCES_ADDED ||
+ mcastEvent.type() == SOURCES_REMOVED) {
+ // FIXME To be addressed with multiple sources support
+ // All the sinks of the current subject are used as keys
+ sinks = mcastRouteUpdate.sinks()
+ .values()
+ .stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
} else {
- // We have just one sink in this case
- ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
- // It is always true, unless something of bad happened
- // in the mcast route store
- if (sink != null) {
- sinksBuilder.add(sink);
+ // Flatten the sinks of the previous subject into a single set
+ Set<ConnectPoint> prevSinks = prevUpdate.sinks()
+ .values()
+ .stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
+ if (mcastEvent.type() == ROUTE_REMOVED) {
+ // Get the old sinks, since current subject is null
+ sinks = prevSinks;
+ } else {
+ // Get new sinks
+ Set<ConnectPoint> newsinks = mcastRouteUpdate.sinks()
+ .values()
+ .stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
+ // If it is a SINKS_ADDED event
+ if (mcastEvent.type() == SINKS_ADDED) {
+ // Sinks present in the current subject but not in the previous one
+ sinks = Sets.difference(newsinks, prevSinks);
+ } else {
+ // Sinks present in the previous subject but not in the current one
+ sinks = Sets.difference(prevSinks, newsinks);
+ }
}
}
+ // Add all the sinks
+ sinksBuilder.addAll(sinks);
// Push the elements in the cache
sinksBuilder.build().forEach(sink -> {
- McastCacheKey cacheKey = new McastCacheKey(mcastRouteInfo.route().group(),
- sink);
+ McastCacheKey cacheKey = new McastCacheKey(group, sink);
mcastEventCache.put(cacheKey, mcastEvent);
});
}
private void dequeueMcastEvent(McastEvent mcastEvent) {
log.debug("Dequeue mcastEvent {}", mcastEvent);
- final McastRouteInfo mcastRouteInfo = mcastEvent.subject();
+ final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
+ final McastRouteUpdate prevUpdate = mcastEvent.prevSubject();
// Get source, mcast group
- ConnectPoint source = mcastRouteInfo.source().orElse(null);
- IpAddress mcastIp = mcastRouteInfo.route().group();
+ // FIXME To be addressed with multiple sources support
+ ConnectPoint prevSource = prevUpdate.sources()
+ .stream()
+ .findFirst()
+ .orElse(null);
+ IpAddress mcastIp = prevUpdate.route().group();
+ Set<ConnectPoint> prevSinks = prevUpdate.sinks()
+ .values()
+ .stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
+ Set<ConnectPoint> newSinks;
+ // Sinks to be handled by the SINKS_ADDED and SINKS_REMOVED procedures
+ Set<ConnectPoint> sinks;
// According to the event type let's call the proper method
switch (mcastEvent.type()) {
- case SOURCE_ADDED:
- // Get all the sinks and process
- Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
- sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp));
+ case SOURCES_ADDED:
+ // FIXME To be addressed with multiple sources support
+ // Get all the sinks
+ //Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
+ // Compute the Mcast tree
+ //Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
+ // Process the given sinks using the pre-computed paths
+ //mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
break;
- case SOURCE_UPDATED:
+ case SOURCES_REMOVED:
+ // FIXME To be addressed with multiple sources support
// Get old source
- ConnectPoint oldSource = mcastEvent.prevSubject().source().orElse(null);
+ //ConnectPoint oldSource = mcastEvent.prevSubject().source().orElse(null);
// Just the first cached element will be processed
- processSourceUpdatedInternal(mcastIp, source, oldSource);
+ //processSourceUpdatedInternal(mcastIp, source, oldSource);
break;
case ROUTE_REMOVED:
// Process the route removed, just the first cached element will be processed
- processRouteRemovedInternal(source, mcastIp);
+ processRouteRemovedInternal(prevSource, mcastIp);
break;
- case SINK_ADDED:
- // Get the only sink and process
- ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
- processSinkAddedInternal(source, sink, mcastIp);
+ case SINKS_ADDED:
+ // Get the only sinks to be processed (new ones)
+ newSinks = mcastRouteUpdate.sinks()
+ .values()
+ .stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
+ sinks = Sets.difference(newSinks, prevSinks);
+ sinks.forEach(sink -> processSinkAddedInternal(prevSource, sink, mcastIp, null));
break;
- case SINK_REMOVED:
- sink = mcastRouteInfo.sink().orElse(null);
- processSinkRemovedInternal(source, sink, mcastIp);
+ case SINKS_REMOVED:
+ // Get the only sinks to be processed (old ones)
+ newSinks = mcastRouteUpdate.sinks()
+ .values()
+ .stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
+ sinks = Sets.difference(prevSinks, newSinks);
+ sinks.forEach(sink -> processSinkRemovedInternal(prevSource, sink, mcastIp));
break;
default:
break;
@@ -283,11 +338,26 @@
*/
public void init() {
srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
- ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
- Set<ConnectPoint> sinks = srManager.multicastRouteService.fetchSinks(mcastRoute);
- sinks.forEach(sink -> {
- processSinkAddedInternal(source, sink, mcastRoute.group());
- });
+ // FIXME To be addressed with multiple sources support
+ // Only the first source of the route is taken into account
+ ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
+ .stream()
+ .findFirst()
+ .orElse(null);
+ // A route without sources has no root for the tree: skip it, otherwise
+ // source.deviceId() below would throw and abort the init of the other routes
+ if (source == null) {
+ log.warn("Skip init of mcast route {}: no source available", mcastRoute);
+ return;
+ }
+ Set<ConnectPoint> sinks = srManager.multicastRouteService.sinks(mcastRoute);
+ // Filter out all the working sinks, we do not want to move them.
+ // A sink is kept when there is no next objective stored for its device
+ // or when the stored next objective does not contain the sink port
+ sinks = sinks.stream()
+ .filter(sink -> {
+ McastStoreKey mcastKey = new McastStoreKey(mcastRoute.group(), sink.deviceId());
+ Versioned<NextObjective> verMcastNext = mcastNextObjStore.get(mcastKey);
+ return verMcastNext == null ||
+ !getPorts(verMcastNext.value().next()).contains(sink.port());
+ })
+ .collect(Collectors.toSet());
+ // Compute the Mcast tree
+ Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
+ // Process the given sinks using the pre-computed paths
+ mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
+ mcastRoute.group(), paths));
});
}
@@ -306,12 +376,6 @@
*/
public void processMcastEvent(McastEvent event) {
log.info("process {}", event);
- // Verify if it is a complete event
- McastRouteInfo mcastRouteInfo = event.subject();
- if (!mcastRouteInfo.isComplete()) {
- log.info("Incompleted McastRouteInfo. Abort {}", event.type());
- return;
- }
+ // The event is no longer checked for completeness here:
+ // it is handed over to enqueueMcastEvent as it is
// Just enqueue for now
enqueueMcastEvent(event);
}
@@ -451,7 +515,8 @@
}
// If this is the last sink on the device, also update upstream
- Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
+ Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(),
+ mcastIp, null);
if (mcastPath.isPresent()) {
List<Link> links = Lists.newArrayList(mcastPath.get().links());
Collections.reverse(links);
@@ -482,7 +547,7 @@
* @param mcastIp multicast group IP address
*/
private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
- IpAddress mcastIp) {
+ IpAddress mcastIp, List<Path> allPaths) {
lastMcastChange = Instant.now();
mcastLock();
try {
@@ -511,7 +576,8 @@
}
// Find a path. If present, create/update groups and flows for each hop
- Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
+ Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(),
+ mcastIp, allPaths);
if (mcastPath.isPresent()) {
List<Link> links = mcastPath.get().links();
@@ -581,7 +647,7 @@
// Continue only when this instance is the master of source device
if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
log.debug("Skip {} due to lack of mastership of the source device {}",
- source.deviceId());
+ mcastIp, source.deviceId());
return;
}
@@ -592,9 +658,15 @@
// Remove transit-facing ports on the ingress device
removeIngressTransitPorts(mcastIp, ingressDevice, source);
+ // Compute mcast tree for the egress devices
+ Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
+
// Construct a new path for each egress device
- egressDevices.forEach(egressDevice -> {
- Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
+ mcastTree.forEach((egressDevice, paths) -> {
+ // We try to enforce the sinks path on the mcast tree
+ Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
+ mcastIp, paths);
+ // If a path is present, let's install it
if (mcastPath.isPresent()) {
installPath(mcastIp, source, mcastPath.get());
} else {
@@ -680,9 +752,14 @@
return;
}
}
+
+ // Compute mcast tree for the egress devices
+ Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
+
// Construct a new path for each egress device
- egressDevices.forEach(egressDevice -> {
- Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
+ mcastTree.forEach((egressDevice, paths) -> {
+ Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
+ mcastIp, null);
// If there is a new path
if (mcastPath.isPresent()) {
// Let's install the new mcast path for this egress
@@ -1072,31 +1149,157 @@
return builder.build();
}
- // Utility method to verify is a link is a pair-link
- private boolean isPairLink(Link link) {
- // Take src id, src port, dst id and dst port
- final DeviceId srcId = link.src().deviceId();
- final PortNumber srcPort = link.src().port();
- final DeviceId dstId = link.dst().deviceId();
- final PortNumber dstPort = link.dst().port();
- // init as true
- boolean isPairLink = true;
- try {
- // If one of this condition is not true; it is not a pair link
- if (!(srManager.deviceConfiguration().isEdgeDevice(srcId) &&
- srManager.deviceConfiguration().isEdgeDevice(dstId) &&
- srManager.deviceConfiguration().getPairDeviceId(srcId).equals(dstId) &&
- srManager.deviceConfiguration().getPairLocalPort(srcId).equals(srcPort) &&
- srManager.deviceConfiguration().getPairLocalPort(dstId).equals(dstPort))) {
- isPairLink = false;
- }
- } catch (DeviceConfigNotFoundException e) {
- // Configuration not provided
- log.warn("Could not check if the link {} is pairlink "
- + "config not yet provided", link);
- isPairLink = false;
+ /**
+ * Go through all the paths, looking for shared links to be used
+ * in the final path computation.
+ * <p>
+ * The paths are compared hop by hop: at each position, the links found in the
+ * paths of every egress are intersected and the common ones are collected.
+ * The given set of egresses is modified: when no shared link is found and more
+ * than one egress is left, the egress that emptied the intersection is removed
+ * from the set and the exploration is retried on the remaining ones.
+ *
+ * @param egresses egress devices; an entry for each of them is expected in
+ * availablePaths (the lookup result is used without a null check)
+ * @param availablePaths all the available paths towards the egress
+ * @return shared links between egress devices
+ */
+ private Set<Link> exploreMcastTree(Set<DeviceId> egresses,
+ Map<DeviceId, List<Path>> availablePaths) {
+ // Length of the shortest path
+ int minLength = Integer.MAX_VALUE;
+ int length;
+ // Current paths
+ List<Path> currentPaths;
+ // Look for the minimum path length amongst the reachable egresses
+ for (DeviceId egress : egresses) {
+ // This egress cannot be reached from the source:
+ // just continue, it is dealt with in the loop below
+ currentPaths = availablePaths.get(egress);
+ if (currentPaths.isEmpty()) {
+ continue;
+ }
+ // Get the length of the first one available
+ // and update the min length
+ length = currentPaths.get(0).links().size();
+ if (length < minLength) {
+ minLength = length;
+ }
}
- return isPairLink;
+ // If there are no paths
+ if (minLength == Integer.MAX_VALUE) {
+ return Collections.emptySet();
+ }
+ // Iterate looking for shared links
+ int index = 0;
+ // Define the sets for the intersection
+ Set<Link> sharedLinks = Sets.newHashSet();
+ Set<Link> currentSharedLinks;
+ Set<Link> currentLinks;
+ DeviceId deviceToRemove = null;
+ // Let's find out the shared links
+ while (index < minLength) {
+ // Initialize the intersection with the paths related to the first egress
+ currentPaths = availablePaths.get(
+ egresses.stream()
+ .findFirst()
+ .orElse(null)
+ );
+ currentSharedLinks = Sets.newHashSet();
+ // Iterate over the paths and take the "index" links
+ for (Path path : currentPaths) {
+ currentSharedLinks.add(path.links().get(index));
+ }
+ // Iterate over all the egresses (the first one is visited again)
+ for (DeviceId egress : egresses) {
+ // Iterate over the paths and take the "index" links
+ currentLinks = Sets.newHashSet();
+ for (Path path : availablePaths.get(egress)) {
+ currentLinks.add(path.links().get(index));
+ }
+ // Do intersection
+ currentSharedLinks = Sets.intersection(currentSharedLinks, currentLinks);
+ // If there are no shared links exit and record the device to remove
+ // we have to retry with a subset of sinks
+ if (currentSharedLinks.isEmpty()) {
+ deviceToRemove = egress;
+ // Forces the termination of the outer while loop
+ index = minLength;
+ break;
+ }
+ }
+ sharedLinks.addAll(currentSharedLinks);
+ index++;
+ }
+ // If there are no shared links and there is more than one egress
+ // let's retry another time with fewer egresses, we can
+ // still build optimal subtrees
+ if (sharedLinks.isEmpty() && egresses.size() > 1 && deviceToRemove != null) {
+ egresses.remove(deviceToRemove);
+ sharedLinks = exploreMcastTree(egresses, availablePaths);
+ }
+ return sharedLinks;
+ }
+
+ /**
+  * Build Mcast tree having as root the given source and as leaves the given egress points.
+  * <p>
+  * Every sink given in input gets an entry in the returned map. The source device
+  * is excluded from the path computation, so the sinks attached to it are mapped
+  * to an empty path list instead of being dropped from the tree.
+  *
+  * @param source source of the tree
+  * @param sinks leaves of the tree
+  * @return the computed Mcast tree, one entry for each sink
+  */
+ private Map<ConnectPoint, List<Path>> computeSinkMcastTree(DeviceId source,
+                                                            Set<ConnectPoint> sinks) {
+     // Get the egress devices, remove source from the egress if present
+     Set<DeviceId> egresses = sinks.stream()
+             .map(ConnectPoint::deviceId)
+             .filter(deviceId -> !deviceId.equals(source))
+             .collect(Collectors.toSet());
+     Map<DeviceId, List<Path>> mcastTree = computeMcastTree(source, egresses);
+     // Build the final tree and return it
+     final Map<ConnectPoint, List<Path>> finalTree = Maps.newHashMap();
+     // Iterate over the sinks, not over the computed tree: the sinks on the
+     // source device have no entry in the tree but must still reach the caller
+     sinks.forEach(sink -> {
+         List<Path> sinkPaths = mcastTree.get(sink.deviceId());
+         finalTree.put(sink, sinkPaths != null ? sinkPaths : Collections.<Path>emptyList());
+     });
+     return finalTree;
+ }
+
+ /**
+  * Computes the Mcast tree rooted at the given source and reaching the given egresses.
+  * <p>
+  * The computation is done in two passes: the paths are first computed without
+  * any link to enforce, then the links shared amongst the egresses are extracted
+  * and the paths are computed again enforcing those links.
+  *
+  * @param source source of the tree
+  * @param egresses leaves of the tree
+  * @return the paths towards each egress, indexed by egress device
+  */
+ private Map<DeviceId, List<Path>> computeMcastTree(DeviceId source,
+                                                    Set<DeviceId> egresses) {
+     Map<DeviceId, List<Path>> availablePaths = Maps.newHashMap();
+     // First pass: plain paths, there is nothing to enforce yet
+     for (DeviceId egress : egresses) {
+         availablePaths.put(egress, getPaths(source, egress, Collections.emptySet()));
+     }
+     // The exploration works on a copy, the egresses in input are left untouched
+     Set<Link> linksToEnforce = exploreMcastTree(Sets.newHashSet(egresses), availablePaths);
+     // Second pass: throw away the first results and
+     // compute the paths again enforcing the shared links
+     availablePaths.clear();
+     for (DeviceId egress : egresses) {
+         availablePaths.put(egress, getPaths(source, egress, linksToEnforce));
+     }
+     return availablePaths;
+ }
+
+ /**
+  * Gets the paths from src to dst computed on the current topology
+  * using the custom link weigher.
+  *
+  * @param src source device ID
+  * @param dst destination device ID
+  * @param linksToEnforce links handed over to the link weigher built for this computation
+  * @return list of paths from src to dst, empty if there are none
+  */
+ private List<Path> getPaths(DeviceId src, DeviceId dst, Set<Link> linksToEnforce) {
+     // Work on a snapshot of the topology
+     final Topology topologySnapshot = topologyService.currentTopology();
+     // The weigher is specific to this computation: it is built
+     // from the source device and the links to enforce
+     final LinkWeigher weigher = new SRLinkWeigher(srManager, src, linksToEnforce);
+     // Copy the result of the topology service into a list
+     List<Path> computedPaths = Lists.newArrayList(
+             topologyService.getPaths(topologySnapshot, src, dst, weigher));
+     // Nothing is filtered here, the outcome is only traced
+     log.debug("{} path(s) found from {} to {}", computedPaths.size(), src, dst);
+     return computedPaths;
}
/**
@@ -1107,18 +1310,17 @@
* @param src source device ID
* @param dst destination device ID
* @param mcastIp multicast group
+ * @param allPaths paths list
* @return an optional path from src to dst
*/
- private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp) {
- // Takes a snapshot of the topology
- final Topology currentTopology = topologyService.currentTopology();
- List<Path> allPaths = Lists.newArrayList(
- topologyService.getPaths(currentTopology, src, dst)
- );
- // Create list of valid paths
- allPaths.removeIf(path -> path.links().stream().anyMatch(this::isPairLink));
- // If there are no valid paths, just exit
- log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
+ private Optional<Path> getPath(DeviceId src, DeviceId dst,
+ IpAddress mcastIp, List<Path> allPaths) {
+ // Firstly we get all the valid paths, if the supplied are null
+ if (allPaths == null) {
+ allPaths = getPaths(src, dst, Collections.emptySet());
+ }
+
+ // If there are no paths just exit
if (allPaths.isEmpty()) {
return Optional.empty();
}
@@ -1205,7 +1407,7 @@
return mcastRoleStore.entrySet().stream()
.filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
entry.getValue().value() == role)
- .map(Map.Entry::getKey).map(McastStoreKey::deviceId)
+ .map(Entry::getKey).map(McastStoreKey::deviceId)
.collect(Collectors.toSet());
}
@@ -1215,11 +1417,15 @@
* @param mcastIp multicast IP
* @return source connect point or null if not found
*/
+ // FIXME To be addressed with multiple sources support
private ConnectPoint getSource(IpAddress mcastIp) {
- return srManager.multicastRouteService.getRoutes().stream()
- .filter(mcastRoute -> mcastRoute.group().equals(mcastIp))
- .map(mcastRoute -> srManager.multicastRouteService.fetchSource(mcastRoute))
- .findAny().orElse(null);
+ // FIXME we should support different types of routes
+ // Only the first route matching the group is considered
+ McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
+ .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
+ .findFirst().orElse(null);
+ // No route for the group means no source; otherwise take the first source, if any
+ return mcastRoute == null ? null : srManager.multicastRouteService.sources(mcastRoute)
+ .stream()
+ .findFirst().orElse(null);
}
/**
* Gets sinks of given multicast group.
@@ -1228,10 +1434,12 @@
* @return set of sinks or empty set if not found
*/
private Set<ConnectPoint> getSinks(IpAddress mcastIp) {
- return srManager.multicastRouteService.getRoutes().stream()
- .filter(mcastRoute -> mcastRoute.group().equals(mcastIp))
- .map(mcastRoute -> srManager.multicastRouteService.fetchSinks(mcastRoute))
- .findAny().orElse(Collections.emptySet());
+ // FIXME we should support different types of routes
+ // Only the first route matching the group is considered
+ McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
+ .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
+ .findFirst().orElse(null);
+ // No route for the group means no sinks
+ return mcastRoute == null ?
+ Collections.emptySet() : srManager.multicastRouteService.sinks(mcastRoute);
}
/**
@@ -1246,7 +1454,7 @@
return mcastNextObjStore.entrySet().stream()
.filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
getPorts(entry.getValue().value().next()).contains(port))
- .map(Map.Entry::getKey).map(McastStoreKey::mcastIp)
+ .map(Entry::getKey).map(McastStoreKey::mcastIp)
.collect(Collectors.toSet());
}
@@ -1259,7 +1467,7 @@
private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
return mcastNextObjStore.entrySet().stream()
.filter(entry -> entry.getKey().deviceId().equals(deviceId))
- .map(Map.Entry::getKey).map(McastStoreKey::mcastIp)
+ .map(Entry::getKey).map(McastStoreKey::mcastIp)
.collect(Collectors.toSet());
}
@@ -1433,7 +1641,10 @@
// Iterates over the route and updates properly the filtering objective
// on the source device.
srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
- ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
+ // FIXME To be addressed with multiple sources support
+ ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
+ .stream()
+ .findFirst().orElse(null);
if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
if (install) {
addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
@@ -1570,12 +1781,12 @@
if (mcastIp != null) {
return mcastNextObjStore.entrySet().stream()
.filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
- .collect(Collectors.toMap(Map.Entry::getKey,
+ .collect(Collectors.toMap(Entry::getKey,
entry -> entry.getValue().value().id()));
}
// Otherwise take all the groups
return mcastNextObjStore.entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey,
+ .collect(Collectors.toMap(Entry::getKey,
entry -> entry.getValue().value().id()));
}
@@ -1584,12 +1795,12 @@
if (mcastIp != null) {
return mcastRoleStore.entrySet().stream()
.filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
- .collect(Collectors.toMap(Map.Entry::getKey,
+ .collect(Collectors.toMap(Entry::getKey,
entry -> entry.getValue().value()));
}
// Otherwise take all the groups
return mcastRoleStore.entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey,
+ .collect(Collectors.toMap(Entry::getKey,
entry -> entry.getValue().value()));
}