Extends XConnectManager

- Listens for L2LbEvent
- Leverages a cache to wait for the L2Lb next ID to become available

Includes a small change in the L2LbService interface

Change-Id: Id545e2228dde013620b4b415d1d2619027ef15d2
diff --git a/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java b/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
index aacc16a..e135f04 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
@@ -16,6 +16,9 @@
 package org.onosproject.segmentrouting.xconnect.impl;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -30,6 +33,9 @@
 import org.onosproject.codec.CodecService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
+import org.onosproject.l2lb.api.L2LbEvent;
+import org.onosproject.l2lb.api.L2LbId;
+import org.onosproject.l2lb.api.L2LbListener;
 import org.onosproject.l2lb.api.L2LbService;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.ConnectPoint;
@@ -93,10 +99,13 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
 
 @Component(immediate = true, service = XconnectService.class)
@@ -163,6 +172,17 @@
     private final HostListener hostListener = new InternalHostListener();
     private ExecutorService hostEventExecutor;
 
+    // Wait time for the cache
+    private static final int WAIT_TIME_MS = 15000;
+    // The cache acts as a buffer for xconnects that are waiting for an L2Lb to be installed
+    private Cache<L2LbId, XconnectKey> l2LbCache;
+    // Executor for the cache
+    private ScheduledExecutorService l2lbExecutor;
+    // Pattern for L2Lb key
+    private static final String L2LB_PATTERN = "^(L2LB\\(\\d*\\))$";
+    // We need to listen for L2Lb events to properly install xconnects that use an L2Lb
+    private final L2LbListener l2LbListener = new InternalL2LbListener();
+
     @Activate
     void activate() {
         appId = coreService.registerApplication(APP_NAME);
@@ -206,6 +226,18 @@
                 groupedThreads("sr-xconnect-host-event", "%d", log));
         hostService.addListener(hostListener);
 
+        l2LbCache = CacheBuilder.newBuilder()
+                .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
+                .removalListener((RemovalNotification<L2LbId, XconnectKey> notification) ->
+                                         log.debug("L2Lb cache removal event. l2LbId={}, xConnectKey={}",
+                                                   notification.getKey(), notification.getValue())).build();
+        l2lbExecutor = newScheduledThreadPool(1,
+                                              groupedThreads("l2LbCacheWorker", "l2LbCacheWorker-%d", log));
+        // Let's schedule the cleanup of the cache
+        l2lbExecutor.scheduleAtFixedRate(l2LbCache::cleanUp, 0,
+                                         WAIT_TIME_MS, TimeUnit.MILLISECONDS);
+        l2LbService.addListener(l2LbListener);
+
         log.info("Started");
     }
 
@@ -219,6 +251,7 @@
         deviceEventExecutor.shutdown();
         hostEventExecutor.shutdown();
         xConnectExecutor.shutdown();
+        l2lbExecutor.shutdown();
 
         log.info("Stopped");
     }
@@ -486,7 +519,7 @@
         //       If L2 load balancer is not involved, use filtered port. Otherwise, use unfiltered port.
         //       The purpose is to make sure existing XConnect logic can still work on a configured port.
         boolean filtered = ports.stream()
-                .map(p -> getNextTreatment(key.deviceId(), p))
+                .map(p -> getNextTreatment(key.deviceId(), p, false))
                 .allMatch(t -> t.type().equals(NextTreatment.Type.TREATMENT));
 
         ports.stream()
@@ -649,6 +682,13 @@
             log.warn("Fail to revokeNext {}: {}", key, ERROR_NEXT_OBJ_BUILDER);
             return;
         }
+        // Release the L2Lbs if present
+        ports.forEach(port -> {
+            if (isL2LbKey(port)) {
+                String l2LbKey = port.substring("L2LB(".length(), port.length() - 1);
+                l2LbService.release(new L2LbId(key.deviceId(), Integer.parseInt(l2LbKey)), appId);
+            }
+        });
         flowObjectiveService.next(key.deviceId(), nextObjBuilder.remove(context));
         xconnectNextObjStore.remove(key);
     }
@@ -766,9 +806,17 @@
                 .withMeta(metadata);
 
         for (String port : ports) {
-            NextTreatment nextTreatment = getNextTreatment(key.deviceId(), port);
+            NextTreatment nextTreatment = getNextTreatment(key.deviceId(), port, true);
             if (nextTreatment == null) {
-                log.warn("Unable to create nextObj. Null NextTreatment");
+                // If a L2Lb is used in the XConnect - putting on hold
+                if (isL2LbKey(port)) {
+                    log.warn("Unable to create nextObj. L2Lb not ready");
+                    String l2LbKey = port.substring("L2LB(".length(), port.length() - 1);
+                    l2LbCache.asMap().putIfAbsent(new L2LbId(key.deviceId(), Integer.parseInt(l2LbKey)),
+                                                  key);
+                } else {
+                    log.warn("Unable to create nextObj. Null NextTreatment");
+                }
                 return null;
             }
             nextObjBuilder.addTreatment(nextTreatment);
@@ -1202,7 +1250,7 @@
         return Sets.newHashSet();
     }
 
-    private NextTreatment getNextTreatment(DeviceId deviceId, String port) {
+    private NextTreatment getNextTreatment(DeviceId deviceId, String port, boolean reserve) {
         // If port is numeric, treat it as regular port.
         // Otherwise try to parse it as load balancer key and get the physical port the LB maps to.
         try {
@@ -1214,7 +1262,17 @@
 
         String l2LbKey = port.substring("L2LB(".length(), port.length() - 1);
         try {
-            return IdNextTreatment.of(l2LbService.getL2LbNexts(deviceId, Integer.parseInt(l2LbKey)));
+            NextTreatment idNextTreatment =  IdNextTreatment.of(
+                    l2LbService.getL2LbNext(deviceId, Integer.parseInt(l2LbKey)));
+            // Reserve only one time during next objective creation
+            if (reserve) {
+                L2LbId l2LbId = new L2LbId(deviceId, Integer.parseInt(l2LbKey));
+                if (!l2LbService.reserve(new L2LbId(deviceId, Integer.parseInt(l2LbKey)), appId)) {
+                    log.warn("Reservation failed for {}", l2LbId);
+                    idNextTreatment = null;
+                }
+            }
+            return idNextTreatment;
         } catch (NumberFormatException e) {
             log.debug("Port {} is not load balancer key either. Ignore", port);
         } catch (NullPointerException e) {
@@ -1223,4 +1281,40 @@
 
         return null;
     }
+
+    private boolean isL2LbKey(String l2LbKey) { // True when the whole string matches L2LB_PATTERN
+        return l2LbKey.matches(L2LB_PATTERN); // NOTE(review): \d* also admits "L2LB()" - confirm intended
+    }
+
+    private class InternalL2LbListener implements L2LbListener { // Retries xconnects parked in l2LbCache
+        // Hands the L2LbId of the event subject to dequeue(), executed on l2lbExecutor
+        @Override
+        public void event(L2LbEvent event) {
+            l2lbExecutor.execute(() -> dequeue(event.subject().l2LbId()));
+        }
+        // Relevant only for INSTALLED events, which (per the original author) mean the L2Lb is ready
+        @Override
+        public boolean isRelevant(L2LbEvent event) {
+            return event.type() == L2LbEvent.Type.INSTALLED;
+        }
+    }
+
+    // Looks up the xconnect parked under l2LbId, invalidates the cache entry and re-runs populateXConnect
+    private void dequeue(L2LbId l2LbId) {
+        XconnectKey xconnectKey = l2LbCache.getIfPresent(l2LbId);
+        if (xconnectKey == null) { // No xconnect is currently cached for this L2Lb
+            log.trace("{} not present in the cache", l2LbId);
+            return;
+        }
+        log.debug("Dequeue {}", l2LbId);
+        l2LbCache.invalidate(l2LbId); // Entry is dropped before the retry is attempted
+        Set<String> ports = Versioned.valueOrNull(xconnectStore.get(xconnectKey)); // Ports re-read from the store
+        if (ports == null || ports.isEmpty()) {
+            log.warn("Ports not found for XConnect {}", xconnectKey);
+            return;
+        }
+        populateXConnect(xconnectKey, ports);
+        log.trace("L2Lb cache size {}", l2LbCache.size());
+    }
+
 }