Add unit tests for OpenstackFlowRuleManager

Change-Id: If7c89cd0eb0115972dcefd6bc026414e34029df2
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackFlowRuleManager.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackFlowRuleManager.java
index 4c5b0b7..a1289c4 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackFlowRuleManager.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackFlowRuleManager.java
@@ -22,6 +22,9 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.DeviceId;
@@ -42,11 +45,13 @@
 import org.onosproject.openstacknode.api.OpenstackNodeService;
 import org.slf4j.Logger;
 
+import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.openstacknetworking.api.Constants.OPENSTACK_NETWORKING_APP_ID;
+import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -69,21 +74,31 @@
     protected CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenstackNodeService osNodeService;
 
     private final ExecutorService deviceEventExecutor =
-            Executors.newSingleThreadExecutor(groupedThreads("openstacknetworking", "device-event"));
-    private final OpenstackNodeListener internalNodeListener = new InternalOpenstackNodeListener();
+            Executors.newSingleThreadExecutor(groupedThreads(
+                    getClass().getSimpleName(), "device-event"));
+    private final OpenstackNodeListener internalNodeListener =
+                                        new InternalOpenstackNodeListener();
 
     private ApplicationId appId;
+    private NodeId localNodeId;
 
     @Activate
     protected void activate() {
         appId = coreService.registerApplication(OPENSTACK_NETWORKING_APP_ID);
         coreService.registerApplication(OPENSTACK_NETWORKING_APP_ID);
         osNodeService.addListener(internalNodeListener);
-
-        osNodeService.completeNodes(OpenstackNode.NodeType.COMPUTE)
+        localNodeId = clusterService.getLocalNode().id();
+        leadershipService.runForLeadership(appId.name());
+        osNodeService.completeNodes(COMPUTE)
                 .forEach(node -> initializePipeline(node.intgBridge()));
 
         log.info("Started");
@@ -92,20 +107,20 @@
     @Deactivate
     protected void deactivate() {
         osNodeService.removeListener(internalNodeListener);
+        leadershipService.withdraw(appId.name());
         deviceEventExecutor.shutdown();
 
         log.info("Stopped");
     }
 
-
     @Override
     public void setRule(ApplicationId appId,
-                               DeviceId deviceId,
-                               TrafficSelector selector,
-                               TrafficTreatment treatment,
-                               int priority,
-                               int tableType,
-                               boolean install) {
+                        DeviceId deviceId,
+                        TrafficSelector selector,
+                        TrafficTreatment treatment,
+                        int priority,
+                        int tableType,
+                        boolean install) {
 
         FlowRule.Builder flowRuleBuilder = DefaultFlowRule.builder()
                 .forDevice(deviceId)
@@ -124,49 +139,6 @@
         applyRule(flowRuleBuilder.build(), install);
     }
 
-    private void applyRule(FlowRule flowRule, boolean install) {
-        FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
-
-        flowOpsBuilder = install ? flowOpsBuilder.add(flowRule) : flowOpsBuilder.remove(flowRule);
-
-        flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
-            @Override
-            public void onSuccess(FlowRuleOperations ops) {
-                log.debug("Provisioned vni or forwarding table");
-            }
-
-            @Override
-            public void onError(FlowRuleOperations ops) {
-                log.debug("Failed to provision vni or forwarding table");
-            }
-        }));
-    }
-
-    private void initializePipeline(DeviceId deviceId) {
-        // for inbound table transition
-        connectTables(deviceId, Constants.STAT_INBOUND_TABLE, Constants.VTAP_INBOUND_TABLE);
-        connectTables(deviceId, Constants.VTAP_INBOUND_TABLE, Constants.DHCP_ARP_TABLE);
-
-        // for vTag and ACL table transition
-        connectTables(deviceId, Constants.DHCP_ARP_TABLE, Constants.VTAG_TABLE);
-        connectTables(deviceId, Constants.VTAG_TABLE, Constants.ACL_TABLE);
-        connectTables(deviceId, Constants.ACL_TABLE, Constants.JUMP_TABLE);
-
-        // for JUMP table transition
-        // we need JUMP table for bypassing routing table which contains large
-        // amount of flow rules which might cause performance degradation during
-        // table lookup
-        setupJumpTable(deviceId);
-
-        // for outbound table transition
-        connectTables(deviceId, Constants.STAT_OUTBOUND_TABLE, Constants.VTAP_OUTBOUND_TABLE);
-        connectTables(deviceId, Constants.VTAP_OUTBOUND_TABLE, Constants.FORWARDING_TABLE);
-
-        // for FLAT outbound table transition
-        connectTables(deviceId, Constants.STAT_FLAT_OUTBOUND_TABLE, Constants.VTAP_FLAT_OUTBOUND_TABLE);
-        connectTables(deviceId, Constants.VTAP_FLAT_OUTBOUND_TABLE, Constants.FLAT_TABLE);
-    }
-
     @Override
     public void connectTables(DeviceId deviceId, int fromTable, int toTable) {
         TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
@@ -207,6 +179,49 @@
         applyRule(flowRule, true);
     }
 
+    private void applyRule(FlowRule flowRule, boolean install) {
+        FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
+
+        flowOpsBuilder = install ? flowOpsBuilder.add(flowRule) : flowOpsBuilder.remove(flowRule);
+
+        flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
+            @Override
+            public void onSuccess(FlowRuleOperations ops) {
+                log.debug("Provisioned vni or forwarding table");
+            }
+
+            @Override
+            public void onError(FlowRuleOperations ops) {
+                log.debug("Failed to provision vni or forwarding table");
+            }
+        }));
+    }
+
+    protected void initializePipeline(DeviceId deviceId) {
+        // for inbound table transition
+        connectTables(deviceId, Constants.STAT_INBOUND_TABLE, Constants.VTAP_INBOUND_TABLE);
+        connectTables(deviceId, Constants.VTAP_INBOUND_TABLE, Constants.DHCP_ARP_TABLE);
+
+        // for vTag and ACL table transition
+        connectTables(deviceId, Constants.DHCP_ARP_TABLE, Constants.VTAG_TABLE);
+        connectTables(deviceId, Constants.VTAG_TABLE, Constants.ACL_TABLE);
+        connectTables(deviceId, Constants.ACL_TABLE, Constants.JUMP_TABLE);
+
+        // for JUMP table transition
+        // we need JUMP table for bypassing routing table which contains large
+        // amount of flow rules which might cause performance degradation during
+        // table lookup
+        setupJumpTable(deviceId);
+
+        // for outbound table transition
+        connectTables(deviceId, Constants.STAT_OUTBOUND_TABLE, Constants.VTAP_OUTBOUND_TABLE);
+        connectTables(deviceId, Constants.VTAP_OUTBOUND_TABLE, Constants.FORWARDING_TABLE);
+
+        // for FLAT outbound table transition
+        connectTables(deviceId, Constants.STAT_FLAT_OUTBOUND_TABLE, Constants.VTAP_FLAT_OUTBOUND_TABLE);
+        connectTables(deviceId, Constants.VTAP_FLAT_OUTBOUND_TABLE, Constants.FLAT_TABLE);
+    }
+
     private void setupJumpTable(DeviceId deviceId) {
         TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
         TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
@@ -247,15 +262,22 @@
     private class InternalOpenstackNodeListener implements OpenstackNodeListener {
 
         @Override
+        public boolean isRelevant(OpenstackNodeEvent event) {
+            // do not allow to proceed without leadership
+            NodeId leader = leadershipService.getLeader(appId.name());
+            return Objects.equals(localNodeId, leader) &&
+                    event.subject().type().equals(COMPUTE);
+        }
+
+        @Override
         public void event(OpenstackNodeEvent event) {
             OpenstackNode osNode = event.subject();
-            // TODO check leadership of the node and make only the leader process
 
             switch (event.type()) {
                 case OPENSTACK_NODE_COMPLETE:
                     deviceEventExecutor.execute(() -> {
                         log.info("COMPLETE node {} is detected", osNode.hostname());
-                        processCompleteNode(event.subject());
+                        initializePipeline(osNode.intgBridge());
                     });
                     break;
                 case OPENSTACK_NODE_CREATED:
@@ -267,11 +289,5 @@
                     break;
             }
         }
-
-        private void processCompleteNode(OpenstackNode osNode) {
-            if (osNode.type().equals(OpenstackNode.NodeType.COMPUTE)) {
-                initializePipeline(osNode.intgBridge());
-            }
-        }
     }
 }