Implement L2 load balancer support in XConnectManager

Change-Id: Ib310a1dde72db38abb60273ce66b5f72768bf4ca
diff --git a/apps/segmentrouting/BUILD b/apps/segmentrouting/BUILD
index a6c8aca..d3846f3 100644
--- a/apps/segmentrouting/BUILD
+++ b/apps/segmentrouting/BUILD
@@ -10,6 +10,7 @@
     required_apps = [
         "org.onosproject.route-service",
         "org.onosproject.mcast",
+        "org.onosproject.l2lb",
     ],
     title = "Segment Routing",
     url = "http://onosproject.org",
diff --git a/apps/segmentrouting/app/BUILD b/apps/segmentrouting/app/BUILD
index 70c3589..de9beab 100644
--- a/apps/segmentrouting/app/BUILD
+++ b/apps/segmentrouting/app/BUILD
@@ -4,6 +4,7 @@
     "//apps/route-service/api:onos-apps-route-service-api",
     "//apps/mcast/api:onos-apps-mcast-api",
     "//apps/mcast/cli:onos-apps-mcast-cli",
+    "//apps/l2lb:onos-apps-l2lb",
 ]
 
 TEST_DEPS = TEST_ADAPTERS + [
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/XconnectAddCommand.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/XconnectAddCommand.java
index 762b9f3..a986c30 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/XconnectAddCommand.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/XconnectAddCommand.java
@@ -26,11 +26,12 @@
 import org.onosproject.cli.net.DeviceIdCompleter;
 import org.onosproject.cli.net.PortNumberCompleter;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.PortNumber;
 import org.onosproject.segmentrouting.xconnect.api.XconnectService;
 
 import java.util.Set;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 /**
  * Creates Xconnect.
  */
@@ -50,25 +51,27 @@
     private String vlanIdStr;
 
     @Argument(index = 2, name = "port1",
-            description = "Port 1",
+            description = "Port 1. Can also specify L2 load balancer by L2LB(<key>)",
             required = true, multiValued = false)
     @Completion(PortNumberCompleter.class)
     private String port1Str;
 
     @Argument(index = 3, name = "port2",
-            description = "Port 2",
+            description = "Port 2. Can also specify L2 load balancer by L2LB(<key>)",
             required = true, multiValued = false)
     @Completion(PortNumberCompleter.class)
     private String port2Str;
 
+    private static final String L2LB_PATTERN = "^(\\d*|L2LB\\(\\d*\\))$";
 
     @Override
     protected void doExecute() {
         DeviceId deviceId = DeviceId.deviceId(deviceIdStr);
         VlanId vlanId = VlanId.vlanId(vlanIdStr);
-        PortNumber port1 = PortNumber.portNumber(port1Str);
-        PortNumber port2 = PortNumber.portNumber(port2Str);
-        Set<PortNumber> ports = Sets.newHashSet(port1, port2);
+        Set<String> ports = Sets.newHashSet(port1Str, port2Str);
+
+        checkArgument(port1Str.matches(L2LB_PATTERN), "Wrong L2 load balancer format " + port1Str);
+        checkArgument(port2Str.matches(L2LB_PATTERN), "Wrong L2 load balancer format " + port2Str);
 
         XconnectService xconnectService = get(XconnectService.class);
         xconnectService.addOrUpdateXconnect(deviceId, vlanId, ports);
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/api/XconnectCodec.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/api/XconnectCodec.java
index 77824e8..08558a8 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/api/XconnectCodec.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/api/XconnectCodec.java
@@ -23,7 +23,6 @@
 import org.onosproject.codec.CodecContext;
 import org.onosproject.codec.JsonCodec;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.PortNumber;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,10 +51,10 @@
         DeviceId deviceId = DeviceId.deviceId(json.path(DEVICE_ID).asText());
         VlanId vlanId = VlanId.vlanId(json.path(VLAN_ID).asText());
 
-        Set<PortNumber> ports = Sets.newHashSet();
+        Set<String> ports = Sets.newHashSet();
         JsonNode portNodes = json.get(PORTS);
         if (portNodes != null) {
-            portNodes.forEach(portNode -> ports.add(PortNumber.portNumber(portNode.asInt())));
+            portNodes.forEach(portNode -> ports.add(portNode.asText()));
         }
 
         XconnectKey key = new XconnectKey(deviceId, vlanId);
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/api/XconnectDesc.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/api/XconnectDesc.java
index 0eead31..e94c1db 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/api/XconnectDesc.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/api/XconnectDesc.java
@@ -16,7 +16,6 @@
 package org.onosproject.segmentrouting.xconnect.api;
 
 import com.google.common.base.MoreObjects;
-import org.onosproject.net.PortNumber;
 
 import java.util.Objects;
 import java.util.Set;
@@ -26,7 +25,7 @@
  */
 public class XconnectDesc {
     private XconnectKey key;
-    private Set<PortNumber> ports;
+    private Set<String> ports;
 
     /**
      * Constructs new Xconnect description with given device ID and VLAN ID.
@@ -34,7 +33,7 @@
      * @param key Xconnect key
      * @param ports set of ports
      */
-    public XconnectDesc(XconnectKey key, Set<PortNumber> ports) {
+    public XconnectDesc(XconnectKey key, Set<String> ports) {
         this.key = key;
         this.ports = ports;
     }
@@ -53,7 +52,7 @@
      *
      * @return set of ports
      */
-    public Set<PortNumber> ports() {
+    public Set<String> ports() {
         return ports;
     }
 
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/api/XconnectService.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/api/XconnectService.java
index 9ef7672..fe51b06 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/api/XconnectService.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/api/XconnectService.java
@@ -46,7 +46,7 @@
      * @param vlanId VLAN ID
      * @param ports set of ports
      */
-    void addOrUpdateXconnect(DeviceId deviceId, VlanId vlanId, Set<PortNumber> ports);
+    void addOrUpdateXconnect(DeviceId deviceId, VlanId vlanId, Set<String> ports);
 
     /**
      * Deletes Xconnect.
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
index b4440f1..aacc16a 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
@@ -30,6 +30,7 @@
 import org.onosproject.codec.CodecService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
+import org.onosproject.l2lb.api.L2LbService;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
@@ -48,11 +49,14 @@
 import org.onosproject.net.flowobjective.DefaultFilteringObjective;
 import org.onosproject.net.flowobjective.DefaultForwardingObjective;
 import org.onosproject.net.flowobjective.DefaultNextObjective;
+import org.onosproject.net.flowobjective.DefaultNextTreatment;
 import org.onosproject.net.flowobjective.DefaultObjectiveContext;
 import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.FlowObjectiveService;
 import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.IdNextTreatment;
 import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.NextTreatment;
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
@@ -133,19 +137,24 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     HostService hostService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    L2LbService l2LbService;
+
     private static final String APP_NAME = "org.onosproject.xconnect";
     private static final String ERROR_NOT_LEADER = "Not leader controller";
+    private static final String ERROR_NEXT_OBJ_BUILDER = "Unable to construct next objective builder";
+    private static final String ERROR_NEXT_ID = "Unable to get next id";
 
     private static Logger log = LoggerFactory.getLogger(XconnectManager.class);
 
     private ApplicationId appId;
-    private ConsistentMap<XconnectKey, Set<PortNumber>> xconnectStore;
+    private ConsistentMap<XconnectKey, Set<String>> xconnectStore;
     private ConsistentMap<XconnectKey, Integer> xconnectNextObjStore;
 
     private ConsistentMap<VlanNextObjectiveStoreKey, Integer> xconnectMulticastNextStore;
     private ConsistentMap<VlanNextObjectiveStoreKey, List<PortNumber>> xconnectMulticastPortsStore;
 
-    private final MapEventListener<XconnectKey, Set<PortNumber>> xconnectListener = new XconnectMapListener();
+    private final MapEventListener<XconnectKey, Set<String>> xconnectListener = new XconnectMapListener();
     private ExecutorService xConnectExecutor;
 
     private final DeviceListener deviceListener = new InternalDeviceListener();
@@ -165,7 +174,7 @@
                 .register(XconnectKey.class)
                 .register(VlanNextObjectiveStoreKey.class);
 
-        xconnectStore = storageService.<XconnectKey, Set<PortNumber>>consistentMapBuilder()
+        xconnectStore = storageService.<XconnectKey, Set<String>>consistentMapBuilder()
                 .withName("onos-sr-xconnect")
                 .withRelaxedReadConsistency()
                 .withSerializer(Serializer.using(serializer.build()))
@@ -215,7 +224,7 @@
     }
 
     @Override
-    public void addOrUpdateXconnect(DeviceId deviceId, VlanId vlanId, Set<PortNumber> ports) {
+    public void addOrUpdateXconnect(DeviceId deviceId, VlanId vlanId, Set<String> ports) {
         log.info("Adding or updating xconnect. deviceId={}, vlanId={}, ports={}",
                  deviceId, vlanId, ports);
         final XconnectKey key = new XconnectKey(deviceId, vlanId);
@@ -246,14 +255,14 @@
     @Override
     public boolean hasXconnect(ConnectPoint cp) {
         return getXconnects().stream().anyMatch(desc -> desc.key().deviceId().equals(cp.deviceId())
-                && desc.ports().contains(cp.port())
+                && desc.ports().contains(cp.port().toString())
         );
     }
 
     @Override
     public List<VlanId> getXconnectVlans(DeviceId deviceId, PortNumber port) {
         return getXconnects().stream()
-                .filter(desc -> desc.key().deviceId().equals(deviceId) && desc.ports().contains(port))
+                .filter(desc -> desc.key().deviceId().equals(deviceId) && desc.ports().contains(port.toString()))
                 .map(desc -> desc.key().vlanId())
                 .collect(Collectors.toList());
     }
@@ -287,22 +296,22 @@
         });
     }
 
-    private class XconnectMapListener implements MapEventListener<XconnectKey, Set<PortNumber>> {
+    private class XconnectMapListener implements MapEventListener<XconnectKey, Set<String>> {
         @Override
-        public void event(MapEvent<XconnectKey, Set<PortNumber>> event) {
+        public void event(MapEvent<XconnectKey, Set<String>> event) {
             XconnectKey key = event.key();
-            Versioned<Set<PortNumber>> ports = event.newValue();
-            Versioned<Set<PortNumber>> oldPorts = event.oldValue();
+            Set<String> ports = Versioned.valueOrNull(event.newValue());
+            Set<String> oldPorts = Versioned.valueOrNull(event.oldValue());
 
             switch (event.type()) {
                 case INSERT:
-                    populateXConnect(key, ports.value());
+                    populateXConnect(key, ports);
                     break;
                 case UPDATE:
-                    updateXConnect(key, oldPorts.value(), ports.value());
+                    updateXConnect(key, oldPorts, ports);
                     break;
                 case REMOVE:
-                    revokeXConnect(key, oldPorts.value());
+                    revokeXConnect(key, oldPorts);
                     break;
                 default:
                     break;
@@ -450,14 +459,19 @@
      * @param key   XConnect key
      * @param ports a set of ports to be cross-connected
      */
-    private void populateXConnect(XconnectKey key, Set<PortNumber> ports) {
+    private void populateXConnect(XconnectKey key, Set<String> ports) {
         if (!isLocalLeader(key.deviceId())) {
             log.debug("Abort populating XConnect {}: {}", key, ERROR_NOT_LEADER);
             return;
         }
 
+        int nextId = populateNext(key, ports);
+        if (nextId == -1) {
+            log.warn("Fail to populateXConnect {}: {}", key, ERROR_NEXT_ID);
+            return;
+        }
         populateFilter(key, ports);
-        populateFwd(key, populateNext(key, ports));
+        populateFwd(key, nextId);
         populateAcl(key);
     }
 
@@ -467,15 +481,24 @@
      * @param key   XConnect store key
      * @param ports XConnect ports
      */
-    private void populateFilter(XconnectKey key, Set<PortNumber> ports) {
-        ports.forEach(port -> {
-            FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port);
+    private void populateFilter(XconnectKey key, Set<String> ports) {
+        // FIXME Improve the logic
+        //       If L2 load balancer is not involved, use filtered port. Otherwise, use unfiltered port.
+        //       The purpose is to make sure existing XConnect logic can still work on a configured port.
+        boolean filtered = ports.stream()
+                .map(p -> getNextTreatment(key.deviceId(), p))
+                .allMatch(t -> t.type().equals(NextTreatment.Type.TREATMENT));
+
+        ports.stream()
+                .map(p -> getPhysicalPorts(key.deviceId(), p))
+                .flatMap(Set::stream).forEach(port -> {
+            FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port, filtered);
             ObjectiveContext context = new DefaultObjectiveContext(
                     (objective) -> log.debug("XConnect FilterObj for {} on port {} populated",
-                                             key, port),
+                            key, port),
                     (objective, error) ->
                             log.warn("Failed to populate XConnect FilterObj for {} on port {}: {}",
-                                     key, port, error));
+                                    key, port, error));
             flowObjectiveService.filter(key.deviceId(), filtObjBuilder.add(context));
         });
     }
@@ -485,14 +508,19 @@
      *
      * @param key   XConnect store key
      * @param ports XConnect ports
+     * @return next id
      */
-    private int populateNext(XconnectKey key, Set<PortNumber> ports) {
+    private int populateNext(XconnectKey key, Set<String> ports) {
         int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
         if (nextId != -1) {
             log.debug("NextObj for {} found, id={}", key, nextId);
             return nextId;
         } else {
             NextObjective.Builder nextObjBuilder = nextObjBuilder(key, ports);
+            if (nextObjBuilder == null) {
+                log.warn("Fail to populate {}: {}", key, ERROR_NEXT_OBJ_BUILDER);
+                return -1;
+            }
             ObjectiveContext nextContext = new DefaultObjectiveContext(
                     // To serialize this with kryo
                     (Serializable & Consumer<Objective>) (objective) ->
@@ -544,7 +572,7 @@
      * @param key   XConnect key
      * @param ports XConnect ports
      */
-    private void revokeXConnect(XconnectKey key, Set<PortNumber> ports) {
+    private void revokeXConnect(XconnectKey key, Set<String> ports) {
         if (!isLocalLeader(key.deviceId())) {
             log.debug("Abort revoking XConnect {}: {}", key, ERROR_NOT_LEADER);
             return;
@@ -558,6 +586,7 @@
         } else {
             log.warn("NextObj for {} does not exist in the store.", key);
         }
+        revokeFilter(key, ports);
         revokeAcl(key);
     }
 
@@ -567,9 +596,15 @@
      * @param key   XConnect store key
      * @param ports XConnect ports
      */
-    private void revokeFilter(XconnectKey key, Set<PortNumber> ports) {
-        ports.forEach(port -> {
-            FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port);
+    private void revokeFilter(XconnectKey key, Set<String> ports) {
+        // FIXME Improve the logic
+        //       If L2 load balancer is not involved, use filtered port. Otherwise, use unfiltered port.
+        //       The purpose is to make sure existing XConnect logic can still work on a configured port.
+        Set<Set<PortNumber>> portsSet = ports.stream()
+                .map(p -> getPhysicalPorts(key.deviceId(), p)).collect(Collectors.toSet());
+        boolean filtered = portsSet.stream().allMatch(s -> s.size() == 1);
+        portsSet.stream().flatMap(Set::stream).forEach(port -> {
+            FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port, filtered);
             ObjectiveContext context = new DefaultObjectiveContext(
                     (objective) -> log.debug("XConnect FilterObj for {} on port {} revoked",
                                              key, port),
@@ -588,7 +623,7 @@
      * @param nextId     next objective id
      * @param nextFuture completable future for this next objective operation
      */
-    private void revokeNext(XconnectKey key, Set<PortNumber> ports, int nextId,
+    private void revokeNext(XconnectKey key, Set<String> ports, int nextId,
                             CompletableFuture<ObjectiveError> nextFuture) {
         ObjectiveContext context = new ObjectiveContext() {
             @Override
@@ -608,7 +643,13 @@
                 srService.invalidateNextObj(objective.id());
             }
         };
-        flowObjectiveService.next(key.deviceId(), nextObjBuilder(key, ports, nextId).remove(context));
+
+        NextObjective.Builder nextObjBuilder = nextObjBuilder(key, ports, nextId);
+        if (nextObjBuilder == null) {
+            log.warn("Fail to revokeNext {}: {}", key, ERROR_NEXT_OBJ_BUILDER);
+            return;
+        }
+        flowObjectiveService.next(key.deviceId(), nextObjBuilder.remove(context));
         xconnectNextObjStore.remove(key);
     }
 
@@ -662,8 +703,8 @@
      * @param prevPorts previous XConnect ports
      * @param ports     new XConnect ports
      */
-    private void updateXConnect(XconnectKey key, Set<PortNumber> prevPorts,
-                                Set<PortNumber> ports) {
+    private void updateXConnect(XconnectKey key, Set<String> prevPorts,
+                                Set<String> ports) {
         if (!isLocalLeader(key.deviceId())) {
             log.debug("Abort updating XConnect {}: {}", key, ERROR_NOT_LEADER);
             return;
@@ -673,12 +714,10 @@
 
         // remove old filter
         prevPorts.stream().filter(port -> !ports.contains(port)).forEach(port ->
-                                                                                 revokeFilter(key,
-                                                                                              ImmutableSet.of(port)));
+                revokeFilter(key, ImmutableSet.of(port)));
         // install new filter
         ports.stream().filter(port -> !prevPorts.contains(port)).forEach(port ->
-                                                                                 populateFilter(key,
-                                                                                                ImmutableSet.of(port)));
+                populateFilter(key, ImmutableSet.of(port)));
 
         CompletableFuture<ObjectiveError> fwdFuture = new CompletableFuture<>();
         CompletableFuture<ObjectiveError> nextFuture = new CompletableFuture<>();
@@ -697,7 +736,12 @@
             nextFuture.thenAcceptAsync(nextStatus -> {
                 if (nextStatus == null) {
                     log.debug("Installing new group and flow for {}", key);
-                    populateFwd(key, populateNext(key, ports));
+                    int newNextId = populateNext(key, ports);
+                    if (newNextId == -1) {
+                        log.warn("Fail to updateXConnect {}: {}", key, ERROR_NEXT_ID);
+                        return;
+                    }
+                    populateFwd(key, newNextId);
                 }
             });
         } else {
@@ -708,23 +752,28 @@
     /**
      * Creates a next objective builder for XConnect with given nextId.
      *
-     * @param key    XConnect key
-     * @param ports  set of XConnect ports
-     * @param nextId next objective id
+     * @param key     XConnect key
+     * @param ports   ports or L2 load balancer key
+     * @param nextId  next objective id
      * @return next objective builder
      */
-    private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<PortNumber> ports, int nextId) {
+    private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<String> ports, int nextId) {
         TrafficSelector metadata =
                 DefaultTrafficSelector.builder().matchVlanId(key.vlanId()).build();
         NextObjective.Builder nextObjBuilder = DefaultNextObjective
                 .builder().withId(nextId)
                 .withType(NextObjective.Type.BROADCAST).fromApp(appId)
                 .withMeta(metadata);
-        ports.forEach(port -> {
-            TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
-            tBuilder.setOutput(port);
-            nextObjBuilder.addTreatment(tBuilder.build());
-        });
+
+        for (String port : ports) {
+            NextTreatment nextTreatment = getNextTreatment(key.deviceId(), port);
+            if (nextTreatment == null) {
+                log.warn("Unable to create nextObj. Null NextTreatment");
+                return null;
+            }
+            nextObjBuilder.addTreatment(nextTreatment);
+        }
+
         return nextObjBuilder;
     }
 
@@ -735,7 +784,7 @@
      * @param ports set of XConnect ports
      * @return next objective builder
      */
-    private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<PortNumber> ports) {
+    private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<String> ports) {
         int nextId = flowObjectiveService.allocateNextId();
         return nextObjBuilder(key, ports, nextId);
     }
@@ -794,14 +843,19 @@
      *
      * @param key  XConnect key
      * @param port XConnect ports
+     * @param filtered true if this is a filtered port
      * @return next objective builder
      */
-    private FilteringObjective.Builder filterObjBuilder(XconnectKey key, PortNumber port) {
+    private FilteringObjective.Builder filterObjBuilder(XconnectKey key, PortNumber port, boolean filtered) {
         FilteringObjective.Builder fob = DefaultFilteringObjective.builder();
         fob.withKey(Criteria.matchInPort(port))
-                .addCondition(Criteria.matchVlanId(key.vlanId()))
                 .addCondition(Criteria.matchEthDst(MacAddress.NONE))
                 .withPriority(XCONNECT_PRIORITY);
+        if (filtered) {
+            fob.addCondition(Criteria.matchVlanId(key.vlanId()));
+        } else {
+            fob.addCondition(Criteria.matchVlanId(VlanId.ANY));
+        }
         return fob.permit().fromApp(appId);
     }
 
@@ -1127,4 +1181,46 @@
         return true;
     }
 
+    private Set<PortNumber> getPhysicalPorts(DeviceId deviceId, String port) {
+        // If port is numeric, treat it as regular port.
+        // Otherwise try to parse it as load balancer key and get the physical port the LB maps to.
+        try {
+            return Sets.newHashSet(PortNumber.portNumber(Integer.parseInt(port)));
+        } catch (NumberFormatException e) {
+            log.debug("Port {} is not numeric. Try to parse it as load balancer key", port);
+        }
+
+        String l2LbKey = port.substring("L2LB(".length(), port.length() - 1);
+        try {
+            return Sets.newHashSet(l2LbService.getL2Lb(deviceId, Integer.parseInt(l2LbKey)).ports());
+        } catch (NumberFormatException e) {
+            log.debug("Port {} is not load balancer key either. Ignore", port);
+        } catch (NullPointerException e) {
+            log.debug("L2 load balancer {} not found. Ignore", l2LbKey);
+        }
+
+        return Sets.newHashSet();
+    }
+
+    private NextTreatment getNextTreatment(DeviceId deviceId, String port) {
+        // If port is numeric, treat it as regular port.
+        // Otherwise try to parse it as load balancer key and get the physical port the LB maps to.
+        try {
+            PortNumber portNumber = PortNumber.portNumber(Integer.parseInt(port));
+            return DefaultNextTreatment.of(DefaultTrafficTreatment.builder().setOutput(portNumber).build());
+        } catch (NumberFormatException e) {
+            log.debug("Port {} is not numeric. Try to parse it as load balancer key", port);
+        }
+
+        String l2LbKey = port.substring("L2LB(".length(), port.length() - 1);
+        try {
+            return IdNextTreatment.of(l2LbService.getL2LbNexts(deviceId, Integer.parseInt(l2LbKey)));
+        } catch (NumberFormatException e) {
+            log.debug("Port {} is not load balancer key either. Ignore", port);
+        } catch (NullPointerException e) {
+            log.debug("L2 load balancer {} not found. Ignore", l2LbKey);
+        }
+
+        return null;
+    }
 }