Removing flow modification during sink/source addition/removal.
Changes:
- fwd objectives are pushed only the first time for a given device
- fwd objectives are removed only if the dev is no more serving the group
- mac/vlan filt objectives are pushed only the first time for a given device
- vlan filt objectives are pushed only the first time for a given cpoint
- filt objectives are purged upon device, link and edge port failures
- adds sr-filt-mcast to dump the installed filt obj
- updates onos-diags
Change-Id: I802ebec7e160486fea9ad4bf53aca36a2b1c7146
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index ae6902d..52618f4 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -38,11 +38,13 @@
import org.onosproject.mcast.api.McastRoute;
import org.onosproject.mcast.api.McastRouteData;
import org.onosproject.mcast.api.McastRouteUpdate;
+import org.onosproject.net.Device;
import org.onosproject.net.HostId;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
import org.onosproject.net.Path;
+import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.flowobjective.DefaultObjectiveContext;
import org.onosproject.net.flowobjective.ForwardingObjective;
@@ -55,6 +57,7 @@
import org.onosproject.segmentrouting.SegmentRoutingManager;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
@@ -64,6 +67,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -100,6 +104,7 @@
private final McastUtils mcastUtils;
private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
private final ConsistentMap<McastRoleStoreKey, McastRole> mcastRoleStore;
+ private final DistributedSet<McastFilteringObjStoreKey> mcastFilteringObjStore;
// Wait time for the cache
private static final int WAIT_TIME_MS = 1000;
@@ -273,6 +278,15 @@
.withName("onos-mcast-role-store")
.withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
.build();
+ mcastKryo = new KryoNamespace.Builder()
+ .register(KryoNamespaces.API)
+ .register(new McastFilteringObjStoreSerializer(), McastFilteringObjStoreKey.class);
+ mcastFilteringObjStore = srManager.storageService
+ .<McastFilteringObjStoreKey>setBuilder()
+ .withName("onos-mcast-filtering-store")
+ .withSerializer(Serializer.using(mcastKryo.build("McastHandler-FilteringObj")))
+ .build()
+ .asDistributedSet();
mcastUtils = new McastUtils(srManager, coreAppId, log);
// Init the executor service, the buckets corrector and schedule the clean up
executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
@@ -334,6 +348,7 @@
executorService.shutdown();
mcastNextObjStore.destroy();
mcastRoleStore.destroy();
+ mcastFilteringObjStore.destroy();
mcastUtils.terminate();
log.info("Terminated");
}
@@ -414,6 +429,7 @@
log.debug("Skip {} due to empty sources to be removed", mcastIp);
return;
}
+ // Let's heal the trees
Set<Link> remainingLinks = Sets.newHashSet();
Map<ConnectPoint, Set<Link>> candidateLinks = Maps.newHashMap();
Map<ConnectPoint, Set<ConnectPoint>> candidateSinks = Maps.newHashMap();
@@ -713,8 +729,9 @@
try {
log.info("Processing sink added {} for group {} and for source {}", sink, mcastIp, source);
// Process the ingress device
- mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
- mcastUtils.assignedVlan(source), mcastIp, INGRESS);
+ McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
+ mcastUtils.assignedVlan(source), mcastIp.isIp4());
+ addFilterToDevice(mcastFilterObjStoreKey, mcastIp, INGRESS);
if (source.deviceId().equals(sink.deviceId())) {
if (source.port().equals(sink.port())) {
log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
@@ -736,14 +753,18 @@
addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
mcastUtils.assignedVlan(link.src().deviceId()
.equals(source.deviceId()) ? source : null));
- mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
- mcastUtils.assignedVlan(null), mcastIp, null);
+ McastFilteringObjStoreKey filteringKey = new McastFilteringObjStoreKey(link.dst(),
+ mcastUtils.assignedVlan(null), mcastIp.isIp4());
+ addFilterToDevice(filteringKey, mcastIp, null);
});
// Setup mcast role for the transit
links.stream()
.filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
- .forEach(link -> mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.dst().deviceId(),
- source), TRANSIT));
+ .forEach(link -> {
+ log.trace("Transit links {}", link);
+ mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.dst().deviceId(),
+ source), TRANSIT);
+ });
// Process the egress device
addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
// Setup mcast role for egress
@@ -757,6 +778,20 @@
}
/**
+ * Processes PORT_UPDATED event.
+ *
+ * @param affectedDevice Affected device
+ * @param affectedPort Affected port
+ */
+ public void processPortUpdate(Device affectedDevice, Port affectedPort) {
+ // Clean the filtering obj store. Edge port case.
+ ConnectPoint portDown = new ConnectPoint(affectedDevice.id(), affectedPort.number());
+ if (!affectedPort.isEnabled()) {
+ updateFilterObjStoreByPort(portDown);
+ }
+ }
+
+ /**
* Processes the LINK_DOWN event.
*
* @param linkDown Link that is going down
@@ -765,7 +800,7 @@
lastMcastChange = Instant.now();
mcastLock();
try {
- // Get groups affected by the link down event
+ // Get mcast groups affected by the link going down
Set<IpAddress> affectedGroups = getAffectedGroups(linkDown);
log.info("Processing link down {} for groups {}", linkDown, affectedGroups);
affectedGroups.forEach(mcastIp -> {
@@ -789,6 +824,7 @@
// Get the mcast groups affected by the device going down
Set<IpAddress> affectedGroups = getAffectedGroups(deviceDown);
log.info("Processing device down {} for groups {}", deviceDown, affectedGroups);
+ updateFilterObjStoreByDevice(deviceDown);
affectedGroups.forEach(mcastIp -> {
log.debug("Processing device down {} for group {}", deviceDown, mcastIp);
recoverFailure(mcastIp, deviceDown);
@@ -1193,6 +1229,23 @@
}
// Store the new port
mcastNextObjStore.put(mcastStoreKey, newNextObj);
+ // Create, store and apply the new nextObj and fwdObj
+ ObjectiveContext context = new DefaultObjectiveContext(
+ (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
+ mcastIp, deviceId, port.toLong(), assignedVlan),
+ (objective, error) -> {
+ log.warn("Failed to add {} on {}/{}, vlan {}: {}",
+ mcastIp, deviceId, port.toLong(), assignedVlan, error);
+ srManager.invalidateNextObj(objective.id());
+ });
+ ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
+ newNextObj.id()).add(context);
+ if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
+ log.debug("skip next and forward flowobjective addition for device: {}", deviceId);
+ } else {
+ srManager.flowObjectiveService.next(deviceId, newNextObj);
+ srManager.flowObjectiveService.forward(deviceId, fwdObj);
+ }
} else {
// This device already serves some subscribers of this mcast group
NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
@@ -1214,23 +1267,13 @@
portBuilder.add(port);
newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
portBuilder.build(), nextObj.id()).addToExisting();
- }
- // Create, store and apply the new nextObj and fwdObj
- ObjectiveContext context = new DefaultObjectiveContext(
- (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
- mcastIp, deviceId, port.toLong(), assignedVlan),
- (objective, error) -> {
- log.warn("Failed to add {} on {}/{}, vlan {}: {}",
- mcastIp, deviceId, port.toLong(), assignedVlan, error);
- srManager.invalidateNextObj(objective.id());
- });
- ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
- newNextObj.id()).add(context);
- if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
- log.debug("skip next and forward flowobjective addition for device: {}", deviceId);
- } else {
- srManager.flowObjectiveService.next(deviceId, newNextObj);
- srManager.flowObjectiveService.forward(deviceId, fwdObj);
+ if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
+ log.debug("skip next flowobjective update for device: {}", deviceId);
+ } else {
+ // no need to update the flow here since we have updated the nextobjective/group
+ // the existing flow will keep pointing to the updated nextobj
+ srManager.flowObjectiveService.next(deviceId, newNextObj);
+ }
}
}
@@ -1276,31 +1319,27 @@
(objective, error) -> log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
mcastIp, deviceId, port.toLong(), assignedVlan, error));
fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
+ if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
+ log.debug("skip forward flowobjective removal for device: {}", deviceId);
+ } else {
+ srManager.flowObjectiveService.forward(deviceId, fwdObj);
+ }
mcastNextObjStore.remove(mcastStoreKey);
} else {
- // If this is not the last sink, update flows and groups
- context = new DefaultObjectiveContext(
- (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
- mcastIp, deviceId, port.toLong(), assignedVlan),
- (objective, error) -> {
- log.warn("Failed to update {} on {}/{}, vlan {}: {}",
- mcastIp, deviceId, port.toLong(), assignedVlan, error);
- srManager.invalidateNextObj(objective.id());
- });
// Here we store the next objective with the remaining port
newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
existingPorts, nextObj.id()).removeFromExisting();
- fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
mcastNextObjStore.put(mcastStoreKey, newNextObj);
- }
- // Let's modify the next objective removing the bucket
- newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
+ // Let's modify the next objective removing the bucket
+ newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
ImmutableSet.of(port), nextObj.id()).removeFromExisting();
- if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
- log.debug("skip forward and next flow objectives from adding flows on device: {}", deviceId);
- } else {
- srManager.flowObjectiveService.next(deviceId, newNextObj);
- srManager.flowObjectiveService.forward(deviceId, fwdObj);
+ if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
+ log.debug("skip next flowobjective update for device: {}", deviceId);
+ } else {
+ // no need to update the flow here since we have updated the next objective + group
+ // the existing flow will keep pointing to the updated nextobj
+ srManager.flowObjectiveService.next(deviceId, newNextObj);
+ }
}
return existingPorts.isEmpty();
}
@@ -1349,8 +1388,9 @@
links.forEach(link -> {
addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
- mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
- mcastUtils.assignedVlan(null), mcastIp, null);
+ McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(link.dst(),
+ mcastUtils.assignedVlan(null), mcastIp.isIp4());
+ addFilterToDevice(mcastFilterObjStoreKey, mcastIp, null);
});
// Setup mcast role for the transit
links.stream()
@@ -1779,7 +1819,9 @@
sources.forEach(source -> {
if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
if (install) {
- mcastUtils.addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
+ McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
+ vlanId, mcastRoute.group().isIp4());
+ addFilterToDevice(mcastFilterObjStoreKey, mcastRoute.group(), INGRESS);
} else {
mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
}
@@ -1792,13 +1834,104 @@
}
/**
+ * Add filtering to the device if needed.
+ *
+ * @param filterObjStoreKey the filtering obj key
+ * @param mcastIp the multicast group
+ * @param mcastRole the multicast role
+ */
+ private void addFilterToDevice(McastFilteringObjStoreKey filterObjStoreKey,
+ IpAddress mcastIp,
+ McastRole mcastRole) {
+ if (!containsFilterInTheDevice(filterObjStoreKey)) {
+ // if this is the first sink for this group/device
+ // match additionally on mac
+ log.debug("Filtering not available for device {}, vlan {} and {}",
+ filterObjStoreKey.ingressCP().deviceId(), filterObjStoreKey.vlanId(),
+ filterObjStoreKey.isIpv4() ? "IPv4" : "IPv6");
+ mcastUtils.addFilterToDevice(filterObjStoreKey.ingressCP().deviceId(),
+ filterObjStoreKey.ingressCP().port(),
+ filterObjStoreKey.vlanId(), mcastIp,
+ mcastRole, true);
+ mcastFilteringObjStore.add(filterObjStoreKey);
+ } else if (!mcastFilteringObjStore.contains(filterObjStoreKey)) {
+ // match only vlan
+ log.debug("Filtering not available for connect point {}, vlan {} and {}",
+ filterObjStoreKey.ingressCP(), filterObjStoreKey.vlanId(),
+ filterObjStoreKey.isIpv4() ? "IPv4" : "IPv6");
+ mcastUtils.addFilterToDevice(filterObjStoreKey.ingressCP().deviceId(),
+ filterObjStoreKey.ingressCP().port(),
+ filterObjStoreKey.vlanId(), mcastIp,
+ mcastRole, false);
+ mcastFilteringObjStore.add(filterObjStoreKey);
+ } else {
+ // do nothing
+ log.debug("Filtering already present. Abort");
+ }
+ }
+
+ /**
+ * Verify if there are related filtering obj in the device.
+ *
+ * @param filteringKey the filtering obj key
+ * @return true if related filtering obj are found
+ */
+ private boolean containsFilterInTheDevice(McastFilteringObjStoreKey filteringKey) {
+ // check if filters are already added on the device
+ McastFilteringObjStoreKey key = mcastFilteringObjStore.stream()
+ .filter(mcastFilteringKey ->
+ mcastFilteringKey.ingressCP().deviceId().equals(filteringKey.ingressCP().deviceId())
+ && mcastFilteringKey.isIpv4() == filteringKey.isIpv4()
+ && mcastFilteringKey.vlanId().equals(filteringKey.vlanId())
+ ).findFirst().orElse(null);
+ // we are interested to filt obj on the same device, same vlan and same ip type
+ return key != null;
+ }
+
+ /**
+ * Update the filtering objective store upon device failure.
+ *
+ * @param affectedDevice the affected device
+ */
+ private void updateFilterObjStoreByDevice(DeviceId affectedDevice) {
+ // purge the related filter objective key
+ Set<McastFilteringObjStoreKey> filterObjs = Sets.newHashSet(mcastFilteringObjStore);
+ Iterator<McastFilteringObjStoreKey> filterIterator = filterObjs.iterator();
+ McastFilteringObjStoreKey filterKey;
+ while (filterIterator.hasNext()) {
+ filterKey = filterIterator.next();
+ if (filterKey.ingressCP().deviceId().equals(affectedDevice)) {
+ mcastFilteringObjStore.remove(filterKey);
+ }
+ }
+ }
+
+ /**
+ * Update the filtering objective store upon port failure.
+ *
+ * @param affectedPort the affected port
+ */
+ private void updateFilterObjStoreByPort(ConnectPoint affectedPort) {
+ // purge the related filter objective key
+ Set<McastFilteringObjStoreKey> filterObjs = Sets.newHashSet(mcastFilteringObjStore);
+ Iterator<McastFilteringObjStoreKey> filterIterator = filterObjs.iterator();
+ McastFilteringObjStoreKey filterKey;
+ while (filterIterator.hasNext()) {
+ filterKey = filterIterator.next();
+ if (filterKey.ingressCP().equals(affectedPort)) {
+ mcastFilteringObjStore.remove(filterKey);
+ }
+ }
+ }
+
+ /**
* Performs bucket verification operation for all mcast groups in the devices.
* Firstly, it verifies that mcast is stable before trying verification operation.
* Verification consists in creating new nexts with VERIFY operation. Actually,
* the operation is totally delegated to the driver.
*/
private final class McastBucketCorrector implements Runnable {
-
+ // Internal params
private static final int MAX_VERIFY_ON_FLIGHT = 10;
private final AtomicInteger verifyOnFlight = new AtomicInteger(0);
// Define the context used for the back pressure mechanism
@@ -1875,7 +2008,7 @@
for (DeviceId deviceId : devicesToProcess) {
if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
log.trace("Skipping Bucket corrector for unconfigured device {}", deviceId);
- return;
+ continue;
}
synchronized (verifyOnFlight) {
while (verifyOnFlight.get() == MAX_VERIFY_ON_FLIGHT) {
@@ -2011,4 +2144,25 @@
public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
return mcastUtils.getMcastLeaders(mcastIp);
}
+
+ /**
+ * Returns the mcast filtering obj.
+ *
+ * @return the mapping group-node
+ */
+ public Map<DeviceId, List<McastFilteringObjStoreKey>> getMcastFilters() {
+ Map<DeviceId, List<McastFilteringObjStoreKey>> mapping = Maps.newHashMap();
+ Set<McastFilteringObjStoreKey> currentKeys = Sets.newHashSet(mcastFilteringObjStore);
+ currentKeys.forEach(filteringObjStoreKey ->
+ mapping.compute(filteringObjStoreKey.ingressCP().deviceId(), (k, v) -> {
+ List<McastFilteringObjStoreKey> values = v;
+ if (values == null) {
+ values = Lists.newArrayList();
+ }
+ values.add(filteringObjStoreKey);
+ return values;
+ })
+ );
+ return mapping;
+ }
}