Improve Xconnect to support BSOD use case

- Push ACL flow to ignore xconnect VLAN
- Add pair port to the L2FG internally
- Remove the ETH_TYPE restriction in OFDPA processVersatile
- Remove the NOACTION warning in OFDPA processVersatile
- Do not push bridging flow for hosts that have wrong VLAN tag

Known issue:
- flooding issue on the pair port
- tagged host will be learnt before xconnect config is pushed (will be ignored by SR)

Change-Id: I30e4f46e54daa0f7bd349419df100523dceb2c4c
diff --git a/app/src/main/java/org/onosproject/segmentrouting/XConnectHandler.java b/app/src/main/java/org/onosproject/segmentrouting/XConnectHandler.java
index aa60684..96c1a38 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/XConnectHandler.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/XConnectHandler.java
@@ -17,7 +17,9 @@
 package org.onosproject.segmentrouting;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.onlab.packet.MacAddress;
+import org.onlab.packet.VlanId;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
@@ -43,7 +45,6 @@
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,18 +60,15 @@
     private static final String CONFIG_NOT_FOUND = "XConnect config not found";
     private static final String NOT_MASTER = "Not master controller";
     private final SegmentRoutingManager srManager;
-    private final StorageService storageService;
     private final ConsistentMap<XConnectStoreKey, NextObjective> xConnectNextObjStore;
-    private final KryoNamespace.Builder xConnectKryo;
 
-    protected XConnectHandler(SegmentRoutingManager srManager) {
+    XConnectHandler(SegmentRoutingManager srManager) {
         this.srManager = srManager;
-        this.storageService = srManager.storageService;
-        xConnectKryo = new KryoNamespace.Builder()
+        KryoNamespace.Builder xConnectKryo = new KryoNamespace.Builder()
                 .register(KryoNamespaces.API)
                 .register(XConnectStoreKey.class)
                 .register(NextObjContext.class);
-        xConnectNextObjStore = storageService
+        xConnectNextObjStore = srManager.storageService
                 .<XConnectStoreKey, NextObjective>consistentMapBuilder()
                 .withName("onos-xconnect-nextobj-store")
                 .withSerializer(Serializer.using(xConnectKryo.build()))
@@ -91,9 +89,7 @@
             return;
         }
 
-        config.getXconnects(deviceId).forEach(key -> {
-            populateXConnect(key, config.getPorts(key));
-        });
+        config.getXconnects(deviceId).forEach(key -> populateXConnect(key, config.getPorts(key)));
     }
 
     /**
@@ -101,12 +97,10 @@
      *
      * @param event network config added event
      */
-    protected void processXConnectConfigAdded(NetworkConfigEvent event) {
+    void processXConnectConfigAdded(NetworkConfigEvent event) {
         log.info("Processing XConnect CONFIG_ADDED");
         XConnectConfig config = (XConnectConfig) event.config().get();
-        config.getXconnects().forEach(key -> {
-            populateXConnect(key, config.getPorts(key));
-        });
+        config.getXconnects().forEach(key -> populateXConnect(key, config.getPorts(key)));
     }
 
     /**
@@ -114,7 +108,7 @@
      *
      * @param event network config updated event
      */
-    protected void processXConnectConfigUpdated(NetworkConfigEvent event) {
+    void processXConnectConfigUpdated(NetworkConfigEvent event) {
         log.info("Processing XConnect CONFIG_UPDATED");
         XConnectConfig prevConfig = (XConnectConfig) event.prevConfig().get();
         XConnectConfig config = (XConnectConfig) event.config().get();
@@ -130,15 +124,10 @@
                         !config.getPorts(key).equals(prevConfig.getPorts(key)))
                 .collect(Collectors.toSet());
 
-        pendingRemove.forEach(key -> {
-            revokeXConnect(key, prevConfig.getPorts(key));
-        });
-        pendingAdd.forEach(key -> {
-            populateXConnect(key, config.getPorts(key));
-        });
-        pendingUpdate.forEach(key -> {
-            updateXConnect(key, prevConfig.getPorts(key), config.getPorts(key));
-        });
+        pendingRemove.forEach(key -> revokeXConnect(key, prevConfig.getPorts(key)));
+        pendingAdd.forEach(key -> populateXConnect(key, config.getPorts(key)));
+        pendingUpdate.forEach(key ->
+                updateXConnect(key, prevConfig.getPorts(key), config.getPorts(key)));
     }
 
     /**
@@ -146,7 +135,7 @@
      *
      * @param event network config removed event
      */
-    protected void processXConnectConfigRemoved(NetworkConfigEvent event) {
+    void processXConnectConfigRemoved(NetworkConfigEvent event) {
         log.info("Processing XConnect CONFIG_REMOVED");
         XConnectConfig prevConfig = (XConnectConfig) event.prevConfig().get();
         prevConfig.getXconnects().forEach(key -> {
@@ -183,8 +172,11 @@
             log.info("Abort populating XConnect {}: {}", key, NOT_MASTER);
             return;
         }
+
+        ports = addPairPort(key.deviceId(), ports);
         populateFilter(key, ports);
         populateFwd(key, populateNext(key, ports));
+        populateAcl(key);
     }
 
     /**
@@ -213,7 +205,7 @@
      * @param ports XConnect ports
      */
     private NextObjective populateNext(XConnectStoreKey key, Set<PortNumber> ports) {
-        NextObjective nextObj = null;
+        NextObjective nextObj;
         if (xConnectNextObjStore.containsKey(key)) {
             nextObj = xConnectNextObjStore.get(key).value();
             log.debug("NextObj for {} found, id={}", key, nextObj.id());
@@ -229,7 +221,7 @@
     }
 
     /**
-     * Populates forwarding objectives for given XConnect.
+     * Populates bridging forwarding objectives for given XConnect.
      *
      * @param key XConnect store key
      * @param nextObj next objective
@@ -244,6 +236,20 @@
     }
 
     /**
+     * Populates ACL forwarding objectives for given XConnect.
+     *
+     * @param key XConnect store key
+     */
+    private void populateAcl(XConnectStoreKey key) {
+        ForwardingObjective.Builder aclObjBuilder = aclObjBuilder(key.vlanId());
+        ObjectiveContext aclContext = new DefaultObjectiveContext(
+                (objective) -> log.debug("XConnect AclObj for {} populated", key),
+                (objective, error) ->
+                        log.warn("Failed to populate XConnect AclObj for {}: {}", key, error));
+        srManager.flowObjectiveService.forward(key.deviceId(), aclObjBuilder.add(aclContext));
+    }
+
+    /**
      * Revokes XConnect groups and flows for given key.
      *
      * @param key XConnect key
@@ -255,6 +261,7 @@
             return;
         }
 
+        ports = addPairPort(key.deviceId(), ports);
         revokeFilter(key, ports);
         if (xConnectNextObjStore.containsKey(key)) {
             NextObjective nextObj = xConnectNextObjStore.get(key).value();
@@ -263,6 +270,7 @@
         } else {
             log.warn("NextObj for {} does not exist in the store.", key);
         }
+        revokeAcl(key);
     }
 
     /**
@@ -316,7 +324,7 @@
     }
 
     /**
-     * Revokes forwarding objectives for given XConnect.
+     * Revokes bridging forwarding objectives for given XConnect.
      *
      * @param key XConnect store key
      * @param nextObj next objective
@@ -347,6 +355,21 @@
     }
 
     /**
+     * Revokes ACL forwarding objectives for given XConnect.
+     *
+     * @param key XConnect store key
+     */
+    private void revokeAcl(XConnectStoreKey key) {
+        ForwardingObjective.Builder aclObjBuilder = aclObjBuilder(key.vlanId());
+        ObjectiveContext aclContext = new DefaultObjectiveContext(
+                (objective) -> log.debug("XConnect AclObj for {} populated", key),
+                (objective, error) ->
+                        log.warn("Failed to populate XConnect AclObj for {}: {}", key, error));
+        srManager.flowObjectiveService
+                .forward(key.deviceId(), aclObjBuilder.remove(aclContext));
+    }
+
+    /**
      * Updates XConnect groups and flows for given key.
      *
      * @param key XConnect key
@@ -355,14 +378,15 @@
      */
     private void updateXConnect(XConnectStoreKey key, Set<PortNumber> prevPorts,
             Set<PortNumber> ports) {
+        // NOTE: ACL flow doesn't include port information. No need to update it.
+        //       Pair port is built-in and thus not going to change. No need to update it.
+
         // remove old filter
-        prevPorts.stream().filter(port -> !ports.contains(port)).forEach(port -> {
-            revokeFilter(key, ImmutableSet.of(port));
-        });
+        prevPorts.stream().filter(port -> !ports.contains(port)).forEach(port ->
+            revokeFilter(key, ImmutableSet.of(port)));
         // install new filter
-        ports.stream().filter(port -> !prevPorts.contains(port)).forEach(port -> {
-            populateFilter(key, ImmutableSet.of(port));
-        });
+        ports.stream().filter(port -> !prevPorts.contains(port)).forEach(port ->
+            populateFilter(key, ImmutableSet.of(port)));
 
         CompletableFuture<ObjectiveError> fwdFuture = new CompletableFuture<>();
         CompletableFuture<ObjectiveError> nextFuture = new CompletableFuture<>();
@@ -394,12 +418,10 @@
      *
      * @param deviceId device ID
      */
-    protected void removeDevice(DeviceId deviceId) {
+    void removeDevice(DeviceId deviceId) {
         xConnectNextObjStore.entrySet().stream()
                 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
-                .forEach(entry -> {
-                    xConnectNextObjStore.remove(entry.getKey());
-                });
+                .forEach(entry -> xConnectNextObjStore.remove(entry.getKey()));
         log.debug("{} is removed from xConnectNextObjStore", deviceId);
     }
 
@@ -427,11 +449,11 @@
     }
 
     /**
-     * Creates a forwarding objective builder for XConnect.
+     * Creates a bridging forwarding objective builder for XConnect.
      *
      * @param key XConnect key
      * @param nextId next ID of the broadcast group for this XConnect key
-     * @return next objective builder
+     * @return forwarding objective builder
      */
     private ForwardingObjective.Builder fwdObjBuilder(XConnectStoreKey key, int nextId) {
         /*
@@ -453,6 +475,28 @@
     }
 
     /**
+     * Creates an ACL forwarding objective builder for XConnect.
+     *
+     * @param vlanId cross connect VLAN id
+     * @return forwarding objective builder
+     */
+    private ForwardingObjective.Builder aclObjBuilder(VlanId vlanId) {
+        TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
+        sbuilder.matchVlanId(vlanId);
+
+        TrafficTreatment.Builder tbuilder = DefaultTrafficTreatment.builder();
+
+        ForwardingObjective.Builder fob = DefaultForwardingObjective.builder();
+        fob.withFlag(ForwardingObjective.Flag.VERSATILE)
+                .withSelector(sbuilder.build())
+                .withTreatment(tbuilder.build())
+                .withPriority(SegmentRoutingService.XCONNECT_ACL_PRIORITY)
+                .fromApp(srManager.appId)
+                .makePermanent();
+        return fob;
+    }
+
+    /**
      * Creates a filtering objective builder for XConnect.
      *
      * @param key XConnect key
@@ -468,6 +512,19 @@
         return fob.permit().fromApp(srManager.appId);
     }
 
+    /**
+     * Add pair port to the given set of port.
+     *
+     * @param deviceId device Id
+     * @param ports ports specified in the xconnect config
+     * @return port specified in the xconnect config plus the pair port (if configured)
+     */
+    private Set<PortNumber> addPairPort(DeviceId deviceId, Set<PortNumber> ports) {
+        Set<PortNumber> newPorts = Sets.newHashSet(ports);
+        srManager.getPairLocalPort(deviceId).ifPresent(newPorts::add);
+        return newPorts;
+    }
+
     // TODO: Lambda closure in DefaultObjectiveContext cannot be serialized properly
     //       with Kryo 3.0.3. It will be fixed in 3.0.4. By then we can use
     //       DefaultObjectiveContext again.