Extends XConnectManager

- Listens for L2LbEvent
- Leverages a cache to wait l2lb nextid

Includes a small change in the L2LbService interface

Change-Id: Id545e2228dde013620b4b415d1d2619027ef15d2
diff --git a/apps/l2lb/src/main/java/org/onosproject/l2lb/api/L2LbService.java b/apps/l2lb/src/main/java/org/onosproject/l2lb/api/L2LbService.java
index db5f20c..ce396ac 100644
--- a/apps/l2lb/src/main/java/org/onosproject/l2lb/api/L2LbService.java
+++ b/apps/l2lb/src/main/java/org/onosproject/l2lb/api/L2LbService.java
@@ -56,7 +56,7 @@
      * @param key L2 load balancer key
      * @return next ID
      */
-    int getL2LbNexts(DeviceId deviceId, int key);
+    int getL2LbNext(DeviceId deviceId, int key);
 
     /**
      * Reserves a l2 load balancer. Only one application
diff --git a/apps/l2lb/src/main/java/org/onosproject/l2lb/app/L2LbManager.java b/apps/l2lb/src/main/java/org/onosproject/l2lb/app/L2LbManager.java
index 6e3c6b6..e5973aa 100644
--- a/apps/l2lb/src/main/java/org/onosproject/l2lb/app/L2LbManager.java
+++ b/apps/l2lb/src/main/java/org/onosproject/l2lb/app/L2LbManager.java
@@ -226,7 +226,7 @@
     }
 
     @Override
-    public int getL2LbNexts(DeviceId deviceId, int key) {
+    public int getL2LbNext(DeviceId deviceId, int key) {
         return Versioned.valueOrNull(l2LbNextStore.get(new L2LbId(deviceId, key)));
     }
 
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
index ea34ff2..9ba6f81 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
@@ -15,6 +15,9 @@
  */
 package org.onosproject.segmentrouting.xconnect.impl;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
@@ -28,9 +31,15 @@
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
 import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.codec.CodecService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
+import org.onosproject.l2lb.api.L2LbEvent;
+import org.onosproject.l2lb.api.L2LbId;
+import org.onosproject.l2lb.api.L2LbListener;
 import org.onosproject.l2lb.api.L2LbService;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.ConnectPoint;
@@ -91,10 +100,13 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
 
 @Service
@@ -122,6 +134,12 @@
     public MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     public SegmentRoutingService srService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -134,7 +152,7 @@
     L2LbService l2LbService;
 
     private static final String APP_NAME = "org.onosproject.xconnect";
-    private static final String ERROR_NOT_MASTER = "Not master controller";
+    private static final String ERROR_NOT_LEADER = "Not leader controller";
     private static final String ERROR_NEXT_OBJ_BUILDER = "Unable to construct next objective builder";
     private static final String ERROR_NEXT_ID = "Unable to get next id";
 
@@ -155,6 +173,16 @@
     private final HostListener hostListener = new InternalHostListener();
     private ExecutorService hostEventExecutor;
 
+    // Wait time for the cache
+    private static final int WAIT_TIME_MS = 15000;
+    //The cache is implemented as buffer for waiting the installation of L2Lb when present
+    private Cache<L2LbId, XconnectKey> l2LbCache;
+    // Executor for the cache
+    private ScheduledExecutorService l2lbExecutor;
+    // Pattern for L2Lb key
+    private static final String L2LB_PATTERN = "^(L2LB\\(\\d*\\))$";
+    // We need to listen for some events to properly installed the xconnect with l2lb
+    private final L2LbListener l2LbListener = new InternalL2LbListener();
 
     @Activate
     void activate() {
@@ -191,14 +219,24 @@
 
         deviceEventExecutor = Executors.newSingleThreadScheduledExecutor(
                 groupedThreads("sr-xconnect-device-event", "%d", log));
-
         deviceService.addListener(deviceListener);
 
         hostEventExecutor = Executors.newSingleThreadExecutor(
                 groupedThreads("sr-xconnect-host-event", "%d", log));
-
         hostService.addListener(hostListener);
 
+        l2LbCache = CacheBuilder.newBuilder()
+                .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
+                .removalListener((RemovalNotification<L2LbId, XconnectKey> notification) ->
+                                         log.debug("L2Lb cache removal event. l2LbId={}, xConnectKey={}",
+                                                   notification.getKey(), notification.getValue())).build();
+        l2lbExecutor = newScheduledThreadPool(1,
+                                              groupedThreads("l2LbCacheWorker", "l2LbCacheWorker-%d", log));
+        // Let's schedule the cleanup of the cache
+        l2lbExecutor.scheduleAtFixedRate(l2LbCache::cleanUp, 0,
+                                         WAIT_TIME_MS, TimeUnit.MILLISECONDS);
+        l2LbService.addListener(l2LbListener);
+
         log.info("Started");
     }
 
@@ -211,6 +249,7 @@
 
         deviceEventExecutor.shutdown();
         hostEventExecutor.shutdown();
+        l2lbExecutor.shutdown();
 
         log.info("Stopped");
     }
@@ -317,29 +356,30 @@
     }
 
     private class InternalDeviceListener implements DeviceListener {
+        // We want to manage only a subset of events and if we are the leader
         @Override
         public void event(DeviceEvent event) {
             deviceEventExecutor.execute(() -> {
                 DeviceId deviceId = event.subject().id();
-                if (!mastershipService.isLocalMaster(deviceId)) {
+                if (!isLocalLeader(deviceId)) {
+                    log.debug("Not the leader of {}. Skip event {}", deviceId, event);
                     return;
                 }
-
-                switch (event.type()) {
-                    case DEVICE_ADDED:
-                    case DEVICE_AVAILABILITY_CHANGED:
-                    case DEVICE_UPDATED:
-                        if (deviceService.isAvailable(deviceId)) {
-                            init(deviceId);
-                        } else {
-                            cleanup(deviceId);
-                        }
-                        break;
-                    default:
-                        break;
+                // Populate or revoke according to the device availability
+                if (deviceService.isAvailable(deviceId)) {
+                    init(deviceId);
+                } else {
+                    cleanup(deviceId);
                 }
             });
         }
+        // Some events related to the devices are skipped
+        @Override
+        public boolean isRelevant(DeviceEvent event) {
+            return event.type() == DeviceEvent.Type.DEVICE_ADDED ||
+                    event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
+                    event.type() == DeviceEvent.Type.DEVICE_UPDATED;
+        }
     }
 
     private class InternalHostListener implements HostListener {
@@ -454,8 +494,8 @@
      * @param ports a set of ports to be cross-connected
      */
     private void populateXConnect(XconnectKey key, Set<String> ports) {
-        if (!mastershipService.isLocalMaster(key.deviceId())) {
-            log.info("Abort populating XConnect {}: {}", key, ERROR_NOT_MASTER);
+        if (!isLocalLeader(key.deviceId())) {
+            log.info("Abort populating XConnect {}: {}", key, ERROR_NOT_LEADER);
             return;
         }
 
@@ -480,7 +520,7 @@
         //       If L2 load balancer is not involved, use filtered port. Otherwise, use unfiltered port.
         //       The purpose is to make sure existing XConnect logic can still work on a configured port.
         boolean filtered = ports.stream()
-                .map(p -> getNextTreatment(key.deviceId(), p))
+                .map(p -> getNextTreatment(key.deviceId(), p, false))
                 .allMatch(t -> t.type().equals(NextTreatment.Type.TREATMENT));
 
         ports.stream()
@@ -567,8 +607,8 @@
      * @param ports XConnect ports
      */
     private void revokeXConnect(XconnectKey key, Set<String> ports) {
-        if (!mastershipService.isLocalMaster(key.deviceId())) {
-            log.info("Abort populating XConnect {}: {}", key, ERROR_NOT_MASTER);
+        if (!isLocalLeader(key.deviceId())) {
+            log.info("Abort revoking XConnect {}: {}", key, ERROR_NOT_LEADER);
             return;
         }
 
@@ -642,6 +682,13 @@
             log.warn("Fail to revokeNext {}: {}", key, ERROR_NEXT_OBJ_BUILDER);
             return;
         }
+        // Release the L2Lbs if present
+        ports.forEach(port -> {
+            if (isL2LbKey(port)) {
+                String l2LbKey = port.substring("L2LB(".length(), port.length() - 1);
+                l2LbService.release(new L2LbId(key.deviceId(), Integer.parseInt(l2LbKey)), appId);
+            }
+        });
         flowObjectiveService.next(key.deviceId(), nextObjBuilder.remove(context));
         xconnectNextObjStore.remove(key);
     }
@@ -698,6 +745,10 @@
      */
     private void updateXConnect(XconnectKey key, Set<String> prevPorts,
                                 Set<String> ports) {
+        if (!isLocalLeader(key.deviceId())) {
+            log.info("Abort updating XConnect {}: {}", key, ERROR_NOT_LEADER);
+            return;
+        }
         // NOTE: ACL flow doesn't include port information. No need to update it.
         //       Pair port is built-in and thus not going to change. No need to update it.
 
@@ -755,9 +806,17 @@
                 .withMeta(metadata);
 
         for (String port : ports) {
-            NextTreatment nextTreatment = getNextTreatment(key.deviceId(), port);
+            NextTreatment nextTreatment = getNextTreatment(key.deviceId(), port, true);
             if (nextTreatment == null) {
-                log.warn("Unable to create nextObj. Null NextTreatment");
+                // If a L2Lb is used in the XConnect - putting on hold
+                if (isL2LbKey(port)) {
+                    log.warn("Unable to create nextObj. L2Lb not ready");
+                    String l2LbKey = port.substring("L2LB(".length(), port.length() - 1);
+                    l2LbCache.asMap().putIfAbsent(new L2LbId(key.deviceId(), Integer.parseInt(l2LbKey)),
+                                                  key);
+                } else {
+                    log.warn("Unable to create nextObj. Null NextTreatment");
+                }
                 return null;
             }
             nextObjBuilder.addTreatment(nextTreatment);
@@ -859,7 +918,7 @@
     private void updateL2Flooding(DeviceId deviceId, final PortNumber port, VlanId vlanId, boolean install) {
 
         // Ensure mastership on device
-        if (!mastershipService.isLocalMaster(deviceId)) {
+        if (!isLocalLeader(deviceId)) {
             return;
         }
 
@@ -1186,7 +1245,7 @@
         return Sets.newHashSet();
     }
 
-    private NextTreatment getNextTreatment(DeviceId deviceId, String port) {
+    private NextTreatment getNextTreatment(DeviceId deviceId, String port, boolean reserve) {
         // If port is numeric, treat it as regular port.
         // Otherwise try to parse it as load balancer key and get the physical port the LB maps to.
         try {
@@ -1198,7 +1257,17 @@
 
         String l2LbKey = port.substring("L2LB(".length(), port.length() - 1);
         try {
-            return IdNextTreatment.of(l2LbService.getL2LbNexts(deviceId, Integer.parseInt(l2LbKey)));
+            NextTreatment idNextTreatment =  IdNextTreatment.of(
+                    l2LbService.getL2LbNext(deviceId, Integer.parseInt(l2LbKey)));
+            // Reserve only one time during next objective creation
+            if (reserve) {
+                L2LbId l2LbId = new L2LbId(deviceId, Integer.parseInt(l2LbKey));
+                if (!l2LbService.reserve(new L2LbId(deviceId, Integer.parseInt(l2LbKey)), appId)) {
+                    log.warn("Reservation failed for {}", l2LbId);
+                    idNextTreatment = null;
+                }
+            }
+            return idNextTreatment;
         } catch (NumberFormatException e) {
             log.debug("Port {} is not load balancer key either. Ignore", port);
         } catch (NullPointerException e) {
@@ -1207,4 +1276,56 @@
 
         return null;
     }
+
+    // Custom-built function, when the device is not available we need a fallback mechanism
+    private boolean isLocalLeader(DeviceId deviceId) {
+        if (!mastershipService.isLocalMaster(deviceId)) {
+            // When the device is available we just check the mastership
+            if (deviceService.isAvailable(deviceId)) {
+                return false;
+            }
+            // Fallback with Leadership service - device id is used as a topic
+            NodeId leader = leadershipService.runForLeadership(
+                    deviceId.toString()).leaderNodeId();
+            // Verify if this node is the leader
+            return clusterService.getLocalNode().id().equals(leader);
+        }
+        return true;
+    }
+
+    private boolean isL2LbKey(String l2LbKey) {
+        return l2LbKey.matches(L2LB_PATTERN);
+    }
+
+    private class InternalL2LbListener implements L2LbListener {
+        // Populate xconnect once l2lb is available
+        @Override
+        public void event(L2LbEvent event) {
+            l2lbExecutor.execute(() -> dequeue(event.subject().l2LbId()));
+        }
+        // When we receive INSTALLED l2 load balancing is ready
+        @Override
+        public boolean isRelevant(L2LbEvent event) {
+            return event.type() == L2LbEvent.Type.INSTALLED;
+        }
+    }
+
+    // Invalidate the cache and re-start the xconnect installation
+    private void dequeue(L2LbId l2LbId) {
+        XconnectKey xconnectKey = l2LbCache.getIfPresent(l2LbId);
+        if (xconnectKey == null) {
+            log.trace("{} not present in the cache", l2LbId);
+            return;
+        }
+        log.debug("Dequeue {}", l2LbId);
+        l2LbCache.invalidate(l2LbId);
+        Set<String> ports = Versioned.valueOrNull(xconnectStore.get(xconnectKey));
+        if (ports == null || ports.isEmpty()) {
+            log.warn("Ports not found for XConnect {}", xconnectKey);
+            return;
+        }
+        populateXConnect(xconnectKey, ports);
+        log.trace("L2Lb cache size {}", l2LbCache.size());
+    }
+
 }