CORD-151 Implement initial compute node setup

Followings are changed
- Changed nodeStore from eventually consistent map to consistent map
- Removed ovsdb connection management(ovsdb controller has connection status)
- Not only one leader but all onos instances make ovsdb session

Following jobs are done
- Reads compute node and ovsdb access info from network config
- Initiates ovsdb connection to the nodes
- Creates integration bridge on each ovsdbs
- Creates vxlan tunnel port on each integration bridges

Change-Id: I8df4061fcb1eae9b0abd545b7a3f540be50607a9
diff --git a/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtn.java b/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtn.java
index cb8acab..ba70780 100644
--- a/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtn.java
+++ b/apps/cordvtn/src/main/java/org/onosproject/cordvtn/CordVtn.java
@@ -15,6 +15,8 @@
  */
 package org.onosproject.cordvtn;
 
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -22,31 +24,39 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Host;
+import org.onosproject.net.behaviour.ControllerInfo;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.host.HostEvent;
 import org.onosproject.net.host.HostListener;
 import org.onosproject.net.host.HostService;
+import org.onosproject.ovsdb.controller.OvsdbClientService;
+import org.onosproject.ovsdb.controller.OvsdbController;
+import org.onosproject.ovsdb.controller.OvsdbNodeId;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.LogicalClockService;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.cordvtn.OvsdbNode.State;
-import static org.onosproject.cordvtn.OvsdbNode.State.INIT;
-import static org.onosproject.cordvtn.OvsdbNode.State.DISCONNECT;
 import static org.onosproject.net.Device.Type.SWITCH;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -63,7 +73,17 @@
     private static final int NUM_THREADS = 1;
     private static final KryoNamespace.Builder NODE_SERIALIZER = KryoNamespace.newBuilder()
             .register(KryoNamespaces.API)
-            .register(OvsdbNode.class);
+            .register(DefaultOvsdbNode.class);
+    private static final String DEFAULT_BRIDGE_NAME = "br-int";
+    private static final Map<String, String> VXLAN_OPTIONS = new HashMap<String, String>() {
+        {
+            put("key", "flow");
+            put("local_ip", "flow");
+            put("remote_ip", "flow");
+        }
+    };
+    private static final int DPID_BEGIN = 3;
+    private static final int OFPORT = 6653;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
@@ -72,14 +92,20 @@
     protected StorageService storageService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected LogicalClockService clockService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected HostService hostService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected OvsdbController controller;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
     private final ExecutorService eventExecutor = Executors
             .newFixedThreadPool(NUM_THREADS, groupedThreads("onos/cordvtn", "event-handler"));
 
@@ -90,15 +116,16 @@
     private final BridgeHandler bridgeHandler = new BridgeHandler();
     private final VmHandler vmHandler = new VmHandler();
 
-    private EventuallyConsistentMap<DeviceId, OvsdbNode> nodeStore;
+    private ConsistentMap<DeviceId, OvsdbNode> nodeStore;
+    private ApplicationId appId;
 
     @Activate
     protected void activate() {
-        coreService.registerApplication("org.onosproject.cordvtn");
-        nodeStore = storageService.<DeviceId, OvsdbNode>eventuallyConsistentMapBuilder()
+        appId = coreService.registerApplication("org.onosproject.cordvtn");
+        nodeStore = storageService.<DeviceId, OvsdbNode>consistentMapBuilder()
+                .withSerializer(Serializer.using(NODE_SERIALIZER.build()))
                 .withName("cordvtn-nodestore")
-                .withSerializer(NODE_SERIALIZER)
-                .withTimestampProvider((k, v) -> clockService.getTimestamp())
+                .withApplicationId(appId)
                 .build();
 
         deviceService.addListener(deviceListener);
@@ -113,43 +140,59 @@
         hostService.removeListener(hostListener);
 
         eventExecutor.shutdown();
-        nodeStore.destroy();
+        nodeStore.clear();
 
         log.info("Stopped");
     }
 
     @Override
-    public void addNode(OvsdbNode ovsdbNode) {
-        if (nodeStore.containsKey(ovsdbNode.deviceId())) {
-            log.warn("Node {} already exists", ovsdbNode.host());
+    public void addNode(OvsdbNode ovsdb) {
+        checkNotNull(ovsdb);
+        nodeStore.put(ovsdb.deviceId(), ovsdb);
+    }
+
+    @Override
+    public void deleteNode(OvsdbNode ovsdb) {
+        checkNotNull(ovsdb);
+
+        if (!nodeStore.containsKey(ovsdb.deviceId())) {
             return;
         }
-        nodeStore.put(ovsdbNode.deviceId(), ovsdbNode);
-        if (ovsdbNode.state() != INIT) {
-            updateNode(ovsdbNode, INIT);
+
+        // check ovsdb and integration bridge connection state first
+        if (isNodeConnected(ovsdb)) {
+            log.warn("Cannot delete connected node {}", ovsdb.host());
+        } else {
+            nodeStore.remove(ovsdb.deviceId());
         }
     }
 
     @Override
-    public void deleteNode(OvsdbNode ovsdbNode) {
-        if (!nodeStore.containsKey(ovsdbNode.deviceId())) {
-            log.warn("Node {} does not exist", ovsdbNode.host());
+    public void connect(OvsdbNode ovsdb) {
+        checkNotNull(ovsdb);
+
+        if (!nodeStore.containsKey(ovsdb.deviceId())) {
+            log.warn("Node {} does not exist", ovsdb.host());
             return;
         }
-        updateNode(ovsdbNode, DISCONNECT);
+        controller.connect(ovsdb.ip(), ovsdb.port());
     }
 
     @Override
-    public void updateNode(OvsdbNode ovsdbNode, State state) {
-        if (!nodeStore.containsKey(ovsdbNode.deviceId())) {
-            log.warn("Node {} does not exist", ovsdbNode.host());
+    public void disconnect(OvsdbNode ovsdb) {
+        checkNotNull(ovsdb);
+
+        if (!nodeStore.containsKey(ovsdb.deviceId())) {
+            log.warn("Node {} does not exist", ovsdb.host());
             return;
         }
-        DefaultOvsdbNode updatedNode = new DefaultOvsdbNode(ovsdbNode.host(),
-                                                            ovsdbNode.ip(),
-                                                            ovsdbNode.port(),
-                                                            state);
-        nodeStore.put(ovsdbNode.deviceId(), updatedNode);
+
+        OvsdbClientService ovsdbClient = getOvsdbClient(ovsdb);
+        checkNotNull(ovsdbClient);
+
+        if (ovsdbClient.isConnected()) {
+            ovsdbClient.disconnect();
+        }
     }
 
     @Override
@@ -159,14 +202,42 @@
 
     @Override
     public OvsdbNode getNode(DeviceId deviceId) {
-        return nodeStore.get(deviceId);
+        Versioned<OvsdbNode> ovsdb = nodeStore.get(deviceId);
+        if (ovsdb != null) {
+            return ovsdb.value();
+        } else {
+            return null;
+        }
     }
 
     @Override
     public List<OvsdbNode> getNodes() {
-        return nodeStore.values()
-                .stream()
-                .collect(Collectors.toList());
+        List<OvsdbNode> ovsdbs = new ArrayList<>();
+        ovsdbs.addAll(Collections2.transform(nodeStore.values(), Versioned::value));
+        return ovsdbs;
+    }
+
+    @Override
+    public boolean isNodeConnected(OvsdbNode ovsdb) {
+        checkNotNull(ovsdb);
+
+        OvsdbClientService ovsdbClient = getOvsdbClient(ovsdb);
+        if (ovsdbClient == null) {
+            return false;
+        } else {
+            return ovsdbClient.isConnected();
+        }
+    }
+
+    private OvsdbClientService getOvsdbClient(OvsdbNode ovsdb) {
+        checkNotNull(ovsdb);
+
+        OvsdbClientService ovsdbClient = controller.getOvsdbClient(
+                new OvsdbNodeId(ovsdb.ip(), ovsdb.port().toInt()));
+        if (ovsdbClient == null) {
+            log.warn("Couldn't find ovsdb client of node {}", ovsdb.host());
+        }
+        return ovsdbClient;
     }
 
     private class InternalDeviceListener implements DeviceListener {
@@ -182,6 +253,7 @@
                     break;
                 case DEVICE_AVAILABILITY_CHANGED:
                     eventExecutor.submit(() -> handler.disconnected(device));
+                    // TODO handle the case that the device is recovered
                     break;
                 default:
                     break;
@@ -212,14 +284,27 @@
 
         @Override
         public void connected(Device device) {
-            // create bridge and set bridgeId
-            // set node state connected
+            log.info("Ovsdb {} is connected", device.id());
+
+            if (!mastershipService.isLocalMaster(device.id())) {
+                return;
+            }
+
+            // TODO change to use bridge config
+            OvsdbNode ovsdb = getNode(device.id());
+            OvsdbClientService ovsdbClient = getOvsdbClient(ovsdb);
+
+            List<ControllerInfo> controllers = new ArrayList<>();
+            Sets.newHashSet(clusterService.getNodes()).forEach(controller ->
+                        controllers.add(new ControllerInfo(controller.ip(), OFPORT, "tcp")));
+            String dpid = ovsdb.intBrId().toString().substring(DPID_BEGIN);
+
+            ovsdbClient.createBridge(DEFAULT_BRIDGE_NAME, dpid, controllers);
         }
 
         @Override
         public void disconnected(Device device) {
-            // set node state disconnected if the node exists
-            // which means that the node is not deleted explicitly
+            log.warn("Ovsdb {} is disconnected", device.id());
         }
     }
 
@@ -227,12 +312,29 @@
 
         @Override
         public void connected(Device device) {
-            // create vxlan port
+            log.info("Integration Bridge {} is detected", device.id());
+
+            OvsdbNode ovsdb = getNodes().stream()
+                    .filter(node -> node.intBrId().equals(device.id()))
+                    .findFirst().get();
+
+            if (ovsdb == null) {
+                log.warn("Couldn't find OVSDB associated with {}", device.id());
+                return;
+            }
+
+            if (!mastershipService.isLocalMaster(ovsdb.deviceId())) {
+                return;
+            }
+
+            // TODO change to use tunnel config and tunnel description
+            OvsdbClientService ovsdbClient = getOvsdbClient(ovsdb);
+            ovsdbClient.createTunnel(DEFAULT_BRIDGE_NAME, "vxlan", "vxlan", VXLAN_OPTIONS);
         }
 
         @Override
         public void disconnected(Device device) {
-
+            log.info("Integration Bridge {} is vanished", device.id());
         }
     }
 
@@ -240,12 +342,12 @@
 
         @Override
         public void connected(Host host) {
-            // install flow rules for this vm
+            log.info("VM {} is detected", host.id());
         }
 
         @Override
         public void disconnected(Host host) {
-            // uninstall flow rules associated with this vm
+            log.info("VM {} is vanished", host.id());
         }
     }
 }