Implement L2 load balancer support in XconnectManager

Change-Id: Ib310a1dde72db38abb60273ce66b5f72768bf4ca
diff --git a/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java b/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
index b4440f1..aacc16a 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
@@ -30,6 +30,7 @@
 import org.onosproject.codec.CodecService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
+import org.onosproject.l2lb.api.L2LbService;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
@@ -48,11 +49,14 @@
 import org.onosproject.net.flowobjective.DefaultFilteringObjective;
 import org.onosproject.net.flowobjective.DefaultForwardingObjective;
 import org.onosproject.net.flowobjective.DefaultNextObjective;
+import org.onosproject.net.flowobjective.DefaultNextTreatment;
 import org.onosproject.net.flowobjective.DefaultObjectiveContext;
 import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.FlowObjectiveService;
 import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.IdNextTreatment;
 import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.NextTreatment;
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
@@ -133,19 +137,24 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     HostService hostService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    L2LbService l2LbService;
+
     private static final String APP_NAME = "org.onosproject.xconnect";
     private static final String ERROR_NOT_LEADER = "Not leader controller";
+    private static final String ERROR_NEXT_OBJ_BUILDER = "Unable to construct next objective builder";
+    private static final String ERROR_NEXT_ID = "Unable to get next id";
 
     private static Logger log = LoggerFactory.getLogger(XconnectManager.class);
 
     private ApplicationId appId;
-    private ConsistentMap<XconnectKey, Set<PortNumber>> xconnectStore;
+    private ConsistentMap<XconnectKey, Set<String>> xconnectStore;
     private ConsistentMap<XconnectKey, Integer> xconnectNextObjStore;
 
     private ConsistentMap<VlanNextObjectiveStoreKey, Integer> xconnectMulticastNextStore;
     private ConsistentMap<VlanNextObjectiveStoreKey, List<PortNumber>> xconnectMulticastPortsStore;
 
-    private final MapEventListener<XconnectKey, Set<PortNumber>> xconnectListener = new XconnectMapListener();
+    private final MapEventListener<XconnectKey, Set<String>> xconnectListener = new XconnectMapListener();
     private ExecutorService xConnectExecutor;
 
     private final DeviceListener deviceListener = new InternalDeviceListener();
@@ -165,7 +174,7 @@
                 .register(XconnectKey.class)
                 .register(VlanNextObjectiveStoreKey.class);
 
-        xconnectStore = storageService.<XconnectKey, Set<PortNumber>>consistentMapBuilder()
+        xconnectStore = storageService.<XconnectKey, Set<String>>consistentMapBuilder()
                 .withName("onos-sr-xconnect")
                 .withRelaxedReadConsistency()
                 .withSerializer(Serializer.using(serializer.build()))
@@ -215,7 +224,7 @@
     }
 
     @Override
-    public void addOrUpdateXconnect(DeviceId deviceId, VlanId vlanId, Set<PortNumber> ports) {
+    public void addOrUpdateXconnect(DeviceId deviceId, VlanId vlanId, Set<String> ports) {
         log.info("Adding or updating xconnect. deviceId={}, vlanId={}, ports={}",
                  deviceId, vlanId, ports);
         final XconnectKey key = new XconnectKey(deviceId, vlanId);
@@ -246,14 +255,14 @@
     @Override
     public boolean hasXconnect(ConnectPoint cp) {
         return getXconnects().stream().anyMatch(desc -> desc.key().deviceId().equals(cp.deviceId())
-                && desc.ports().contains(cp.port())
+                && desc.ports().contains(cp.port().toString())
         );
     }
 
     @Override
     public List<VlanId> getXconnectVlans(DeviceId deviceId, PortNumber port) {
         return getXconnects().stream()
-                .filter(desc -> desc.key().deviceId().equals(deviceId) && desc.ports().contains(port))
+                .filter(desc -> desc.key().deviceId().equals(deviceId) && desc.ports().contains(port.toString()))
                 .map(desc -> desc.key().vlanId())
                 .collect(Collectors.toList());
     }
@@ -287,22 +296,22 @@
         });
     }
 
-    private class XconnectMapListener implements MapEventListener<XconnectKey, Set<PortNumber>> {
+    private class XconnectMapListener implements MapEventListener<XconnectKey, Set<String>> {
         @Override
-        public void event(MapEvent<XconnectKey, Set<PortNumber>> event) {
+        public void event(MapEvent<XconnectKey, Set<String>> event) {
             XconnectKey key = event.key();
-            Versioned<Set<PortNumber>> ports = event.newValue();
-            Versioned<Set<PortNumber>> oldPorts = event.oldValue();
+            Set<String> ports = Versioned.valueOrNull(event.newValue());
+            Set<String> oldPorts = Versioned.valueOrNull(event.oldValue());
 
             switch (event.type()) {
                 case INSERT:
-                    populateXConnect(key, ports.value());
+                    populateXConnect(key, ports);
                     break;
                 case UPDATE:
-                    updateXConnect(key, oldPorts.value(), ports.value());
+                    updateXConnect(key, oldPorts, ports);
                     break;
                 case REMOVE:
-                    revokeXConnect(key, oldPorts.value());
+                    revokeXConnect(key, oldPorts);
                     break;
                 default:
                     break;
@@ -450,14 +459,19 @@
      * @param key   XConnect key
      * @param ports a set of ports to be cross-connected
      */
-    private void populateXConnect(XconnectKey key, Set<PortNumber> ports) {
+    private void populateXConnect(XconnectKey key, Set<String> ports) {
         if (!isLocalLeader(key.deviceId())) {
             log.debug("Abort populating XConnect {}: {}", key, ERROR_NOT_LEADER);
             return;
         }
 
+        int nextId = populateNext(key, ports);
+        if (nextId == -1) {
+            log.warn("Fail to populateXConnect {}: {}", key, ERROR_NEXT_ID);
+            return;
+        }
         populateFilter(key, ports);
-        populateFwd(key, populateNext(key, ports));
+        populateFwd(key, nextId);
         populateAcl(key);
     }
 
@@ -467,15 +481,24 @@
      * @param key   XConnect store key
      * @param ports XConnect ports
      */
-    private void populateFilter(XconnectKey key, Set<PortNumber> ports) {
-        ports.forEach(port -> {
-            FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port);
+    private void populateFilter(XconnectKey key, Set<String> ports) {
+        // FIXME Improve the logic
+        //       Use a filtered (VLAN-specific) port only if no L2 load balancer is involved, so that existing
+        //       XConnect logic can still work on a configured port. A port string that cannot be resolved
+        //       yields a null NextTreatment; treat it as unfiltered rather than dereferencing it.
+        boolean filtered = ports.stream().map(p -> getNextTreatment(key.deviceId(), p))
+                .allMatch(t -> t != null && t.type().equals(NextTreatment.Type.TREATMENT));
+
+        ports.stream()
+                .map(p -> getPhysicalPorts(key.deviceId(), p))
+                .flatMap(Set::stream).forEach(port -> {
+            FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port, filtered);
             ObjectiveContext context = new DefaultObjectiveContext(
                     (objective) -> log.debug("XConnect FilterObj for {} on port {} populated",
-                                             key, port),
+                            key, port),
                     (objective, error) ->
                             log.warn("Failed to populate XConnect FilterObj for {} on port {}: {}",
-                                     key, port, error));
+                                    key, port, error));
             flowObjectiveService.filter(key.deviceId(), filtObjBuilder.add(context));
         });
     }
@@ -485,14 +508,19 @@
      *
      * @param key   XConnect store key
      * @param ports XConnect ports
+     * @return next id
      */
-    private int populateNext(XconnectKey key, Set<PortNumber> ports) {
+    private int populateNext(XconnectKey key, Set<String> ports) {
         int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
         if (nextId != -1) {
             log.debug("NextObj for {} found, id={}", key, nextId);
             return nextId;
         } else {
             NextObjective.Builder nextObjBuilder = nextObjBuilder(key, ports);
+            if (nextObjBuilder == null) {
+                log.warn("Fail to populate {}: {}", key, ERROR_NEXT_OBJ_BUILDER);
+                return -1;
+            }
             ObjectiveContext nextContext = new DefaultObjectiveContext(
                     // To serialize this with kryo
                     (Serializable & Consumer<Objective>) (objective) ->
@@ -544,7 +572,7 @@
      * @param key   XConnect key
      * @param ports XConnect ports
      */
-    private void revokeXConnect(XconnectKey key, Set<PortNumber> ports) {
+    private void revokeXConnect(XconnectKey key, Set<String> ports) {
         if (!isLocalLeader(key.deviceId())) {
             log.debug("Abort revoking XConnect {}: {}", key, ERROR_NOT_LEADER);
             return;
@@ -558,6 +586,7 @@
         } else {
             log.warn("NextObj for {} does not exist in the store.", key);
         }
+        revokeFilter(key, ports);
         revokeAcl(key);
     }
 
@@ -567,9 +596,15 @@
      * @param key   XConnect store key
      * @param ports XConnect ports
      */
-    private void revokeFilter(XconnectKey key, Set<PortNumber> ports) {
-        ports.forEach(port -> {
-            FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port);
+    private void revokeFilter(XconnectKey key, Set<String> ports) {
+        // FIXME Improve the logic
+        //       If L2 load balancer is not involved, use filtered port. Otherwise, use unfiltered port.
+        //       The purpose is to make sure existing XConnect logic can still work on a configured port.
+        Set<Set<PortNumber>> portsSet = ports.stream()
+                .map(p -> getPhysicalPorts(key.deviceId(), p)).collect(Collectors.toSet());
+        boolean filtered = portsSet.stream().allMatch(s -> s.size() == 1);
+        portsSet.stream().flatMap(Set::stream).forEach(port -> {
+            FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port, filtered);
             ObjectiveContext context = new DefaultObjectiveContext(
                     (objective) -> log.debug("XConnect FilterObj for {} on port {} revoked",
                                              key, port),
@@ -588,7 +623,7 @@
      * @param nextId     next objective id
      * @param nextFuture completable future for this next objective operation
      */
-    private void revokeNext(XconnectKey key, Set<PortNumber> ports, int nextId,
+    private void revokeNext(XconnectKey key, Set<String> ports, int nextId,
                             CompletableFuture<ObjectiveError> nextFuture) {
         ObjectiveContext context = new ObjectiveContext() {
             @Override
@@ -608,7 +643,13 @@
                 srService.invalidateNextObj(objective.id());
             }
         };
-        flowObjectiveService.next(key.deviceId(), nextObjBuilder(key, ports, nextId).remove(context));
+
+        NextObjective.Builder nextObjBuilder = nextObjBuilder(key, ports, nextId);
+        if (nextObjBuilder == null) {
+            log.warn("Fail to revokeNext {}: {}", key, ERROR_NEXT_OBJ_BUILDER);
+            return;
+        }
+        flowObjectiveService.next(key.deviceId(), nextObjBuilder.remove(context));
         xconnectNextObjStore.remove(key);
     }
 
@@ -662,8 +703,8 @@
      * @param prevPorts previous XConnect ports
      * @param ports     new XConnect ports
      */
-    private void updateXConnect(XconnectKey key, Set<PortNumber> prevPorts,
-                                Set<PortNumber> ports) {
+    private void updateXConnect(XconnectKey key, Set<String> prevPorts,
+                                Set<String> ports) {
         if (!isLocalLeader(key.deviceId())) {
             log.debug("Abort updating XConnect {}: {}", key, ERROR_NOT_LEADER);
             return;
@@ -673,12 +714,10 @@
 
         // remove old filter
         prevPorts.stream().filter(port -> !ports.contains(port)).forEach(port ->
-                                                                                 revokeFilter(key,
-                                                                                              ImmutableSet.of(port)));
+                revokeFilter(key, ImmutableSet.of(port)));
         // install new filter
         ports.stream().filter(port -> !prevPorts.contains(port)).forEach(port ->
-                                                                                 populateFilter(key,
-                                                                                                ImmutableSet.of(port)));
+                populateFilter(key, ImmutableSet.of(port)));
 
         CompletableFuture<ObjectiveError> fwdFuture = new CompletableFuture<>();
         CompletableFuture<ObjectiveError> nextFuture = new CompletableFuture<>();
@@ -697,7 +736,12 @@
             nextFuture.thenAcceptAsync(nextStatus -> {
                 if (nextStatus == null) {
                     log.debug("Installing new group and flow for {}", key);
-                    populateFwd(key, populateNext(key, ports));
+                    int newNextId = populateNext(key, ports);
+                    if (newNextId == -1) {
+                        log.warn("Fail to updateXConnect {}: {}", key, ERROR_NEXT_ID);
+                        return;
+                    }
+                    populateFwd(key, newNextId);
                 }
             });
         } else {
@@ -708,23 +752,28 @@
     /**
      * Creates a next objective builder for XConnect with given nextId.
      *
-     * @param key    XConnect key
-     * @param ports  set of XConnect ports
-     * @param nextId next objective id
+     * @param key     XConnect key
+     * @param ports   ports or L2 load balancer key
+     * @param nextId  next objective id
      * @return next objective builder
      */
-    private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<PortNumber> ports, int nextId) {
+    private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<String> ports, int nextId) {
         TrafficSelector metadata =
                 DefaultTrafficSelector.builder().matchVlanId(key.vlanId()).build();
         NextObjective.Builder nextObjBuilder = DefaultNextObjective
                 .builder().withId(nextId)
                 .withType(NextObjective.Type.BROADCAST).fromApp(appId)
                 .withMeta(metadata);
-        ports.forEach(port -> {
-            TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
-            tBuilder.setOutput(port);
-            nextObjBuilder.addTreatment(tBuilder.build());
-        });
+
+        for (String port : ports) {
+            NextTreatment nextTreatment = getNextTreatment(key.deviceId(), port);
+            if (nextTreatment == null) {
+                log.warn("Unable to create nextObj. Null NextTreatment");
+                return null;
+            }
+            nextObjBuilder.addTreatment(nextTreatment);
+        }
+
         return nextObjBuilder;
     }
 
@@ -735,7 +784,7 @@
      * @param ports set of XConnect ports
      * @return next objective builder
      */
-    private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<PortNumber> ports) {
+    private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<String> ports) {
         int nextId = flowObjectiveService.allocateNextId();
         return nextObjBuilder(key, ports, nextId);
     }
@@ -794,14 +843,19 @@
      *
      * @param key  XConnect key
      * @param port XConnect ports
+     * @param filtered true if this is a filtered port
      * @return next objective builder
      */
-    private FilteringObjective.Builder filterObjBuilder(XconnectKey key, PortNumber port) {
+    private FilteringObjective.Builder filterObjBuilder(XconnectKey key, PortNumber port, boolean filtered) {
         FilteringObjective.Builder fob = DefaultFilteringObjective.builder();
         fob.withKey(Criteria.matchInPort(port))
-                .addCondition(Criteria.matchVlanId(key.vlanId()))
                 .addCondition(Criteria.matchEthDst(MacAddress.NONE))
                 .withPriority(XCONNECT_PRIORITY);
+        if (filtered) {
+            fob.addCondition(Criteria.matchVlanId(key.vlanId()));
+        } else {
+            fob.addCondition(Criteria.matchVlanId(VlanId.ANY));
+        }
         return fob.permit().fromApp(appId);
     }
 
@@ -1127,4 +1181,79 @@
         return true;
     }
 
+    /**
+     * Extracts the key portion of a port string written as {@code L2LB(<key>)}.
+     * Only the length of the string is checked here, not the literal prefix.
+     *
+     * @param port port string that could not be parsed as a plain port number
+     * @return text between the five-character prefix and the last character,
+     *         or an empty string if the input is too short to contain any
+     */
+    private String l2LbKeyOf(String port) {
+        int begin = "L2LB(".length();
+        int end = port.length() - 1;
+        // String.substring throws StringIndexOutOfBoundsException when begin > end,
+        // which is what a short non-numeric string such as "abc" would trigger.
+        return begin < end ? port.substring(begin, end) : "";
+    }
+
+    /**
+     * Resolves an XConnect port string to the physical ports behind it.
+     *
+     * @param deviceId device the port or L2 load balancer is looked up on
+     * @param port numeric port, or L2 load balancer key in the form L2LB(key)
+     * @return physical ports; an empty set if the string cannot be resolved
+     */
+    private Set<PortNumber> getPhysicalPorts(DeviceId deviceId, String port) {
+        // If port is numeric, treat it as regular port.
+        // Otherwise try to parse it as load balancer key and get the physical port the LB maps to.
+        try {
+            return Sets.newHashSet(PortNumber.portNumber(Integer.parseInt(port)));
+        } catch (NumberFormatException e) {
+            log.debug("Port {} is not numeric. Try to parse it as load balancer key", port);
+        }
+
+        // An empty key (input too short) fails Integer.parseInt below and is handled as "not a key".
+        String l2LbKey = l2LbKeyOf(port);
+        try {
+            return Sets.newHashSet(l2LbService.getL2Lb(deviceId, Integer.parseInt(l2LbKey)).ports());
+        } catch (NumberFormatException e) {
+            log.debug("Port {} is not load balancer key either. Ignore", port);
+        } catch (NullPointerException e) {
+            log.debug("L2 load balancer {} not found. Ignore", l2LbKey);
+        }
+
+        return Sets.newHashSet();
+    }
+
+    /**
+     * Builds the next treatment for an XConnect port string.
+     *
+     * @param deviceId device the port or L2 load balancer is looked up on
+     * @param port numeric port, or L2 load balancer key in the form L2LB(key)
+     * @return output treatment for a numeric port, id treatment for a load balancer key,
+     *         or null if the string cannot be resolved
+     */
+    private NextTreatment getNextTreatment(DeviceId deviceId, String port) {
+        // If port is numeric, treat it as regular port and output to it.
+        // Otherwise try to parse it as load balancer key and reference the next id reported for that LB.
+        try {
+            PortNumber portNumber = PortNumber.portNumber(Integer.parseInt(port));
+            return DefaultNextTreatment.of(DefaultTrafficTreatment.builder().setOutput(portNumber).build());
+        } catch (NumberFormatException e) {
+            log.debug("Port {} is not numeric. Try to parse it as load balancer key", port);
+        }
+
+        // An empty key (input too short) fails Integer.parseInt below and is handled as "not a key".
+        String l2LbKey = l2LbKeyOf(port);
+        try {
+            return IdNextTreatment.of(l2LbService.getL2LbNexts(deviceId, Integer.parseInt(l2LbKey)));
+        } catch (NumberFormatException e) {
+            log.debug("Port {} is not load balancer key either. Ignore", port);
+        } catch (NullPointerException e) {
+            log.debug("L2 load balancer {} not found. Ignore", l2LbKey);
+        }
+
+        return null;
+    }
 }