Optimize Multicast sinks processing
Change-Id: I5e4c48076edbadc38943d9ee05359341ef55aaca
(cherry picked from commit a17ad9e3f8fe1d2925e6d9fd84d3d96ef79cc78d)
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index 4c83a38..2c82cf5 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -222,7 +222,7 @@
case SINKS_REMOVED:
// FIXME To be addressed with multiple sources support
processSinksRemovedInternal(source, mcastIp,
- mcastUpdate.sinks(), prevSinks);
+ mcastUpdate.sinks(), mcastPrevUpdate.sinks());
break;
default:
break;
@@ -462,32 +462,24 @@
* @param source the source connect point
* @param mcastIp the ip address of the group
* @param newSinks the new sinks to be processed
- * @param allPrevSinks all previous sinks
+ * @param prevSinks the previous sinks
*/
private void processSinksRemovedInternal(ConnectPoint source, IpAddress mcastIp,
Map<HostId, Set<ConnectPoint>> newSinks,
- Set<ConnectPoint> allPrevSinks) {
+ Map<HostId, Set<ConnectPoint>> prevSinks) {
lastMcastChange = Instant.now();
mcastLock();
- // Let's instantiate the sinks to be removed
- Set<ConnectPoint> sinksToBeRemoved = Sets.newHashSet();
try {
+ // Remove the previous ones
+ Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
+ newSinks);
+ sinksToBeRemoved.forEach(sink -> processSinkRemovedInternal(source, sink, mcastIp));
// Recover the dual-homed sinks
- Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks);
+ Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
+ prevSinks);
sinksToBeRecovered.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
- // Get the only sinks to be processed (old ones)
- Set<ConnectPoint> allNewSinks = newSinks.values()
- .stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet());
- // Remove the previous one
- sinksToBeRemoved.addAll(Sets.difference(allPrevSinks, allNewSinks));
} finally {
mcastUnlock();
- // Let's schedule the removal of the previous sinks
- executorService.schedule(
- () -> sinksToBeRemoved.forEach(sink -> processSinkRemovedInternal(source, sink, mcastIp)),
- HOST_MOVED_DELAY_MS, TimeUnit.MILLISECONDS);
}
}
@@ -952,26 +944,65 @@
}
/**
+ * Process all the sinks related to a mcast group and return
+ * the ones to be removed.
+ *
+ * @param mcastIp the group address
+ * @param prevsinks the previous sinks to be evaluated
+ * @param newSinks the new sinks to be evaluted
+ * @return the set of the sinks to be removed
+ */
+ private Set<ConnectPoint> processSinksToBeRemoved(IpAddress mcastIp,
+ Map<HostId, Set<ConnectPoint>> prevsinks,
+ Map<HostId, Set<ConnectPoint>> newSinks) {
+ // Iterate over the sinks in order to build the set
+ // of the connect points to be removed from this group
+ final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
+ prevsinks.forEach(((hostId, connectPoints) -> {
+ // We have to check with the existing flows
+ ConnectPoint sinkToBeProcessed = connectPoints.stream()
+ .filter(connectPoint -> isSink(mcastIp, connectPoint))
+ .findFirst().orElse(null);
+ if (sinkToBeProcessed != null) {
+ // If the host has been removed or location has been removed
+ if (!newSinks.containsKey(hostId) ||
+ !newSinks.get(hostId).contains(sinkToBeProcessed)) {
+ sinksToBeProcessed.add(sinkToBeProcessed);
+ }
+ }
+ }));
+ // We have done, return the set
+ return sinksToBeProcessed;
+ }
+
+ /**
* Process new locations and return the set of sinks to be added
* in the context of the recovery.
*
- * @param sinks the remaining locations
+ * @param newSinks the remaining sinks
+ * @param prevSinks the previous sinks
* @return the set of the sinks to be processed
*/
private Set<ConnectPoint> processSinksToBeRecovered(IpAddress mcastIp,
- Map<HostId, Set<ConnectPoint>> sinks) {
+ Map<HostId, Set<ConnectPoint>> newSinks,
+ Map<HostId, Set<ConnectPoint>> prevSinks) {
// Iterate over the sinks in order to build the set
// of the connect points to be served by this group
final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
- sinks.forEach((hostId, connectPoints) -> {
+ newSinks.forEach((hostId, connectPoints) -> {
// If it has more than 1 locations
if (connectPoints.size() > 1 || connectPoints.size() == 0) {
log.debug("Skip {} since sink {} has {} locations",
mcastIp, hostId, connectPoints.size());
return;
}
- sinksToBeProcessed.add(connectPoints.stream()
- .findFirst().orElseGet(null));
+ // If previously it had two locations, we need to recover it
+ // Filter out if the remaining location is already served
+ if (prevSinks.containsKey(hostId) && prevSinks.get(hostId).size() == 2) {
+ sinksToBeProcessed.add(connectPoints.stream()
+ .filter(connectPoint -> !isSink(mcastIp, connectPoint))
+ .findFirst().orElseGet(null));
+ }
});
return sinksToBeProcessed;
}
@@ -1005,16 +1036,7 @@
}
// We prefer to reuse existing flows
ConnectPoint sinkToBeProcessed = connectPoints.stream()
- .filter(connectPoint -> {
- // Let's check if we are already serving that location
- McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId());
- if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
- return false;
- }
- // Get next and check with the port
- NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
- return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
- })
+ .filter(connectPoint -> isSink(mcastIp, connectPoint))
.findFirst().orElse(null);
if (sinkToBeProcessed != null) {
sinksToBeProcessed.add(sinkToBeProcessed);
@@ -1023,7 +1045,7 @@
// Otherwise we prefer to reuse existing egresses
Set<DeviceId> egresses = getDevice(mcastIp, EGRESS);
sinkToBeProcessed = connectPoints.stream()
- .filter(egresses::contains)
+ .filter(connectPoint -> egresses.contains(connectPoint.deviceId()))
.findFirst().orElse(null);
if (sinkToBeProcessed != null) {
sinksToBeProcessed.add(sinkToBeProcessed);
@@ -1603,6 +1625,24 @@
}
/**
+ * Verify if a given connect point is sink for this group.
+ *
+ * @param mcastIp group address
+ * @param connectPoint connect point to be verified
+ * @return true if the connect point is sink of the group
+ */
+ private boolean isSink(IpAddress mcastIp, ConnectPoint connectPoint) {
+ // Let's check if we are already serving that location
+ McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId());
+ if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
+ return false;
+ }
+ // Get next and check with the port
+ NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
+ return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
+ }
+
+ /**
* Updates filtering objective for given device and port.
* It is called in general when the mcast config has been
* changed.