[ONOS-3688] Implement application for Openstack Node bootstrap
- Using OVSDB connnection, it makes bridge and vxlan tunnel for Openstack Nodes.
- Reading nodes information from network config supported

Change-Id: I1c0483b146ace19cd77ac91141182ee9200a9432
diff --git a/apps/openstacknode/src/main/java/org/onosproject/openstacknode/OpenstackNodeManager.java b/apps/openstacknode/src/main/java/org/onosproject/openstacknode/OpenstackNodeManager.java
new file mode 100644
index 0000000..28b41cf
--- /dev/null
+++ b/apps/openstacknode/src/main/java/org/onosproject/openstacknode/OpenstackNodeManager.java
@@ -0,0 +1,630 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacknode;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.ItemNotFoundException;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.behaviour.BridgeConfig;
+import org.onosproject.net.behaviour.BridgeName;
+import org.onosproject.net.behaviour.ControllerInfo;
+import org.onosproject.net.behaviour.DefaultTunnelDescription;
+import org.onosproject.net.behaviour.TunnelConfig;
+import org.onosproject.net.behaviour.TunnelDescription;
+import org.onosproject.net.behaviour.TunnelName;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.NetworkConfigService;
+import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.net.device.DeviceAdminService;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverHandler;
+import org.onosproject.net.driver.DriverService;
+import org.onosproject.ovsdb.controller.OvsdbClientService;
+import org.onosproject.ovsdb.controller.OvsdbController;
+import org.onosproject.ovsdb.controller.OvsdbNodeId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.Device.Type.SWITCH;
+import static org.onosproject.net.behaviour.TunnelDescription.Type.VXLAN;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+/**
+ * Initializes devices in compute/gateway nodes according to there type.
+ */
+@Component(immediate = true)
+@Service
+public class OpenstackNodeManager implements OpenstackNodeService {
+    protected final Logger log = getLogger(getClass());
+    private static final int NUM_THREADS = 1;
+    private static final KryoNamespace.Builder NODE_SERIALIZER = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(OpenstackNode.class)
+            .register(OpenstackNodeType.class)
+            .register(NodeState.class);
+    private static final String DEFAULT_BRIDGE = "br-int";
+    private static final String DEFAULT_TUNNEL = "vxlan";
+    private static final String PORT_NAME = "portName";
+    private static final String OPENSTACK_NODESTORE = "openstacknode-nodestore";
+    private static final String OPENSTACK_NODEMANAGER_ID = "org.onosproject.openstacknode";
+
+    private static final Map<String, String> DEFAULT_TUNNEL_OPTIONS
+            = ImmutableMap.of("key", "flow", "remote_ip", "flow");
+
+    private static final int DPID_BEGIN = 3;
+    private static final int OFPORT = 6653;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected OvsdbController controller;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DriverService driverService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceAdminService adminService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected NetworkConfigService configService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected NetworkConfigRegistry configRegistry;
+
+    private final OvsdbHandler ovsdbHandler = new OvsdbHandler();
+    private final BridgeHandler bridgeHandler = new BridgeHandler();
+    private final NetworkConfigListener configListener = new InternalConfigListener();
+    private final ConfigFactory configFactory =
+            new ConfigFactory(SubjectFactories.APP_SUBJECT_FACTORY, OpenstackNodeConfig.class, "openstacknode") {
+                @Override
+                public OpenstackNodeConfig createConfig() {
+                    return new OpenstackNodeConfig();
+                }
+            };
+
+    private final ExecutorService eventExecutor = Executors
+            .newFixedThreadPool(NUM_THREADS, groupedThreads("onos/openstacknode", "event-handler"));
+
+    private final DeviceListener deviceListener = new InternalDeviceListener();
+
+    private ApplicationId appId;
+    private ConsistentMap<OpenstackNode, NodeState> nodeStore;
+
+    private enum NodeState {
+
+        INIT {
+            @Override
+            public void process(OpenstackNodeManager openstackNodeManager, OpenstackNode node) {
+                openstackNodeManager.connect(node);
+            }
+        },
+        OVSDB_CONNECTED {
+            @Override
+            public void process(OpenstackNodeManager openstackNodeManager, OpenstackNode node) {
+                if (!openstackNodeManager.getOvsdbConnectionState(node)) {
+                    openstackNodeManager.connect(node);
+                } else {
+                    openstackNodeManager.createIntegrationBridge(node);
+                }
+            }
+        },
+        BRIDGE_CREATED {
+            @Override
+            public void process(OpenstackNodeManager openstackNodeManager, OpenstackNode node) {
+                if (!openstackNodeManager.getOvsdbConnectionState(node)) {
+                    openstackNodeManager.connect(node);
+                } else {
+                    openstackNodeManager.createTunnelInterface(node);
+                }
+            }
+        },
+        COMPLETE {
+            @Override
+            public void process(OpenstackNodeManager openstackNodeManager, OpenstackNode node) {
+                openstackNodeManager.postInit(node);
+            }
+        },
+        INCOMPLETE {
+            @Override
+            public void process(OpenstackNodeManager openstackNodeManager, OpenstackNode node) {
+            }
+        };
+
+        public abstract void process(OpenstackNodeManager openstackNodeManager, OpenstackNode node);
+    }
+
+    @Activate
+    protected void activate() {
+        appId = coreService.registerApplication(OPENSTACK_NODEMANAGER_ID);
+        nodeStore = storageService.<OpenstackNode, NodeState>consistentMapBuilder()
+                .withSerializer(Serializer.using(NODE_SERIALIZER.build()))
+                .withName(OPENSTACK_NODESTORE)
+                .withApplicationId(appId)
+                .build();
+
+        deviceService.addListener(deviceListener);
+        configRegistry.registerConfigFactory(configFactory);
+        configService.addListener(configListener);
+        readConfiguration();
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        deviceService.removeListener(deviceListener);
+        eventExecutor.shutdown();
+        nodeStore.clear();
+
+        configRegistry.unregisterConfigFactory(configFactory);
+        configService.removeListener(configListener);
+
+        log.info("Stopped");
+    }
+
+
+    @Override
+    public void addNode(OpenstackNode node) {
+        checkNotNull(node, "Node cannot be null");
+
+        nodeStore.putIfAbsent(node, checkNodeState(node));
+
+        NodeState state = checkNodeState(node);
+        state.process(this, node);
+    }
+
+    @Override
+    public void deleteNode(OpenstackNode node) {
+        checkNotNull(node, "Node cannot be null");
+
+        if (getOvsdbConnectionState(node)) {
+            disconnect(node);
+        }
+
+        nodeStore.remove(node);
+    }
+
+    @Override
+    public List<OpenstackNode> getNodes(OpenstackNodeType openstackNodeType) {
+        List<OpenstackNode> nodes = new ArrayList<>();
+        nodes.addAll(nodeStore.keySet().stream().filter(node -> node.openstackNodeType()
+                .equals(openstackNodeType)).collect(Collectors.toList()));
+        return nodes;
+    }
+
+    private List<OpenstackNode> getNodesAll() {
+        List<OpenstackNode> nodes = new ArrayList<>();
+        nodes.addAll(nodeStore.keySet());
+        return nodes;
+    }
+
+    @Override
+    public boolean isComplete(OpenstackNode node) {
+        checkNotNull(node, "Node cannot be null");
+
+        if (!nodeStore.containsKey(node)) {
+            log.warn("Node {} does not exist", node.hostName());
+            return false;
+        } else if (nodeStore.get(node).equals(NodeState.COMPLETE)) {
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Checks current state of a given openstack node and returns it.
+     *
+     * @param node openstack node
+     * @return node state
+     */
+    private NodeState checkNodeState(OpenstackNode node) {
+        checkNotNull(node, "Node cannot be null");
+
+        if (checkIntegrationBridge(node) && checkTunnelInterface(node)) {
+            return NodeState.COMPLETE;
+        } else if (checkIntegrationBridge(node)) {
+            return NodeState.BRIDGE_CREATED;
+        } else if (getOvsdbConnectionState(node)) {
+            return NodeState.OVSDB_CONNECTED;
+        } else {
+            return NodeState.INIT;
+        }
+    }
+
+
+    /**
+     * Checks if integration bridge exists and available.
+     *
+     * @param node openstack node
+     * @return true if the bridge is available, false otherwise
+     */
+    private boolean checkIntegrationBridge(OpenstackNode node) {
+        return (deviceService.getDevice(node.intBrId()) != null
+                && deviceService.isAvailable(node.intBrId()));
+    }
+    /**
+     * Checks if tunnel interface exists.
+     *
+     * @param node openstack node
+     * @return true if the interface exists, false otherwise
+     */
+    private boolean checkTunnelInterface(OpenstackNode node) {
+        checkNotNull(node, "Node cannot be null");
+        return deviceService.getPorts(node.intBrId())
+                    .stream()
+                    .filter(p -> p.annotations().value(PORT_NAME).contains(DEFAULT_TUNNEL) && p.isEnabled())
+                    .findAny().isPresent();
+    }
+
+    /**
+     * Returns connection state of OVSDB server for a given node.
+     *
+     * @param node openstack node
+     * @return true if it is connected, false otherwise
+     */
+    private boolean getOvsdbConnectionState(OpenstackNode node) {
+        checkNotNull(node, "Node cannot be null");
+
+        OvsdbClientService ovsdbClient = getOvsdbClient(node);
+        return deviceService.isAvailable(node.ovsdbId()) &&
+                ovsdbClient != null && ovsdbClient.isConnected();
+    }
+
+    /**
+     * Returns OVSDB client for a given node.
+     *
+     * @param node openstack node
+     * @return OVSDB client, or null if it fails to get OVSDB client
+     */
+    private OvsdbClientService getOvsdbClient(OpenstackNode node) {
+        checkNotNull(node, "Node cannot be null");
+
+        OvsdbClientService ovsdbClient = controller.getOvsdbClient(
+                new OvsdbNodeId(node.ovsdbIp(), node.ovsdbPort().toInt()));
+        if (ovsdbClient == null) {
+            log.debug("Couldn't find OVSDB client for {}", node.hostName());
+        }
+        return ovsdbClient;
+    }
+
+    /**
+     * Connects to OVSDB server for a given node.
+     *
+     * @param node openstack node
+     */
+    private void connect(OpenstackNode node) {
+        checkNotNull(node, "Node cannot be null");
+
+        if (!nodeStore.containsKey(node)) {
+            log.warn("Node {} does not exist", node.hostName());
+            return;
+        }
+
+        if (!getOvsdbConnectionState(node)) {
+            controller.connect(node.ovsdbIp(), node.ovsdbPort());
+        }
+    }
+
+    /**
+     * Creates an integration bridge for a given node.
+     *
+     * @param node openstack node
+     */
+    private void createIntegrationBridge(OpenstackNode node) {
+        if (checkIntegrationBridge(node)) {
+            return;
+        }
+
+        List<ControllerInfo> controllers = new ArrayList<>();
+        Sets.newHashSet(clusterService.getNodes()).stream()
+                .forEach(controller -> {
+                    ControllerInfo ctrlInfo = new ControllerInfo(controller.ip(), OFPORT, "tcp");
+                    controllers.add(ctrlInfo);
+                });
+        String dpid = node.intBrId().toString().substring(DPID_BEGIN);
+
+        try {
+            DriverHandler handler = driverService.createHandler(node.ovsdbId());
+            BridgeConfig bridgeConfig =  handler.behaviour(BridgeConfig.class);
+            bridgeConfig.addBridge(BridgeName.bridgeName(DEFAULT_BRIDGE), dpid, controllers);
+        } catch (ItemNotFoundException e) {
+            log.warn("Failed to create integration bridge on {}", node.ovsdbId());
+        }
+    }
+
+    /**
+     * Creates tunnel interface to the integration bridge for a given node.
+     *
+     * @param node openstack node
+     */
+    private void createTunnelInterface(OpenstackNode node) {
+        if (checkTunnelInterface(node)) {
+            return;
+        }
+
+        DefaultAnnotations.Builder optionBuilder = DefaultAnnotations.builder();
+        for (String key : DEFAULT_TUNNEL_OPTIONS.keySet()) {
+            optionBuilder.set(key, DEFAULT_TUNNEL_OPTIONS.get(key));
+        }
+        TunnelDescription description =
+                new DefaultTunnelDescription(null, null, VXLAN, TunnelName.tunnelName(DEFAULT_TUNNEL),
+                        optionBuilder.build());
+        try {
+            DriverHandler handler = driverService.createHandler(node.ovsdbId());
+            TunnelConfig tunnelConfig =  handler.behaviour(TunnelConfig.class);
+            tunnelConfig.createTunnelInterface(BridgeName.bridgeName(DEFAULT_BRIDGE), description);
+        } catch (ItemNotFoundException e) {
+            log.warn("Failed to create tunnel interface on {}", node.ovsdbId());
+        }
+    }
+
+    /**
+     * Performs tasks after node initialization.
+     * First disconnect unnecessary OVSDB connection and then installs flow rules
+     * for existing VMs if there are any.
+     *
+     * @param node openstack node
+     */
+    private void postInit(OpenstackNode node) {
+        disconnect(node);
+        log.info("Finished initializing {}", node.hostName());
+    }
+
+    /**
+     * Sets a new state for a given openstack node.
+     *
+     * @param node openstack node
+     * @param newState new node state
+     */
+    private void setNodeState(OpenstackNode node, NodeState newState) {
+        checkNotNull(node, "Node cannot be null");
+
+        log.debug("Changed {} state: {}", node.hostName(), newState.toString());
+
+        nodeStore.put(node, newState);
+        newState.process(this, node);
+    }
+
+    /**
+     * Returns openstack node associated with a given OVSDB device.
+     *
+     * @param ovsdbId OVSDB device id
+     * @return openstack node, null if it fails to find the node
+     */
+    private OpenstackNode getNodeByOvsdbId(DeviceId ovsdbId) {
+
+        return getNodesAll().stream()
+                .filter(node -> node.ovsdbId().equals(ovsdbId))
+                .findFirst().orElse(null);
+    }
+
+    /**
+     * Returns openstack node associated with a given integration bridge.
+     *
+     * @param bridgeId device id of integration bridge
+     * @return openstack node, null if it fails to find the node
+     */
+    private OpenstackNode getNodeByBridgeId(DeviceId bridgeId) {
+        return getNodesAll().stream()
+                .filter(node -> node.intBrId().equals(bridgeId))
+                .findFirst().orElse(null);
+    }
+    /**
+     * Disconnects OVSDB server for a given node.
+     *
+     * @param node openstack node
+     */
+    private void disconnect(OpenstackNode node) {
+        checkNotNull(node, "Node cannot be null");
+
+        if (!nodeStore.containsKey(node)) {
+            log.warn("Node {} does not exist", node.hostName());
+            return;
+        }
+
+        if (getOvsdbConnectionState(node)) {
+            OvsdbClientService ovsdbClient = getOvsdbClient(node);
+            ovsdbClient.disconnect();
+        }
+    }
+
+    private class InternalDeviceListener implements DeviceListener {
+
+        @Override
+        public void event(DeviceEvent event) {
+
+            Device device = event.subject();
+            ConnectionHandler<Device> handler =
+                    (device.type().equals(SWITCH) ? bridgeHandler : ovsdbHandler);
+
+            switch (event.type()) {
+                case PORT_ADDED:
+                    eventExecutor.submit(() -> bridgeHandler.portAdded(event.port()));
+                    break;
+                case PORT_UPDATED:
+                    if (!event.port().isEnabled()) {
+                        eventExecutor.submit(() -> bridgeHandler.portRemoved(event.port()));
+                    }
+                    break;
+                case DEVICE_ADDED:
+                case DEVICE_AVAILABILITY_CHANGED:
+                    if (deviceService.isAvailable(device.id())) {
+                        eventExecutor.submit(() -> handler.connected(device));
+                    } else {
+                        eventExecutor.submit(() -> handler.disconnected(device));
+                    }
+                    break;
+                default:
+                    log.debug("Unsupported event type {}", event.type().toString());
+                    break;
+            }
+        }
+    }
+
+    private class OvsdbHandler implements ConnectionHandler<Device> {
+
+        @Override
+        public void connected(Device device) {
+            OpenstackNode node = getNodeByOvsdbId(device.id());
+            if (node != null) {
+                setNodeState(node, checkNodeState(node));
+            }
+        }
+
+        @Override
+        public void disconnected(Device device) {
+            if (!deviceService.isAvailable(device.id())) {
+                adminService.removeDevice(device.id());
+            }
+        }
+    }
+
+    private class BridgeHandler implements ConnectionHandler<Device> {
+
+        @Override
+        public void connected(Device device) {
+            OpenstackNode node = getNodeByBridgeId(device.id());
+            if (node != null) {
+                setNodeState(node, checkNodeState(node));
+            }
+        }
+
+        @Override
+        public void disconnected(Device device) {
+            OpenstackNode node = getNodeByBridgeId(device.id());
+            if (node != null) {
+                log.debug("Integration Bridge is disconnected from {}", node.hostName());
+                setNodeState(node, NodeState.INCOMPLETE);
+            }
+        }
+
+        /**
+         * Handles port added situation.
+         * If the added port is tunnel port, proceed remaining node initialization.
+         * Otherwise, do nothing.
+         *
+         * @param port port
+         */
+        public void portAdded(Port port) {
+            if (!port.annotations().value(PORT_NAME).contains(DEFAULT_TUNNEL)) {
+                return;
+            }
+
+            OpenstackNode node = getNodeByBridgeId((DeviceId) port.element().id());
+            if (node != null) {
+                setNodeState(node, checkNodeState(node));
+            }
+        }
+
+        /**
+         * Handles port removed situation.
+         * If the removed port is tunnel port, proceed remaining node initialization.
+         * Others, do nothing.
+         *
+         * @param port port
+         */
+        public void portRemoved(Port port) {
+            if (!port.annotations().value(PORT_NAME).contains(DEFAULT_TUNNEL)) {
+                return;
+            }
+
+            OpenstackNode node = getNodeByBridgeId((DeviceId) port.element().id());
+            if (node != null) {
+                log.info("Tunnel interface is removed from {}", node.hostName());
+                setNodeState(node, NodeState.INCOMPLETE);
+            }
+        }
+    }
+
+
+    private void readConfiguration() {
+        OpenstackNodeConfig config =
+                configService.getConfig(appId, OpenstackNodeConfig.class);
+
+        if (config == null) {
+            log.error("No configuration found");
+            return;
+        }
+
+        config.openstackNodes().stream().forEach(node -> addNode(node));
+        log.info("Node configured");
+    }
+
+    private class InternalConfigListener implements NetworkConfigListener {
+
+        @Override
+        public void event(NetworkConfigEvent event) {
+            if (!event.configClass().equals(OpenstackNodeConfig.class)) {
+                return;
+            }
+
+            switch (event.type()) {
+                case CONFIG_ADDED:
+                case CONFIG_UPDATED:
+                    eventExecutor.execute(OpenstackNodeManager.this::readConfiguration);
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+
+}
+