Extends XConnectManager
- Listens for L2LbEvent
- Leverages a cache to wait l2lb nextid
Includes a small change in the L2LbService interface
Change-Id: Id545e2228dde013620b4b415d1d2619027ef15d2
diff --git a/apps/l2lb/src/main/java/org/onosproject/l2lb/api/L2LbService.java b/apps/l2lb/src/main/java/org/onosproject/l2lb/api/L2LbService.java
index db5f20c..ce396ac 100644
--- a/apps/l2lb/src/main/java/org/onosproject/l2lb/api/L2LbService.java
+++ b/apps/l2lb/src/main/java/org/onosproject/l2lb/api/L2LbService.java
@@ -56,7 +56,7 @@
* @param key L2 load balancer key
* @return next ID
*/
- int getL2LbNexts(DeviceId deviceId, int key);
+ int getL2LbNext(DeviceId deviceId, int key);
/**
* Reserves a l2 load balancer. Only one application
diff --git a/apps/l2lb/src/main/java/org/onosproject/l2lb/app/L2LbManager.java b/apps/l2lb/src/main/java/org/onosproject/l2lb/app/L2LbManager.java
index 3a195dc..e54f2f2 100644
--- a/apps/l2lb/src/main/java/org/onosproject/l2lb/app/L2LbManager.java
+++ b/apps/l2lb/src/main/java/org/onosproject/l2lb/app/L2LbManager.java
@@ -230,7 +230,7 @@
}
@Override
- public int getL2LbNexts(DeviceId deviceId, int key) {
+ public int getL2LbNext(DeviceId deviceId, int key) {
return Versioned.valueOrNull(l2LbNextStore.get(new L2LbId(deviceId, key)));
}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
index aacc16a..e135f04 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
@@ -16,6 +16,9 @@
package org.onosproject.segmentrouting.xconnect.impl;
import com.google.common.collect.ImmutableList;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
@@ -30,6 +33,9 @@
import org.onosproject.codec.CodecService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
+import org.onosproject.l2lb.api.L2LbEvent;
+import org.onosproject.l2lb.api.L2LbId;
+import org.onosproject.l2lb.api.L2LbListener;
import org.onosproject.l2lb.api.L2LbService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
@@ -93,10 +99,13 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads;
@Component(immediate = true, service = XconnectService.class)
@@ -163,6 +172,17 @@
private final HostListener hostListener = new InternalHostListener();
private ExecutorService hostEventExecutor;
+ // Wait time for the cache
+ private static final int WAIT_TIME_MS = 15000;
+ //The cache is implemented as buffer for waiting the installation of L2Lb when present
+ private Cache<L2LbId, XconnectKey> l2LbCache;
+ // Executor for the cache
+ private ScheduledExecutorService l2lbExecutor;
+ // Pattern for L2Lb key
+ private static final String L2LB_PATTERN = "^(L2LB\\(\\d*\\))$";
+ // We need to listen for some events to properly installed the xconnect with l2lb
+ private final L2LbListener l2LbListener = new InternalL2LbListener();
+
@Activate
void activate() {
appId = coreService.registerApplication(APP_NAME);
@@ -206,6 +226,18 @@
groupedThreads("sr-xconnect-host-event", "%d", log));
hostService.addListener(hostListener);
+ l2LbCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
+ .removalListener((RemovalNotification<L2LbId, XconnectKey> notification) ->
+ log.debug("L2Lb cache removal event. l2LbId={}, xConnectKey={}",
+ notification.getKey(), notification.getValue())).build();
+ l2lbExecutor = newScheduledThreadPool(1,
+ groupedThreads("l2LbCacheWorker", "l2LbCacheWorker-%d", log));
+ // Let's schedule the cleanup of the cache
+ l2lbExecutor.scheduleAtFixedRate(l2LbCache::cleanUp, 0,
+ WAIT_TIME_MS, TimeUnit.MILLISECONDS);
+ l2LbService.addListener(l2LbListener);
+
log.info("Started");
}
@@ -219,6 +251,7 @@
deviceEventExecutor.shutdown();
hostEventExecutor.shutdown();
xConnectExecutor.shutdown();
+ l2lbExecutor.shutdown();
log.info("Stopped");
}
@@ -486,7 +519,7 @@
// If L2 load balancer is not involved, use filtered port. Otherwise, use unfiltered port.
// The purpose is to make sure existing XConnect logic can still work on a configured port.
boolean filtered = ports.stream()
- .map(p -> getNextTreatment(key.deviceId(), p))
+ .map(p -> getNextTreatment(key.deviceId(), p, false))
.allMatch(t -> t.type().equals(NextTreatment.Type.TREATMENT));
ports.stream()
@@ -649,6 +682,13 @@
log.warn("Fail to revokeNext {}: {}", key, ERROR_NEXT_OBJ_BUILDER);
return;
}
+ // Release the L2Lbs if present
+ ports.forEach(port -> {
+ if (isL2LbKey(port)) {
+ String l2LbKey = port.substring("L2LB(".length(), port.length() - 1);
+ l2LbService.release(new L2LbId(key.deviceId(), Integer.parseInt(l2LbKey)), appId);
+ }
+ });
flowObjectiveService.next(key.deviceId(), nextObjBuilder.remove(context));
xconnectNextObjStore.remove(key);
}
@@ -766,9 +806,17 @@
.withMeta(metadata);
for (String port : ports) {
- NextTreatment nextTreatment = getNextTreatment(key.deviceId(), port);
+ NextTreatment nextTreatment = getNextTreatment(key.deviceId(), port, true);
if (nextTreatment == null) {
- log.warn("Unable to create nextObj. Null NextTreatment");
+ // If a L2Lb is used in the XConnect - putting on hold
+ if (isL2LbKey(port)) {
+ log.warn("Unable to create nextObj. L2Lb not ready");
+ String l2LbKey = port.substring("L2LB(".length(), port.length() - 1);
+ l2LbCache.asMap().putIfAbsent(new L2LbId(key.deviceId(), Integer.parseInt(l2LbKey)),
+ key);
+ } else {
+ log.warn("Unable to create nextObj. Null NextTreatment");
+ }
return null;
}
nextObjBuilder.addTreatment(nextTreatment);
@@ -1202,7 +1250,7 @@
return Sets.newHashSet();
}
- private NextTreatment getNextTreatment(DeviceId deviceId, String port) {
+ private NextTreatment getNextTreatment(DeviceId deviceId, String port, boolean reserve) {
// If port is numeric, treat it as regular port.
// Otherwise try to parse it as load balancer key and get the physical port the LB maps to.
try {
@@ -1214,7 +1262,17 @@
String l2LbKey = port.substring("L2LB(".length(), port.length() - 1);
try {
- return IdNextTreatment.of(l2LbService.getL2LbNexts(deviceId, Integer.parseInt(l2LbKey)));
+ NextTreatment idNextTreatment = IdNextTreatment.of(
+ l2LbService.getL2LbNext(deviceId, Integer.parseInt(l2LbKey)));
+ // Reserve only one time during next objective creation
+ if (reserve) {
+ L2LbId l2LbId = new L2LbId(deviceId, Integer.parseInt(l2LbKey));
+ if (!l2LbService.reserve(new L2LbId(deviceId, Integer.parseInt(l2LbKey)), appId)) {
+ log.warn("Reservation failed for {}", l2LbId);
+ idNextTreatment = null;
+ }
+ }
+ return idNextTreatment;
} catch (NumberFormatException e) {
log.debug("Port {} is not load balancer key either. Ignore", port);
} catch (NullPointerException e) {
@@ -1223,4 +1281,40 @@
return null;
}
+
+ private boolean isL2LbKey(String l2LbKey) {
+ return l2LbKey.matches(L2LB_PATTERN);
+ }
+
+ private class InternalL2LbListener implements L2LbListener {
+ // Populate xconnect once l2lb is available
+ @Override
+ public void event(L2LbEvent event) {
+ l2lbExecutor.execute(() -> dequeue(event.subject().l2LbId()));
+ }
+ // When we receive INSTALLED l2 load balancing is ready
+ @Override
+ public boolean isRelevant(L2LbEvent event) {
+ return event.type() == L2LbEvent.Type.INSTALLED;
+ }
+ }
+
+ // Invalidate the cache and re-start the xconnect installation
+ private void dequeue(L2LbId l2LbId) {
+ XconnectKey xconnectKey = l2LbCache.getIfPresent(l2LbId);
+ if (xconnectKey == null) {
+ log.trace("{} not present in the cache", l2LbId);
+ return;
+ }
+ log.debug("Dequeue {}", l2LbId);
+ l2LbCache.invalidate(l2LbId);
+ Set<String> ports = Versioned.valueOrNull(xconnectStore.get(xconnectKey));
+ if (ports == null || ports.isEmpty()) {
+ log.warn("Ports not found for XConnect {}", xconnectKey);
+ return;
+ }
+ populateXConnect(xconnectKey, ports);
+ log.trace("L2Lb cache size {}", l2LbCache.size());
+ }
+
}