Improves VERIFY operations
Changes:
- Avoids to sends duplicate next when there are multiple sources
- Introduces a backpressure mechanism to not flood the pipeliners
- Guarantees there are at least 30s between each mcast corrector
execution
- Introduce a pool of 4 verifiers in FlowObjectiveManager to
separate the thread used for the installation/removal of the
FlowObjectives
- Improves logging in verifyGroup
Change-Id: I45aac0f80c9eb6afd763f21977d62df4a98f686e
diff --git a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index 6d2b8cc..d08f111 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -1014,21 +1014,11 @@
}
@Override
- public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
- return mcastHandler.getMcastRoles(mcastIp);
- }
-
- @Override
public Map<McastRoleStoreKey, McastRole> getMcastRoles(IpAddress mcastIp, ConnectPoint sourcecp) {
return mcastHandler.getMcastRoles(mcastIp, sourcecp);
}
@Override
- public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
- return mcastHandler.getMcastPaths(mcastIp);
- }
-
- @Override
public Multimap<ConnectPoint, List<ConnectPoint>> getMcastTrees(IpAddress mcastIp,
ConnectPoint sourcecp) {
return mcastHandler.getMcastTrees(mcastIp, sourcecp);
diff --git a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
index 74db140..dd84eeb 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
@@ -296,18 +296,6 @@
ImmutableMap<DeviceId, Set<PortNumber>> getDownedPortState();
/**
- * Returns the associated roles to the mcast groups or to the single
- * group if mcastIp is present.
- *
- * @param mcastIp the group ip
- * @return the mapping mcastIp-device to mcast role
- *
- * @deprecated in 1.12 ("Magpie") release.
- */
- @Deprecated
- Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp);
-
- /**
* Returns the associated roles to the mcast groups.
*
* @param mcastIp the group ip
@@ -318,17 +306,6 @@
ConnectPoint sourcecp);
/**
- * Returns the associated paths to the mcast group.
- *
- * @param mcastIp the group ip
- * @return the mapping egress point to mcast path
- *
- * @deprecated in 1.12 ("Magpie") release.
- */
- @Deprecated
- Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp);
-
- /**
* Returns the associated trees to the mcast group.
*
* @param mcastIp the group ip
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index 9db890d..f7e9cb3 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -71,6 +71,7 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -212,6 +213,8 @@
private static final long MCAST_STABLITY_THRESHOLD = 5;
// Last change done
private Instant lastMcastChange = Instant.now();
+ // Last bucker corrector execution
+ private Instant lastBktCorrExecution = Instant.now();
/**
* Determines if mcast in the network has been stable in the last
@@ -223,10 +226,23 @@
private boolean isMcastStable() {
long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
long now = (long) (Instant.now().toEpochMilli() / 1000.0);
- log.trace("Mcast stable since {}s", now - last);
+ log.trace("Multicast stable since {}s", now - last);
return (now - last) > MCAST_STABLITY_THRESHOLD;
}
+ /**
+ * Assures there are always MCAST_VERIFY_INTERVAL seconds between each execution,
+ * by comparing the current time with the last corrector execution.
+ *
+ * @return true if stable
+ */
+ private boolean wasBktCorrRunning() {
+ long last = (long) (lastBktCorrExecution.toEpochMilli() / 1000.0);
+ long now = (long) (Instant.now().toEpochMilli() / 1000.0);
+ log.trace("McastBucketCorrector executed {}s ago", now - last);
+ return (now - last) < MCAST_VERIFY_INTERVAL;
+ }
+
// Verify interval for Mcast bucket corrector
private static final long MCAST_VERIFY_INTERVAL = 30;
// Executor for mcast bucket corrector and for cache
@@ -1767,89 +1783,121 @@
*/
private final class McastBucketCorrector implements Runnable {
+ private static final int MAX_VERIFY_ON_FLIGHT = 10;
+ private final AtomicInteger verifyOnFlight = new AtomicInteger(0);
+ // Define the context used for the back pressure mechanism
+ private final ObjectiveContext context = new DefaultObjectiveContext(
+ (objective) -> {
+ synchronized (verifyOnFlight) {
+ verifyOnFlight.decrementAndGet();
+ verifyOnFlight.notify();
+ }
+ },
+ (objective, error) -> {
+ synchronized (verifyOnFlight) {
+ verifyOnFlight.decrementAndGet();
+ verifyOnFlight.notify();
+ }
+ });
+
@Override
public void run() {
- if (!isMcastStable()) {
+ if (!isMcastStable() || wasBktCorrRunning()) {
return;
}
mcastLock();
try {
// Iterates over the routes and verify the related next objectives
- srManager.multicastRouteService.getRoutes()
- .stream().map(McastRoute::group)
- .forEach(mcastIp -> {
- log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
- // Verify leadership on the operation
- if (!mcastUtils.isLeader(mcastIp)) {
- log.trace("Skip {} due to lack of leadership", mcastIp);
- return;
+ for (McastRoute mcastRoute : srManager.multicastRouteService.getRoutes()) {
+ IpAddress mcastIp = mcastRoute.group();
+ log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
+ // Verify leadership on the operation
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.trace("Skip {} due to lack of leadership", mcastIp);
+ continue;
+ }
+ // Get sources and sinks from Mcast Route Service and warn about errors
+ Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
+ Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toSet());
+ // Do not proceed if sources of this group are missing
+ if (sources.isEmpty()) {
+ if (!sinks.isEmpty()) {
+ log.warn("Unable to run buckets corrector. " +
+ "Missing source {} for group {}", sources, mcastIp);
}
- // Get sources and sinks from Mcast Route Service and warn about errors
- Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
- Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
- .flatMap(Collection::stream).collect(Collectors.toSet());
- // Do not proceed if sources of this group are missing
- if (sources.isEmpty()) {
+ continue;
+ }
+ // For each group we get current information in the store
+ // and issue a check of the next objectives in place
+ Set<McastStoreKey> processedKeys = Sets.newHashSet();
+ for (ConnectPoint source : sources) {
+ Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS, source);
+ Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
+ Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
+ // Do not proceed if ingress devices are missing
+ if (ingressDevices.isEmpty()) {
if (!sinks.isEmpty()) {
log.warn("Unable to run buckets corrector. " +
- "Missing source {} for group {}", sources, mcastIp);
+ "Missing ingress {} for source {} and for group {}",
+ ingressDevices, source, mcastIp);
}
- return;
+ continue;
}
- sources.forEach(source -> {
- // For each group we get current information in the store
- // and issue a check of the next objectives in place
- Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS, source);
- Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
- Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
- // Do not proceed if ingress devices are missing
- if (ingressDevices.isEmpty()) {
- if (!sinks.isEmpty()) {
- log.warn("Unable to run buckets corrector. " +
- "Missing ingress {} for source {} and for group {}",
- ingressDevices, source, mcastIp);
- }
+ // Create the set of the devices to be processed
+ ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
+ if (!ingressDevices.isEmpty()) {
+ devicesBuilder.addAll(ingressDevices);
+ }
+ if (!transitDevices.isEmpty()) {
+ devicesBuilder.addAll(transitDevices);
+ }
+ if (!egressDevices.isEmpty()) {
+ devicesBuilder.addAll(egressDevices);
+ }
+ Set<DeviceId> devicesToProcess = devicesBuilder.build();
+ for (DeviceId deviceId : devicesToProcess) {
+ if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
+ log.trace("Skipping Bucket corrector for unconfigured device {}", deviceId);
return;
}
- // Create the set of the devices to be processed
- ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
- if (!ingressDevices.isEmpty()) {
- devicesBuilder.addAll(ingressDevices);
- }
- if (!transitDevices.isEmpty()) {
- devicesBuilder.addAll(transitDevices);
- }
- if (!egressDevices.isEmpty()) {
- devicesBuilder.addAll(egressDevices);
- }
- Set<DeviceId> devicesToProcess = devicesBuilder.build();
- devicesToProcess.forEach(deviceId -> {
- if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
- log.trace("Skipping Bucket corrector for unconfigured device {}", deviceId);
- return;
+ synchronized (verifyOnFlight) {
+ while (verifyOnFlight.get() == MAX_VERIFY_ON_FLIGHT) {
+ verifyOnFlight.wait();
}
- VlanId assignedVlan = mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
- source : null);
- McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
- if (mcastNextObjStore.containsKey(currentKey)) {
- NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
- // Rebuild the next objective using assigned vlan
- currentNext = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
- mcastUtils.getPorts(currentNext.next()), currentNext.id()).verify();
- // Send to the flowobjective service
- srManager.flowObjectiveService.next(deviceId, currentNext);
- } else {
- log.warn("Unable to run buckets corrector. " +
- "Missing next for {}, for source {} and for group {}",
- deviceId, source, mcastIp);
- }
- });
- });
- });
+ }
+ VlanId assignedVlan = mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
+ source : null);
+ McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
+ // Check if we already processed this next - trees merge at some point
+ if (processedKeys.contains(currentKey)) {
+ continue;
+ }
+ // Verify the nextobjective or skip to next device
+ if (mcastNextObjStore.containsKey(currentKey)) {
+ NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
+ // Rebuild the next objective using assigned vlan
+ currentNext = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
+ mcastUtils.getPorts(currentNext.next()), currentNext.id()).verify(context);
+ // Send to the flowobjective service
+ srManager.flowObjectiveService.next(deviceId, currentNext);
+ verifyOnFlight.incrementAndGet();
+ log.trace("Verify on flight {}", verifyOnFlight);
+ processedKeys.add(currentKey);
+ } else {
+ log.warn("Unable to run buckets corrector. " +
+ "Missing next for {}, for source {} and for group {}",
+ deviceId, source, mcastIp);
+ }
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ log.warn("BktCorr has been interrupted");
} finally {
+ lastBktCorrExecution = Instant.now();
mcastUnlock();
}
-
}
}
@@ -1884,28 +1932,6 @@
}
/**
- * Returns the associated roles to the mcast groups or to the single
- * group if mcastIp is present.
- *
- * @param mcastIp the group ip
- * @return the mapping mcastIp-device to mcast role
- *
- * @deprecated in 1.12 ("Magpie") release.
- */
- @Deprecated
- public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
- if (mcastIp != null) {
- return mcastRoleStore.entrySet().stream()
- .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
- .collect(Collectors.toMap(entry -> new McastStoreKey(entry.getKey().mcastIp(),
- entry.getKey().deviceId(), null), entry -> entry.getValue().value()));
- }
- return mcastRoleStore.entrySet().stream()
- .collect(Collectors.toMap(entry -> new McastStoreKey(entry.getKey().mcastIp(),
- entry.getKey().deviceId(), null), entry -> entry.getValue().value()));
- }
-
- /**
* Returns the associated roles to the mcast groups.
*
* @param mcastIp the group ip
@@ -1932,28 +1958,6 @@
entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
}
-
- /**
- * Returns the associated paths to the mcast group.
- *
- * @param mcastIp the group ip
- * @return the mapping egress point to mcast path
- *
- * @deprecated in 1.12 ("Magpie") release.
- */
- @Deprecated
- public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
- Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
- ConnectPoint source = mcastUtils.getSource(mcastIp);
- if (source != null) {
- Set<DeviceId> visited = Sets.newHashSet();
- List<ConnectPoint> currentPath = Lists.newArrayList(source);
- mcastUtils.buildMcastPaths(mcastNextObjStore.asJavaMap(), source.deviceId(), visited, mcastPaths,
- currentPath, mcastIp, source);
- }
- return mcastPaths;
- }
-
/**
* Returns the associated trees to the mcast group.
*
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
index 6d3f938..78f7cb3 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
@@ -250,24 +250,6 @@
}
/**
- * Gets source connect point of given multicast group.
- *
- * @param mcastIp multicast IP
- * @return source connect point or null if not found
- *
- * @deprecated in 1.12 ("Magpie") release.
- */
- @Deprecated
- ConnectPoint getSource(IpAddress mcastIp) {
- McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
- .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
- .findFirst().orElse(null);
- return mcastRoute == null ? null : srManager.multicastRouteService.sources(mcastRoute)
- .stream()
- .findFirst().orElse(null);
- }
-
- /**
* Gets sources connect points of given multicast group.
*
* @param mcastIp multicast IP