SONA: add connection tracking feature using OVS conntrack function.

Change-Id: I97a6f83c12f27b3fff95d1ce47d715f1b29f36c4
diff --git a/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/api/Constants.java b/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/api/Constants.java
index 8b58cf6..824ae6b 100644
--- a/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/api/Constants.java
+++ b/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/api/Constants.java
@@ -40,13 +40,18 @@
     public static final int PRIORITY_EXTERNAL_ROUTING_RULE = 25000;
     public static final int PRIORITY_SNAT_RULE = 26000;
     public static final int PRIORITY_SWITCHING_RULE = 30000;
-    public static final int PRIORITY_ACL_RULE = 30000;
-    public static final int PRIORITY_ADMIN_RULE = 31000;
+    public static final int PRIORITY_ADMIN_RULE = 32000;
+    public static final int PRIORITY_ACL_RULE = 31000;
+    public static final int PRIORITY_CT_HOOK_RULE = 30500;
+    public static final int PRIORITY_CT_RULE = 32000;
+    public static final int PRIORITY_CT_DROP_RULE = 32500;
 
     public static final int SRC_VNI_TABLE = 0;
     public static final int ACL_TABLE = 1;
-    public static final int JUMP_TABLE = 2;
-    public static final int ROUTING_TABLE = 3;
-    public static final int FORWARDING_TABLE = 4;
+    public static final int CT_TABLE = 2;
+    public static final int JUMP_TABLE = 3;
+    public static final int ROUTING_TABLE = 4;
+    public static final int FORWARDING_TABLE = 5;
     public static final int GW_COMMON_TABLE = 0;
+    public static final int ERROR_TABLE = 10;
 }
\ No newline at end of file
diff --git a/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java b/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java
index 690fc6c..f425f3b 100644
--- a/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java
+++ b/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java
@@ -34,12 +34,19 @@
 import org.onlab.packet.TpPort;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.driver.DriverService;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
 import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.ExtensionSelector;
 import org.onosproject.openstacknetworking.api.InstancePort;
 import org.onosproject.openstacknetworking.api.InstancePortEvent;
 import org.onosproject.openstacknetworking.api.InstancePortListener;
@@ -52,6 +59,8 @@
 import org.onosproject.openstacknetworking.api.OpenstackSecurityGroupListener;
 import org.onosproject.openstacknetworking.api.OpenstackSecurityGroupService;
 import org.onosproject.openstacknode.api.OpenstackNode;
+import org.onosproject.openstacknode.api.OpenstackNodeEvent;
+import org.onosproject.openstacknode.api.OpenstackNodeListener;
 import org.onosproject.openstacknode.api.OpenstackNodeService;
 import org.openstack4j.model.network.Port;
 import org.openstack4j.model.network.SecurityGroup;
@@ -72,8 +81,14 @@
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.openstacknetworking.api.Constants.ACL_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.JUMP_TABLE;
+import static org.onosproject.openstacknetworking.api.Constants.CT_TABLE;
+import static org.onosproject.openstacknetworking.api.Constants.ERROR_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.OPENSTACK_NETWORKING_APP_ID;
 import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_ACL_RULE;
+import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_CT_DROP_RULE;
+import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_CT_HOOK_RULE;
+import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_CT_RULE;
+import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -114,10 +129,20 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenstackNodeService osNodeService;
 
+    protected DriverService driverService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
     private final InstancePortListener instancePortListener = new InternalInstancePortListener();
     private final OpenstackNetworkListener portListener = new InternalOpenstackPortListener();
     private final OpenstackSecurityGroupListener securityGroupListener = new InternalSecurityGroupListener();
+    private final OpenstackNodeListener osNodeListener = new InternalNodeListener();
     private ApplicationId appId;
+    private NodeId localNodeId;
 
     private final ExecutorService eventExecutor = newSingleThreadExecutor(
             groupedThreads(this.getClass().getSimpleName(), "event-handler"));
@@ -130,13 +155,26 @@
     private static final String INGRESS = "INGRESS";
     private static final IpPrefix IP_PREFIX_ANY = Ip4Prefix.valueOf("0.0.0.0/0");
 
+    // We expose pipeline structure to SONA application considering removing pipeline soon.
+    private static final int GOTO_CONNTRACK_TABLE = 2;
+    private static final int GOTO_JUMP_TABLE = 3;
+
+    private static final int CT_COMMIT = 0;
+    private static final int CT_NO_COMMIT = 1;
+    private static final short CT_NO_RECIRC = -1;
+
+    private static final int ACTION_NONE = 0;
+    private static final int ACTION_DROP = -1;
+
     @Activate
     protected void activate() {
         appId = coreService.registerApplication(OPENSTACK_NETWORKING_APP_ID);
+        localNodeId = clusterService.getLocalNode().id();
         instancePortService.addListener(instancePortListener);
         securityGroupService.addListener(securityGroupListener);
         osNetService.addListener(portListener);
         configService.registerProperties(getClass());
+        osNodeService.addListener(osNodeListener);
 
         log.info("Started");
     }
@@ -147,6 +185,7 @@
         securityGroupService.removeListener(securityGroupListener);
         osNetService.removeListener(portListener);
         configService.unregisterProperties(getClass(), false);
+        osNodeService.removeListener(osNodeListener);
         eventExecutor.shutdown();
 
         log.info("Stopped");
@@ -170,6 +209,27 @@
         resetSecurityGroupRules();
     }
 
+    private void initializeConnTrackTable(DeviceId deviceId, boolean install) {
+
+        //table=1,ip,ct_state=-trk, actions=ct(table:2)
+        long ctState = RulePopulatorUtil.computeCtStateFlag(false, false, false);
+        long ctMask = RulePopulatorUtil.computeCtMaskFlag(true, false, false);
+        setConnTrackRule(deviceId, ctState, ctMask, CT_NO_COMMIT, (short) GOTO_CONNTRACK_TABLE,
+                ACTION_NONE, PRIORITY_CT_HOOK_RULE, install);
+
+        //table=2,ip,nw_dst=10.10.0.2,ct_state=+trk+est,action=goto_table:3
+        ctState = RulePopulatorUtil.computeCtStateFlag(true, false, true);
+        ctMask = RulePopulatorUtil.computeCtMaskFlag(true, false, true);
+        setConnTrackRule(deviceId, ctState, ctMask, CT_NO_COMMIT, CT_NO_RECIRC,
+                GOTO_JUMP_TABLE, PRIORITY_CT_RULE, install);
+
+        //table=2,ip,nw_dst=10.10.0.2,ct_state=+trk+new,action=drop
+        ctState = RulePopulatorUtil.computeCtStateFlag(true, true, false);
+        ctMask = RulePopulatorUtil.computeCtMaskFlag(true, true, false);
+        setConnTrackRule(deviceId, ctState, ctMask, CT_NO_COMMIT, CT_NO_RECIRC,
+                ACTION_DROP, PRIORITY_CT_DROP_RULE, install);
+    }
+
     private void setSecurityGroupRules(InstancePort instPort, Port port, boolean install) {
         port.getSecurityGroups().forEach(sgId -> {
             SecurityGroup sg = securityGroupService.securityGroup(sgId);
@@ -223,6 +283,73 @@
     }
 
     /**
+     * Sets connection tracking rule using OVS extension commands.
+     * It is not so graceful, but I don't want to make it more general because it is going to be used
+     * only here. The following is the usage of the fucntion.
+     *
+     * @param deviceId Device ID
+     * @param ctState ctState: please use RulePopulatorUtil.computeCtStateFlag() to build the value
+     * @param ctMask crMask: please use RulePopulatorUtil.computeCtMaskFlag() to build the value
+     * @param commit CT_COMMIT for commit action, CT_NO_COMMIT otherwise
+     * @param recircTable table number for recirculation after CT actions. CT_NO_RECIRC with no recirculation
+     * @param action Additional actions. ACTION_DROP, ACTION_NONE, GOTO_XXX_TABLE are supported.
+     * @param priority priority value for the rule
+     * @param install true for insertion, false for removal
+     */
+    private void setConnTrackRule(DeviceId deviceId, long ctState, long ctMask,
+                                  int commit, short recircTable,
+                                  int action, int priority, boolean install) {
+
+        ExtensionSelector esCtSate = RulePopulatorUtil.buildCtExtensionSelector(driverService, deviceId,
+                ctState, ctMask);
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .extension(esCtSate, deviceId)
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .build();
+
+        TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder();
+
+        if (commit == CT_COMMIT || recircTable > 0) {
+            RulePopulatorUtil.NiriraConnTrackTreatmentBuilder natTreatmentBuilder =
+                    RulePopulatorUtil.niciraConnTrackTreatmentBuilder(driverService, deviceId);
+            natTreatmentBuilder.natAction(false);
+            if (commit == CT_COMMIT) {
+                natTreatmentBuilder.commit(true);
+            } else {
+                natTreatmentBuilder.commit(false);
+            }
+            if (recircTable > 0) {
+                natTreatmentBuilder.table(recircTable);
+            }
+            tb.extension(natTreatmentBuilder.build(), deviceId);
+        } else if (action == ACTION_DROP) {
+            tb.drop();
+        }
+
+        if (action != ACTION_NONE) {
+            tb.transition(action);
+        }
+
+        int tableType = ERROR_TABLE;
+        if (priority == PRIORITY_CT_RULE || priority == PRIORITY_CT_DROP_RULE) {
+            tableType = CT_TABLE;
+        } else if (priority == PRIORITY_CT_HOOK_RULE) {
+            tableType = ACL_TABLE;
+        } else {
+            log.error("Cannot an appropriate table for the conn track rule.");
+        }
+
+        osFlowRuleService.setRule(
+                appId,
+                deviceId,
+                selector,
+                tb.build(),
+                priority,
+                tableType,
+                install);
+    }
+
+    /**
      * Returns a set of host IP addresses engaged with supplied security group ID.
      * It only searches a VM in the same tenant boundary.
      *
@@ -372,11 +499,17 @@
                     .forEach(node -> osFlowRuleService.setUpTableMissEntry(node.intgBridge(), ACL_TABLE));
             securityGroupService.securityGroups().forEach(securityGroup ->
                     securityGroup.getRules().forEach(this::securityGroupRuleAdded));
+            osNodeService.nodes().stream()
+                    .filter(node -> node.type().equals(OpenstackNode.NodeType.COMPUTE))
+                    .forEach(node -> initializeConnTrackTable(node .intgBridge(), true));
         } else {
             osNodeService.completeNodes(OpenstackNode.NodeType.COMPUTE)
                     .forEach(node -> osFlowRuleService.connectTables(node.intgBridge(), ACL_TABLE, JUMP_TABLE));
             securityGroupService.securityGroups().forEach(securityGroup ->
                     securityGroup.getRules().forEach(this::securityGroupRuleRemoved));
+            osNodeService.nodes().stream()
+                    .filter(node -> node.type().equals(OpenstackNode.NodeType.COMPUTE))
+                    .forEach(node -> initializeConnTrackTable(node.intgBridge(), false));
         }
 
         log.info("Reset security group info " + (useSecurityGroup ? " with " : " without") + " Security Group");
@@ -617,4 +750,43 @@
             }
         }
     }
+
+    private class InternalNodeListener implements OpenstackNodeListener {
+
+        @Override
+        public boolean isRelevant(OpenstackNodeEvent event) {
+            // do not allow to proceed without leadership
+            NodeId leader = leadershipService.getLeader(appId.name());
+            if (!Objects.equals(localNodeId, leader)) {
+                return false;
+            }
+            return event.subject().type() == COMPUTE;
+        }
+
+        @Override
+        public void event(OpenstackNodeEvent event) {
+            OpenstackNode osNode = event.subject();
+
+            switch (event.type()) {
+                case OPENSTACK_NODE_COMPLETE:
+                    eventExecutor.execute(() -> {
+                        try {
+                            if (useSecurityGroup) {
+                                initializeConnTrackTable(osNode.intgBridge(), true);
+                                log.warn("SG table initialization : {} is done", osNode.intgBridge());
+                            }
+                        } catch (IllegalArgumentException e) {
+                            log.error("ACL table initialization error : {}", e.getMessage());
+                        }
+                    });
+                    break;
+                case OPENSTACK_NODE_CREATED:
+                case OPENSTACK_NODE_REMOVED:
+                case OPENSTACK_NODE_UPDATED:
+                case OPENSTACK_NODE_INCOMPLETE:
+                default:
+                    break;
+            }
+        }
+    }
 }
diff --git a/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHandler.java b/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHandler.java
index 4bb42a0..4622660 100644
--- a/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHandler.java
+++ b/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHandler.java
@@ -29,10 +29,12 @@
 import org.onosproject.core.CoreService;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverService;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.instructions.ExtensionTreatment;
 import org.onosproject.openstacknetworking.api.InstancePort;
 import org.onosproject.openstacknetworking.api.InstancePortEvent;
 import org.onosproject.openstacknetworking.api.InstancePortListener;
@@ -103,6 +105,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenstackNodeService osNodeService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DriverService driverService;
+
     private final ExecutorService eventExecutor = newSingleThreadExecutor(
             groupedThreads(this.getClass().getSimpleName(), "event-handler"));
     private final InstancePortListener instancePortListener = new InternalInstancePortListener();
@@ -244,9 +249,15 @@
                 .matchInPort(instPort.portNumber())
                 .build();
 
+        // XXX All egress traffic needs to go through connection tracking module, which might hurt its performance.
+        ExtensionTreatment ctTreatment =
+                RulePopulatorUtil.niciraConnTrackTreatmentBuilder(driverService, instPort.deviceId())
+                        .commit(true).build();
+
         TrafficTreatment treatment = DefaultTrafficTreatment.builder()
                 .setTunnelId(getVni(instPort))
                 .transition(ACL_TABLE)
+                .extension(ctTreatment, instPort.deviceId())
                 .build();
 
         osFlowRuleService.setRule(
diff --git a/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/impl/RulePopulatorUtil.java b/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/impl/RulePopulatorUtil.java
index e6ed5bf..b569da1 100644
--- a/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/impl/RulePopulatorUtil.java
+++ b/apps/openstacknetworking/src/main/java/org/onosproject/openstacknetworking/impl/RulePopulatorUtil.java
@@ -17,24 +17,18 @@
 
 import org.onlab.packet.Ip4Address;
 import org.onlab.packet.IpAddress;
-import org.onosproject.core.ApplicationId;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.behaviour.ExtensionSelectorResolver;
 import org.onosproject.net.behaviour.ExtensionTreatmentResolver;
 import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.flow.instructions.ExtensionPropertyException;
-import org.onosproject.net.flow.instructions.ExtensionTreatment;
 import org.onosproject.net.driver.DriverHandler;
 import org.onosproject.net.driver.DriverService;
-import org.onosproject.net.flow.TrafficSelector;
-import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flow.criteria.ExtensionSelector;
 import org.onosproject.net.flow.criteria.ExtensionSelectorType;
+import org.onosproject.net.flow.instructions.ExtensionPropertyException;
+import org.onosproject.net.flow.instructions.ExtensionTreatment;
 import org.onosproject.net.flow.instructions.ExtensionTreatmentType;
-import org.onosproject.net.flowobjective.DefaultForwardingObjective;
-import org.onosproject.net.flowobjective.FlowObjectiveService;
-import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -143,40 +137,6 @@
     }
 
     /**
-     * Adds flow rules with the supplied information.
-     *
-     * @param flowObjectiveService flow objective service
-     * @param appId application id
-     * @param deviceId device id to remove this flow rule
-     * @param selector traffic selector
-     * @param treatment traffic treatment
-     * @param flag flag
-     * @param priority priority
-     * @param install populate flows if true, remove them otherwise
-     */
-    public static void setRule(FlowObjectiveService flowObjectiveService,
-                               ApplicationId appId,
-                               DeviceId deviceId,
-                               TrafficSelector selector,
-                               TrafficTreatment treatment,
-                               ForwardingObjective.Flag flag,
-                               int priority,
-                               boolean install) {
-        ForwardingObjective.Builder foBuilder = DefaultForwardingObjective.builder()
-                .withSelector(selector)
-                .withTreatment(treatment)
-                .withFlag(flag)
-                .withPriority(priority)
-                .fromApp(appId);
-
-        if (install) {
-            flowObjectiveService.forward(deviceId, foBuilder.add());
-        } else {
-            flowObjectiveService.forward(deviceId, foBuilder.remove());
-        }
-    }
-
-    /**
      * Computes ConnTack State flag values.
      *
      * @param isTracking true for +trk, false for -trk