Fixed bug in OpenstackNode

- OVSDB client has changed to update br-int even if there's br-int already in device
- So OpenstackNode modified only the leader node to  performs node bootstrap

Change-Id: Ie01843ca8ab36ec61b58e80ce20c0c8c31ff8273
diff --git a/apps/openstacknode/src/main/java/org/onosproject/openstacknode/OpenstackNodeManager.java b/apps/openstacknode/src/main/java/org/onosproject/openstacknode/OpenstackNodeManager.java
index 28b41cf..e0667be 100644
--- a/apps/openstacknode/src/main/java/org/onosproject/openstacknode/OpenstackNodeManager.java
+++ b/apps/openstacknode/src/main/java/org/onosproject/openstacknode/OpenstackNodeManager.java
@@ -26,6 +26,8 @@
 import org.onlab.util.ItemNotFoundException;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.DefaultAnnotations;
@@ -60,6 +62,7 @@
 import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.Device.Type.SWITCH;
 import static org.onosproject.net.behaviour.TunnelDescription.Type.VXLAN;
@@ -69,7 +72,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -82,7 +84,6 @@
 @Service
 public class OpenstackNodeManager implements OpenstackNodeService {
     protected final Logger log = getLogger(getClass());
-    private static final int NUM_THREADS = 1;
     private static final KryoNamespace.Builder NODE_SERIALIZER = KryoNamespace.newBuilder()
             .register(KryoNamespaces.API)
             .register(OpenstackNode.class)
@@ -127,6 +128,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected NetworkConfigRegistry configRegistry;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
     private final OvsdbHandler ovsdbHandler = new OvsdbHandler();
     private final BridgeHandler bridgeHandler = new BridgeHandler();
     private final NetworkConfigListener configListener = new InternalConfigListener();
@@ -138,13 +142,15 @@
                 }
             };
 
-    private final ExecutorService eventExecutor = Executors
-            .newFixedThreadPool(NUM_THREADS, groupedThreads("onos/openstacknode", "event-handler"));
+    private final ExecutorService eventExecutor =
+            newSingleThreadScheduledExecutor(groupedThreads("onos/openstacknode", "event-handler"));
+
 
     private final DeviceListener deviceListener = new InternalDeviceListener();
 
     private ApplicationId appId;
     private ConsistentMap<OpenstackNode, NodeState> nodeStore;
+    private NodeId localNodeId;
 
     private enum NodeState {
 
@@ -192,6 +198,9 @@
     @Activate
     protected void activate() {
         appId = coreService.registerApplication(OPENSTACK_NODEMANAGER_ID);
+        localNodeId = clusterService.getLocalNode().id();
+        leadershipService.runForLeadership(appId.name());
+
         nodeStore = storageService.<OpenstackNode, NodeState>consistentMapBuilder()
                 .withSerializer(Serializer.using(NODE_SERIALIZER.build()))
                 .withName(OPENSTACK_NODESTORE)
@@ -214,6 +223,7 @@
 
         configRegistry.unregisterConfigFactory(configFactory);
         configService.removeListener(configListener);
+        leadershipService.withdraw(appId.name());
 
         log.info("Stopped");
     }
@@ -223,8 +233,15 @@
     public void addNode(OpenstackNode node) {
         checkNotNull(node, "Node cannot be null");
 
-        nodeStore.putIfAbsent(node, checkNodeState(node));
+        NodeId leaderNodeId = leadershipService.getLeader(appId.name());
+        log.debug("Node init requested, localNodeId: {}, leaderNodeId: {}", localNodeId, leaderNodeId);
 
+        //TODO: Fix any node can engage this operation.
+        if (!localNodeId.equals(leaderNodeId)) {
+            log.debug("Only the leaderNode can perform addNode operation");
+            return;
+        }
+        nodeStore.putIfAbsent(node, checkNodeState(node));
         NodeState state = checkNodeState(node);
         state.process(this, node);
     }
@@ -488,6 +505,13 @@
 
         @Override
         public void event(DeviceEvent event) {
+            NodeId leaderNodeId = leadershipService.getLeader(appId.name());
+
+            //TODO: Fix any node can engage this operation.
+            if (!localNodeId.equals(leaderNodeId)) {
+                log.debug("Only the leaderNode can process events");
+                return;
+            }
 
             Device device = event.subject();
             ConnectionHandler<Device> handler =