[CORD-2607] Mcast buckets correction
Change-Id: Ib47b2d8e40babdbb2ccdba61b48365a141752016
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/McastHandler.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/McastHandler.java
index 94b3a4c..e917b30 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/McastHandler.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/McastHandler.java
@@ -50,6 +50,7 @@
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.mcast.McastEvent;
+import org.onosproject.net.mcast.McastRoute;
import org.onosproject.net.mcast.McastRouteInfo;
import org.onosproject.net.topology.TopologyService;
import org.onosproject.segmentrouting.config.SegmentRoutingAppConfig;
@@ -62,6 +63,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -69,9 +71,15 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkState;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
/**
@@ -87,6 +95,49 @@
private final KryoNamespace.Builder mcastKryo;
private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
+ // Mcast lock to serialize local operations
+ private final Lock mcastLock = new ReentrantLock();
+
+ /**
+ * Acquires the lock used when making mcast changes.
+ */
+ private void mcastLock() {
+ mcastLock.lock();
+ }
+
+ /**
+ * Releases the lock used when making mcast changes.
+ */
+ private void mcastUnlock() {
+ mcastLock.unlock();
+ }
+
+ // Stability threshold for Mcast. Seconds
+ private static final long MCAST_STABLITY_THRESHOLD = 5;
+ // Last change done
+ private Instant lastMcastChange = Instant.now();
+
+ /**
+ * Determines if mcast in the network has been stable in the last
+ * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
+ * to the last mcast change timestamp.
+ *
+ * @return true if stable
+ */
+ private boolean isMcastStable() {
+ long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
+ long now = (long) (Instant.now().toEpochMilli() / 1000.0);
+ log.debug("Mcast stable since {}s", now - last);
+ return (now - last) > MCAST_STABLITY_THRESHOLD;
+ }
+
+ // Verify interval for Mcast
+ private static final long MCAST_VERIFY_INTERVAL = 30;
+
+ // Executor for mcast bucket corrector
+ private ScheduledExecutorService executorService
+ = newScheduledThreadPool(1, groupedThreads("mcastBktCorrector", "mcastbktC-%d", log));
+
/**
* Role in the multicast tree.
*/
@@ -129,6 +180,10 @@
.withName("onos-mcast-role-store")
.withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
.build();
+ // Init the executor service and the buckets corrector
+ executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
+ MCAST_VERIFY_INTERVAL,
+ TimeUnit.SECONDS);
}
/**
@@ -145,6 +200,13 @@
}
/**
+ * Clean up when deactivating the application.
+ */
+ protected void terminate() {
+ executorService.shutdown();
+ }
+
+ /**
* Processes the SOURCE_ADDED event.
*
* @param event McastEvent with SOURCE_ADDED type
@@ -160,9 +222,7 @@
Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
IpAddress mcastIp = mcastRouteInfo.route().group();
- sinks.forEach(sink -> {
- processSinkAddedInternal(source, sink, mcastIp);
- });
+ sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp));
}
/**
@@ -200,43 +260,65 @@
ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
IpAddress mcastIp = mcastRouteInfo.route().group();
- // Continue only when this instance is the master of source device
- if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
- log.info("Skip {} due to lack of mastership of the source device {}",
- mcastIp, source.deviceId());
- return;
- }
+ processSinkRemovedInternal(source, sink, mcastIp);
+ }
- // When source and sink are on the same device
- if (source.deviceId().equals(sink.deviceId())) {
- // Source and sink are on even the same port. There must be something wrong.
- if (source.port().equals(sink.port())) {
- log.warn("Sink is on the same port of source. Abort");
+ /**
+ * Removes a path from source to sink for given multicast group.
+ *
+ * @param source connect point of the multicast source
+ * @param sink connection point of the multicast sink
+ * @param mcastIp multicast group IP address
+ */
+ private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
+ IpAddress mcastIp) {
+ lastMcastChange = Instant.now();
+ mcastLock();
+ try {
+ // Continue only when this instance is the master of source device
+ if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+ log.debug("Skip {} due to lack of mastership of the source device {}",
+ mcastIp, source.deviceId());
return;
}
- removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
- return;
- }
- // Process the egress device
- boolean isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
- if (isLast) {
- mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
- }
+ // When source and sink are on the same device
+ if (source.deviceId().equals(sink.deviceId())) {
+ // Source and sink are on even the same port. There must be something wrong.
+ if (source.port().equals(sink.port())) {
+ log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
+ mcastIp, sink, source);
+ return;
+ }
+ removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
+ return;
+ }
- // If this is the last sink on the device, also update upstream
- Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
- if (mcastPath.isPresent()) {
- List<Link> links = Lists.newArrayList(mcastPath.get().links());
- Collections.reverse(links);
- for (Link link : links) {
- if (isLast) {
- isLast = removePortFromDevice(link.src().deviceId(), link.src().port(),
- mcastIp,
- assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
- mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
+ // Process the egress device
+ boolean isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
+ if (isLast) {
+ mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
+ }
+
+ // If this is the last sink on the device, also update upstream
+ Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
+ if (mcastPath.isPresent()) {
+ List<Link> links = Lists.newArrayList(mcastPath.get().links());
+ Collections.reverse(links);
+ for (Link link : links) {
+ if (isLast) {
+ isLast = removePortFromDevice(
+ link.src().deviceId(),
+ link.src().port(),
+ mcastIp,
+ assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null)
+ );
+ mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
+ }
}
}
+ } finally {
+ mcastUnlock();
}
}
@@ -249,54 +331,61 @@
*/
private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
IpAddress mcastIp) {
- // Continue only when this instance is the master of source device
- if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
- log.info("Skip {} due to lack of mastership of the source device {}",
- source.deviceId());
- return;
- }
-
- // Process the ingress device
- addFilterToDevice(source.deviceId(), source.port(), assignedVlan(source), mcastIp);
-
- // When source and sink are on the same device
- if (source.deviceId().equals(sink.deviceId())) {
- // Source and sink are on even the same port. There must be something wrong.
- if (source.port().equals(sink.port())) {
- log.warn("Sink is on the same port of source. Abort");
+ lastMcastChange = Instant.now();
+ mcastLock();
+ try {
+ // Continue only when this instance is the master of source device
+ if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+ log.debug("Skip {} due to lack of mastership of the source device {}",
+ mcastIp, source.deviceId());
return;
}
- addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
- mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), McastRole.INGRESS);
- return;
- }
- // Find a path. If present, create/update groups and flows for each hop
- Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
- if (mcastPath.isPresent()) {
- List<Link> links = mcastPath.get().links();
- checkState(links.size() == 2,
- "Path in leaf-spine topology should always be two hops: ", links);
+ // Process the ingress device
+ addFilterToDevice(source.deviceId(), source.port(), assignedVlan(source), mcastIp);
- links.forEach(link -> {
- addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
- assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
- addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null), mcastIp);
- });
+ // When source and sink are on the same device
+ if (source.deviceId().equals(sink.deviceId())) {
+ // Source and sink are on even the same port. There must be something wrong.
+ if (source.port().equals(sink.port())) {
+ log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
+ mcastIp, sink, source);
+ return;
+ }
+ addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
+ mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), McastRole.INGRESS);
+ return;
+ }
- // Process the egress device
- addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
+ // Find a path. If present, create/update groups and flows for each hop
+ Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
+ if (mcastPath.isPresent()) {
+ List<Link> links = mcastPath.get().links();
+ checkState(links.size() == 2,
+ "Path in leaf-spine topology should always be two hops: ", links);
- // Setup mcast roles
- mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()),
- McastRole.INGRESS);
- mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
- McastRole.TRANSIT);
- mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
- McastRole.EGRESS);
- } else {
- log.warn("Unable to find a path from {} to {}. Abort sinkAdded",
- source.deviceId(), sink.deviceId());
+ links.forEach(link -> {
+ addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
+ assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
+ addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null), mcastIp);
+ });
+
+ // Process the egress device
+ addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
+
+ // Setup mcast roles
+ mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()),
+ McastRole.INGRESS);
+ mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
+ McastRole.TRANSIT);
+ mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
+ McastRole.EGRESS);
+ } else {
+ log.warn("Unable to find a path from {} to {}. Abort sinkAdded",
+ source.deviceId(), sink.deviceId());
+ }
+ } finally {
+ mcastUnlock();
}
}
@@ -306,56 +395,63 @@
* @param affectedLink Link that is going down
*/
protected void processLinkDown(Link affectedLink) {
- getAffectedGroups(affectedLink).forEach(mcastIp -> {
- // TODO Optimize when the group editing is in place
- log.debug("Processing link down {} for group {}",
- affectedLink, mcastIp);
+ lastMcastChange = Instant.now();
+ mcastLock();
+ try {
+ // Get groups affected by the link down event
+ getAffectedGroups(affectedLink).forEach(mcastIp -> {
+ // TODO Optimize when the group editing is in place
+ log.debug("Processing link down {} for group {}",
+ affectedLink, mcastIp);
- // Find out the ingress, transit and egress device of affected group
- DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
- .stream().findAny().orElse(null);
- DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
- .stream().findAny().orElse(null);
- Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
- ConnectPoint source = getSource(mcastIp);
+ // Find out the ingress, transit and egress device of affected group
+ DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
+ .stream().findAny().orElse(null);
+ DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
+ .stream().findAny().orElse(null);
+ Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
+ ConnectPoint source = getSource(mcastIp);
- // Do not proceed if any of these info is missing
- if (ingressDevice == null || transitDevice == null
- || egressDevices == null || source == null) {
- log.warn("Missing ingress {}, transit {}, egress {} devices or source {}",
- ingressDevice, transitDevice, egressDevices, source);
- return;
- }
-
- // Continue only when this instance is the master of source device
- if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
- log.info("Skip {} due to lack of mastership of the source device {}",
- source.deviceId());
- return;
- }
-
- // Remove entire transit
- removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
-
- // Remove transit-facing port on ingress device
- PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
- if (ingressTransitPort != null) {
- removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
- mcastRoleStore.remove(new McastStoreKey(mcastIp, transitDevice));
- }
-
- // Construct a new path for each egress device
- egressDevices.forEach(egressDevice -> {
- Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
- if (mcastPath.isPresent()) {
- installPath(mcastIp, source, mcastPath.get());
- } else {
- log.warn("Fail to recover egress device {} from link failure {}",
- egressDevice, affectedLink);
- removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
+ // Do not proceed if any of these info is missing
+ if (ingressDevice == null || transitDevice == null
+ || egressDevices == null || source == null) {
+ log.warn("Missing ingress {}, transit {}, egress {} devices or source {}",
+ ingressDevice, transitDevice, egressDevices, source);
+ return;
}
+
+ // Continue only when this instance is the master of source device
+ if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+ log.debug("Skip {} due to lack of mastership of the source device {}",
+ source.deviceId());
+ return;
+ }
+
+ // Remove entire transit
+ removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
+
+ // Remove transit-facing port on ingress device
+ PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
+ if (ingressTransitPort != null) {
+ removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
+ mcastRoleStore.remove(new McastStoreKey(mcastIp, transitDevice));
+ }
+
+ // Construct a new path for each egress device
+ egressDevices.forEach(egressDevice -> {
+ Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
+ if (mcastPath.isPresent()) {
+ installPath(mcastIp, source, mcastPath.get());
+ } else {
+ log.warn("Fail to recover egress device {} from link failure {}",
+ egressDevice, affectedLink);
+ removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
+ }
+ });
});
- });
+ } finally {
+ mcastUnlock();
+ }
}
/**
@@ -364,103 +460,109 @@
* @param deviceDown device going down
*/
protected void processDeviceDown(DeviceId deviceDown) {
- // Get the mcast groups affected by the device going down
- getAffectedGroups(deviceDown).forEach(mcastIp -> {
- // TODO Optimize when the group editing is in place
- log.debug("Processing device down {} for group {}",
- deviceDown, mcastIp);
+ lastMcastChange = Instant.now();
+ mcastLock();
+ try {
+ // Get the mcast groups affected by the device going down
+ getAffectedGroups(deviceDown).forEach(mcastIp -> {
+ // TODO Optimize when the group editing is in place
+ log.debug("Processing device down {} for group {}",
+ deviceDown, mcastIp);
- // Find out the ingress, transit and egress device of affected group
- DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
- .stream().findAny().orElse(null);
- DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
- .stream().findAny().orElse(null);
- Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
- ConnectPoint source = getSource(mcastIp);
+ // Find out the ingress, transit and egress device of affected group
+ DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
+ .stream().findAny().orElse(null);
+ DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
+ .stream().findAny().orElse(null);
+ Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
+ ConnectPoint source = getSource(mcastIp);
- // Do not proceed if ingress device or source of this group are missing
- // If sinks are in other leafs, we have ingress, transit, egress, and source
- // If sinks are in the same leaf, we have just ingress and source
- if (ingressDevice == null || source == null) {
- log.warn("Missing ingress {} or source {} for group {}",
- ingressDevice, source, mcastIp);
- return;
- }
-
- // Continue only when we have the mastership on the operation
- if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
- // When the source is available we just check the mastership
- if (srManager.deviceService.isAvailable(source.deviceId())) {
- log.info("Skip {} due to lack of mastership of the source device {}",
- mcastIp, source.deviceId());
+ // Do not proceed if ingress device or source of this group are missing
+ // If sinks are in other leafs, we have ingress, transit, egress, and source
+ // If sinks are in the same leaf, we have just ingress and source
+ if (ingressDevice == null || source == null) {
+ log.warn("Missing ingress {} or source {} for group {}",
+ ingressDevice, source, mcastIp);
return;
}
- // Fallback with Leadership service
- // source id is used a topic
- NodeId leader = srManager.leadershipService.runForLeadership(
- source.deviceId().toString()).leaderNodeId();
- // Verify if this node is the leader
- if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
- log.info("Skip {} due to lack of leadership on the topic {}",
- mcastIp, source.deviceId());
- return;
- }
- }
- // If it exists, we have to remove it in any case
- if (transitDevice != null) {
- // Remove entire transit
- removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
- }
- // If the ingress is down
- if (ingressDevice.equals(deviceDown)) {
- // Remove entire ingress
- removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
- // If other sinks different from the ingress exist
- if (!egressDevices.isEmpty()) {
- // Remove all the remaining egress
- egressDevices.forEach(
- egressDevice -> removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null))
- );
- }
- } else {
- // Egress or transit could be down at this point
- // Get the ingress-transit port if it exists
- PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
- if (ingressTransitPort != null) {
- // Remove transit-facing port on ingress device
- removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
- }
- // One of the egress device is down
- if (egressDevices.contains(deviceDown)) {
- // Remove entire device down
- removeGroupFromDevice(deviceDown, mcastIp, assignedVlan(null));
- // Remove the device down from egress
- egressDevices.remove(deviceDown);
- // If there are no more egress and ingress does not have sinks
- if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) {
- // Remove entire ingress
- mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
- // We have done
+ // Continue only when we have the mastership on the operation
+ if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+ // When the source is available we just check the mastership
+ if (srManager.deviceService.isAvailable(source.deviceId())) {
+ log.debug("Skip {} due to lack of mastership of the source device {}",
+ mcastIp, source.deviceId());
+ return;
+ }
+ // Fallback with Leadership service
+ // source id is used a topic
+ NodeId leader = srManager.leadershipService.runForLeadership(
+ source.deviceId().toString()).leaderNodeId();
+ // Verify if this node is the leader
+ if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
+ log.debug("Skip {} due to lack of leadership on the topic {}",
+ mcastIp, source.deviceId());
return;
}
}
- // Construct a new path for each egress device
- egressDevices.forEach(egressDevice -> {
- Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
- // If there is a new path
- if (mcastPath.isPresent()) {
- // Let's install the new mcast path for this egress
- installPath(mcastIp, source, mcastPath.get());
- } else {
- // We were not able to find an alternative path for this egress
- log.warn("Fail to recover egress device {} from device down {}",
- egressDevice, deviceDown);
- removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
+
+ // If it exists, we have to remove it in any case
+ if (transitDevice != null) {
+ // Remove entire transit
+ removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
+ }
+ // If the ingress is down
+ if (ingressDevice.equals(deviceDown)) {
+ // Remove entire ingress
+ removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
+ // If other sinks different from the ingress exist
+ if (!egressDevices.isEmpty()) {
+ // Remove all the remaining egress
+ egressDevices.forEach(
+ egressDevice -> removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null))
+ );
}
- });
- }
- });
+ } else {
+ // Egress or transit could be down at this point
+ // Get the ingress-transit port if it exists
+ PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
+ if (ingressTransitPort != null) {
+ // Remove transit-facing port on ingress device
+ removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
+ }
+ // One of the egress device is down
+ if (egressDevices.contains(deviceDown)) {
+ // Remove entire device down
+ removeGroupFromDevice(deviceDown, mcastIp, assignedVlan(null));
+ // Remove the device down from egress
+ egressDevices.remove(deviceDown);
+ // If there are no more egress and ingress does not have sinks
+ if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) {
+ // Remove entire ingress
+ mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
+ // We have done
+ return;
+ }
+ }
+ // Construct a new path for each egress device
+ egressDevices.forEach(egressDevice -> {
+ Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
+ // If there is a new path
+ if (mcastPath.isPresent()) {
+ // Let's install the new mcast path for this egress
+ installPath(mcastIp, source, mcastPath.get());
+ } else {
+ // We were not able to find an alternative path for this egress
+ log.warn("Fail to recover egress device {} from device down {}",
+ egressDevice, deviceDown);
+ removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
+ }
+ });
+ }
+ });
+ } finally {
+ mcastUnlock();
+ }
}
/**
@@ -1056,7 +1158,9 @@
}
/**
- * Adds or removes filtering objective for given device and port.
+ * Updates filtering objective for given device and port.
+ * It is called in general when the mcast config has been
+ * changed.
*
* @param deviceId device ID
* @param portNum ingress port number
@@ -1065,15 +1169,118 @@
*/
protected void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
VlanId vlanId, boolean install) {
- srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
- ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
- if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
- if (install) {
- addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
- } else {
- removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
+ lastMcastChange = Instant.now();
+ mcastLock();
+ try {
+ // Iterates over the route and updates properly the filtering objective
+ // on the source device.
+ srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
+ ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
+ if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
+ if (install) {
+ addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
+ } else {
+ removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
+ }
}
+ });
+ } finally {
+ mcastUnlock();
+ }
+ }
+
+ /**
+ * Performs bucket verification operation for all mcast groups in the devices.
+ * Firstly, it verifies that mcast is stable before trying verification operation.
+ * Verification consists in creating new nexts with VERIFY operation. Actually,
+ * the operation is totally delegated to the driver.
+ */
+ private final class McastBucketCorrector implements Runnable {
+
+ @Override
+ public void run() {
+ // Verify if the Mcast has been stable for MCAST_STABLITY_THRESHOLD
+ if (!isMcastStable()) {
+ return;
}
- });
+ // Acquires lock
+ mcastLock();
+ try {
+ // Iterates over the routes and verify the related next objectives
+ srManager.multicastRouteService.getRoutes()
+ .stream()
+ .map(McastRoute::group)
+ .forEach(mcastIp -> {
+ log.trace("Running mcast buckets corrector for mcast group: {}",
+ mcastIp);
+
+ // For each group we get current information in the store
+ // and issue a check of the next objectives in place
+ DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
+ .stream().findAny().orElse(null);
+ DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
+ .stream().findAny().orElse(null);
+ Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
+ ConnectPoint source = getSource(mcastIp);
+
+ // Do not proceed if ingress device or source of this group are missing
+ if (ingressDevice == null || source == null) {
+ log.warn("Unable to run buckets corrector. " +
+ "Missing ingress {} or source {} for group {}",
+ ingressDevice, source, mcastIp);
+ return;
+ }
+
+ // Continue only when this instance is the master of source device
+ if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+ log.trace("Unable to run buckets corrector. " +
+ "Skip {} due to lack of mastership " +
+ "of the source device {}",
+ mcastIp, source.deviceId());
+ return;
+ }
+
+ // Create the set of the devices to be processed
+ ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
+ devicesBuilder.add(ingressDevice);
+ if (transitDevice != null) {
+ devicesBuilder.add(transitDevice);
+ }
+ if (!egressDevices.isEmpty()) {
+ devicesBuilder.addAll(egressDevices);
+ }
+ Set<DeviceId> devicesToProcess = devicesBuilder.build();
+
+ // Iterate over the devices
+ devicesToProcess.forEach(deviceId -> {
+ McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId);
+ // If next exists in our store verify related next objective
+ if (mcastNextObjStore.containsKey(currentKey)) {
+ NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
+ // Get current ports
+ Set<PortNumber> currentPorts = getPorts(currentNext.next());
+ // Rebuild the next objective
+ currentNext = nextObjBuilder(
+ mcastIp,
+ assignedVlan(deviceId.equals(source.deviceId()) ? source : null),
+ currentPorts,
+ currentNext.id()
+ ).verify();
+ // Send to the flowobjective service
+ srManager.flowObjectiveService.next(deviceId, currentNext);
+ } else {
+ log.warn("Unable to run buckets corrector." +
+ "Missing next for {} and group {}",
+ deviceId, mcastIp);
+ }
+ });
+
+ });
+ } finally {
+ // Finally, it releases the lock
+ mcastUnlock();
+ }
+
+ }
}
}
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index c3980b2..2b607af 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -490,6 +490,8 @@
portNextObjStore.destroy();
tunnelStore.destroy();
policyStore.destroy();
+
+ mcastHandler.terminate();
log.info("Stopped");
}
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
index a3670e6..e41ca4c 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
@@ -1588,8 +1588,8 @@
* modified to match the given next objective
*/
protected void verifyGroup(NextObjective nextObjective, NextGroup next) {
- if (nextObjective.type() != NextObjective.Type.HASHED) {
- log.warn("verification not supported for {} group", nextObjective.type());
+ if (nextObjective.type() == NextObjective.Type.SIMPLE) {
+ log.warn("verification not supported for indirect group");
fail(nextObjective, ObjectiveError.UNSUPPORTED);
return;
}
@@ -1640,17 +1640,25 @@
indicesToRemove.addAll(otherIndices);
}
+ log.debug("Buckets to create {}", bucketsToCreate);
+ log.debug("Indices to remove {}", indicesToRemove);
+
if (!bucketsToCreate.isEmpty()) {
log.info("creating {} buckets as part of nextId: {} verification",
bucketsToCreate.size(), nextObjective.id());
//create a nextObjective only with these buckets
NextObjective.Builder nextObjBuilder = DefaultNextObjective.builder()
.withId(nextObjective.id())
- .withType(NextObjective.Type.HASHED)
+ .withType(nextObjective.type())
.withMeta(nextObjective.meta())
.fromApp(nextObjective.appId());
- bucketsToCreate.forEach(bucket -> nextObjBuilder.addTreatment(bucket));
- addBucketToHashGroup(nextObjBuilder.addToExisting(), allActiveKeys);
+ bucketsToCreate.forEach(nextObjBuilder::addTreatment);
+ // According to the next type we call the proper add function
+ if (nextObjective.type() == NextObjective.Type.HASHED) {
+ addBucketToHashGroup(nextObjBuilder.addToExisting(), allActiveKeys);
+ } else {
+ addBucketToBroadcastGroup(nextObjBuilder.addToExisting(), allActiveKeys);
+ }
}
if (!indicesToRemove.isEmpty()) {
@@ -1667,9 +1675,9 @@
// Nevertheless groupStore may not be in sync due to bug in the store
// - see CORD-1844. XXX When this bug is fixed, the rest of this verify
// method will not be required.
- GroupKey hashGroupKey = allActiveKeys.get(0).peekFirst();
- Group hashGroup = groupService.getGroup(deviceId, hashGroupKey);
- int actualGroupSize = hashGroup.buckets().buckets().size();
+ GroupKey topGroupKey = allActiveKeys.get(0).peekFirst();
+ Group topGroup = groupService.getGroup(deviceId, topGroupKey);
+ int actualGroupSize = topGroup.buckets().buckets().size();
int objGroupSize = nextObjective.next().size();
if (actualGroupSize != objGroupSize) {
log.warn("Mismatch detected in device:{}, nextId:{}, nextObjective-size"
@@ -1677,9 +1685,10 @@
objGroupSize, actualGroupSize);
}
if (actualGroupSize > objGroupSize) {
+ // Group in the device has more chains
List<GroupBucket> bucketsToRemove = Lists.newArrayList();
//check every bucket in the actual group
- for (GroupBucket bucket : hashGroup.buckets().buckets()) {
+ for (GroupBucket bucket : topGroup.buckets().buckets()) {
GroupInstruction g = (GroupInstruction) bucket.treatment()
.allInstructions().iterator().next();
GroupId gidToCheck = g.groupId(); // the group pointed to
@@ -1707,11 +1716,12 @@
+ "buckets to remove");
} else {
GroupBuckets removeBuckets = new GroupBuckets(bucketsToRemove);
- groupService.removeBucketsFromGroup(deviceId, hashGroupKey,
- removeBuckets, hashGroupKey,
+ groupService.removeBucketsFromGroup(deviceId, topGroupKey,
+ removeBuckets, topGroupKey,
nextObjective.appId());
}
} else if (actualGroupSize < objGroupSize) {
+ // Group in the device has less chains
// should also add buckets not in group-store but in obj-store
List<GroupBucket> bucketsToAdd = Lists.newArrayList();
//check every bucket in the obj
@@ -1727,7 +1737,7 @@
continue;
}
boolean matches = false;
- for (GroupBucket bucket : hashGroup.buckets().buckets()) {
+ for (GroupBucket bucket : topGroup.buckets().buckets()) {
GroupInstruction g = (GroupInstruction) bucket.treatment()
.allInstructions().iterator().next();
GroupId gidToCheck = g.groupId(); // the group pointed to
@@ -1741,7 +1751,12 @@
TrafficTreatment t = DefaultTrafficTreatment.builder()
.group(pointedGroup.id())
.build();
- bucketsToAdd.add(DefaultGroupBucket.createSelectGroupBucket(t));
+ // Create the proper bucket according to the next type
+ if (nextObjective.type() == NextObjective.Type.HASHED) {
+ bucketsToAdd.add(DefaultGroupBucket.createSelectGroupBucket(t));
+ } else {
+ bucketsToAdd.add(DefaultGroupBucket.createAllGroupBucket(t));
+ }
}
}
if (bucketsToAdd.isEmpty()) {
@@ -1749,8 +1764,8 @@
+ "buckets to add");
} else {
GroupBuckets addBuckets = new GroupBuckets(bucketsToAdd);
- groupService.addBucketsToGroup(deviceId, hashGroupKey,
- addBuckets, hashGroupKey,
+ groupService.addBucketsToGroup(deviceId, topGroupKey,
+ addBuckets, topGroupKey,
nextObjective.appId());
}
}