[ONOS-7902] Add node handler and a set of CLIs for kubernetes node

Change-Id: Iee4a88e4af437d551a38342de339455387389f61
diff --git a/apps/k8s-node/app/BUILD b/apps/k8s-node/app/BUILD
index fd859b9..d740f68 100644
--- a/apps/k8s-node/app/BUILD
+++ b/apps/k8s-node/app/BUILD
@@ -1,5 +1,7 @@
 COMPILE_DEPS = CORE_DEPS + JACKSON + KRYO + CLI + REST + [
     "//core/store/serializers:onos-core-serializers",
+    "//protocols/ovsdb/api:onos-protocols-ovsdb-api",
+    "//protocols/ovsdb/rfc:onos-protocols-ovsdb-rfc",
     "//apps/k8s-node/api:onos-apps-k8s-node-api",
 ]
 
@@ -10,7 +12,12 @@
 ]
 
 osgi_jar_with_tests(
+    api_description = "REST API for Kubernetes Node",
+    api_package = "org.onosproject.k8snode.web",
+    api_title = "Kubernetes Node API",
+    api_version = "1.0",
     karaf_command_packages = ["org.onosproject.k8snode.cli"],
     test_deps = TEST_DEPS,
+    web_context = "/onos/k8snode",
     deps = COMPILE_DEPS,
 )
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sHostnameCompleter.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sHostnameCompleter.java
new file mode 100644
index 0000000..54c66fa
--- /dev/null
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sHostnameCompleter.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snode.cli;
+
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.karaf.shell.api.console.CommandLine;
+import org.apache.karaf.shell.api.console.Completer;
+import org.apache.karaf.shell.api.console.Session;
+import org.apache.karaf.shell.support.completers.StringsCompleter;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeService;
+
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.stream.Collectors;
+
+import static org.onosproject.cli.AbstractShellCommand.get;
+
+/**
+ * Kubernetes host completer.
+ */
+@Service
+public class K8sHostnameCompleter implements Completer {
+    @Override
+    public int complete(Session session, CommandLine commandLine, List<String> candidates) {
+        StringsCompleter delegate = new StringsCompleter();
+        K8sNodeService nodeService = get(K8sNodeService.class);
+
+        Set<String> hostnames = nodeService.nodes().stream()
+                .map(K8sNode::hostname)
+                .collect(Collectors.toSet());
+        SortedSet<String> strings = delegate.getStrings();
+
+        strings.addAll(hostnames);
+
+        return delegate.complete(session, commandLine, candidates);
+    }
+}
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeCheckCommand.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeCheckCommand.java
new file mode 100644
index 0000000..9879415
--- /dev/null
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeCheckCommand.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snode.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Completion;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.device.DeviceService;
+
+import static org.onosproject.k8snode.api.Constants.GENEVE_TUNNEL;
+import static org.onosproject.k8snode.api.Constants.GRE_TUNNEL;
+import static org.onosproject.k8snode.api.Constants.INTEGRATION_BRIDGE;
+import static org.onosproject.k8snode.api.Constants.VXLAN_TUNNEL;
+import static org.onosproject.net.AnnotationKeys.PORT_NAME;
+
+/**
+ * Checks detailed node init state.
+ */
+@Service
+@Command(scope = "onos", name = "k8s-node-check",
+        description = "Shows detailed kubernetes node init state")
+public class K8sNodeCheckCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "hostname", description = "Hostname",
+            required = true, multiValued = false)
+    @Completion(K8sHostnameCompleter.class)
+    private String hostname = null;
+
+    private static final String MSG_OK = "OK";
+    private static final String MSG_ERROR = "ERROR";
+
+    @Override
+    protected void doExecute() {
+        K8sNodeService nodeService = get(K8sNodeService.class);
+        DeviceService deviceService = get(DeviceService.class);
+
+        K8sNode node = nodeService.node(hostname);
+        if (node == null) {
+            print("Cannot find %s from registered nodes", hostname);
+            return;
+        }
+
+        print("[Integration Bridge Status]");
+        Device device = deviceService.getDevice(node.intgBridge());
+        if (device != null) {
+            print("%s %s=%s available=%s %s",
+                    deviceService.isAvailable(device.id()) ? MSG_OK : MSG_ERROR,
+                    INTEGRATION_BRIDGE,
+                    device.id(),
+                    deviceService.isAvailable(device.id()),
+                    device.annotations());
+            if (node.dataIp() != null) {
+                printPortState(deviceService, node.intgBridge(), VXLAN_TUNNEL);
+                printPortState(deviceService, node.intgBridge(), GRE_TUNNEL);
+                printPortState(deviceService, node.intgBridge(), GENEVE_TUNNEL);
+            }
+        } else {
+            print("%s %s=%s is not available",
+                    MSG_ERROR,
+                    INTEGRATION_BRIDGE,
+                    node.intgBridge());
+        }
+    }
+
+    private void printPortState(DeviceService deviceService,
+                                DeviceId deviceId, String portName) {
+        Port port = deviceService.getPorts(deviceId).stream()
+                .filter(p -> p.annotations().value(PORT_NAME).equals(portName) &&
+                        p.isEnabled())
+                .findAny().orElse(null);
+
+        if (port != null) {
+            print("%s %s portNum=%s enabled=%s %s",
+                    port.isEnabled() ? MSG_OK : MSG_ERROR,
+                    portName,
+                    port.number(),
+                    port.isEnabled() ? Boolean.TRUE : Boolean.FALSE,
+                    port.annotations());
+        } else {
+            print("%s %s does not exist", MSG_ERROR, portName);
+        }
+    }
+}
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeInitCommand.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeInitCommand.java
new file mode 100644
index 0000000..a414e15
--- /dev/null
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeInitCommand.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snode.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Completion;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeAdminService;
+import org.onosproject.k8snode.api.K8sNodeService;
+
+import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
+import static org.onosproject.k8snode.api.K8sNodeState.INIT;
+
+/**
+ * Initializes nodes for node service.
+ */
+@Service
+@Command(scope = "onos", name = "k8s-node-init",
+        description = "Initializes nodes for kubernetes node service")
+public class K8sNodeInitCommand extends AbstractShellCommand {
+
+    @Option(name = "-a", aliases = "--all", description = "Apply this command to all nodes",
+            required = false, multiValued = false)
+    private boolean isAll = false;
+
+    @Option(name = "-i", aliases = "--incomplete",
+            description = "Apply this command to incomplete nodes",
+            required = false, multiValued = false)
+    private boolean isIncomplete = false;
+
+    @Argument(index = 0, name = "hostnames", description = "Hostname(s) to apply this command",
+            required = false, multiValued = true)
+    @Completion(K8sHostnameCompleter.class)
+    private String[] hostnames = null;
+
+    @Override
+    protected void doExecute() {
+        K8sNodeService nodeService = get(K8sNodeService.class);
+        K8sNodeAdminService nodeAdminService = get(K8sNodeAdminService.class);
+
+        if ((!isAll && !isIncomplete && hostnames == null) ||
+                (isAll && isIncomplete) ||
+                (isIncomplete && hostnames != null) ||
+                (hostnames != null && isAll)) {
+            print("Please specify one of hostname, --all, and --incomplete options.");
+            return;
+        }
+
+        if (isAll) {
+            hostnames = nodeService.nodes().stream()
+                    .map(K8sNode::hostname).toArray(String[]::new);
+        } else if (isIncomplete) {
+            hostnames = nodeService.nodes().stream()
+                    .filter(node -> node.state() != COMPLETE)
+                    .map(K8sNode::hostname).toArray(String[]::new);
+        }
+
+        for (String hostname : hostnames) {
+            K8sNode node = nodeService.node(hostname);
+            if (node == null) {
+                print("Unable to find %s", hostname);
+                continue;
+            }
+            print("Initializing %s", hostname);
+            K8sNode updated = node.updateState(INIT);
+            nodeAdminService.updateNode(updated);
+        }
+        print("Done.");
+    }
+}
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeListCommand.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeListCommand.java
new file mode 100644
index 0000000..04601cf
--- /dev/null
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeListCommand.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snode.cli;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.collect.Lists;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeService;
+
+import java.util.Comparator;
+import java.util.List;
+
+import static org.onosproject.k8snode.util.K8sNodeUtil.prettyJson;
+
+/**
+ * Lists all nodes registered to the service.
+ */
+@Service
+@Command(scope = "onos", name = "k8s-nodes",
+        description = "Lists all nodes registered in kubernetes node service")
+public class K8sNodeListCommand extends AbstractShellCommand {
+
+    private static final String FORMAT = "%-20s%-15s%-24s%-24s%-20s%-15s";
+
+    @Override
+    protected void doExecute() {
+        K8sNodeService nodeService = get(K8sNodeService.class);
+        List<K8sNode> nodes = Lists.newArrayList(nodeService.nodes());
+        nodes.sort(Comparator.comparing(K8sNode::hostname));
+
+        if (outputJson()) {
+            print("%s", json(nodes));
+        } else {
+            print(FORMAT, "Hostname", "Type", "Integration Bridge",
+                    "Management IP", "Data IP", "State");
+            for (K8sNode node : nodes) {
+                print(FORMAT,
+                        node.hostname(),
+                        node.type(),
+                        node.intgBridge(),
+                        node.managementIp(),
+                        node.dataIp() != null ? node.dataIp() : "",
+                        node.state());
+            }
+            print("Total %s nodes", nodeService.nodes().size());
+        }
+    }
+
+    private String json(List<K8sNode> nodes) {
+        ObjectMapper mapper = new ObjectMapper();
+        ArrayNode result = mapper.createArrayNode();
+        for (K8sNode node : nodes) {
+            result.add(jsonForEntity(node, K8sNode.class));
+        }
+        return prettyJson(mapper, result.toString());
+    }
+}
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sNodeHandler.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sNodeHandler.java
new file mode 100644
index 0000000..6fb20b7
--- /dev/null
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sNodeHandler.java
@@ -0,0 +1,674 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snode.impl;
+
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeAdminService;
+import org.onosproject.k8snode.api.K8sNodeEvent;
+import org.onosproject.k8snode.api.K8sNodeHandler;
+import org.onosproject.k8snode.api.K8sNodeListener;
+import org.onosproject.k8snode.api.K8sNodeService;
+import org.onosproject.k8snode.api.K8sNodeState;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.behaviour.BridgeConfig;
+import org.onosproject.net.behaviour.BridgeDescription;
+import org.onosproject.net.behaviour.ControllerInfo;
+import org.onosproject.net.behaviour.DefaultBridgeDescription;
+import org.onosproject.net.behaviour.DefaultTunnelDescription;
+import org.onosproject.net.behaviour.InterfaceConfig;
+import org.onosproject.net.behaviour.TunnelDescription;
+import org.onosproject.net.behaviour.TunnelEndPoints;
+import org.onosproject.net.behaviour.TunnelKeys;
+import org.onosproject.net.device.DeviceAdminService;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.ovsdb.controller.OvsdbClientService;
+import org.onosproject.ovsdb.controller.OvsdbController;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.packet.TpPort.tpPort;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snode.api.Constants.GENEVE;
+import static org.onosproject.k8snode.api.Constants.GENEVE_TUNNEL;
+import static org.onosproject.k8snode.api.Constants.GRE;
+import static org.onosproject.k8snode.api.Constants.GRE_TUNNEL;
+import static org.onosproject.k8snode.api.Constants.INTEGRATION_BRIDGE;
+import static org.onosproject.k8snode.api.Constants.VXLAN;
+import static org.onosproject.k8snode.api.Constants.VXLAN_TUNNEL;
+import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
+import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
+import static org.onosproject.k8snode.api.K8sNodeState.DEVICE_CREATED;
+import static org.onosproject.k8snode.api.K8sNodeState.INCOMPLETE;
+import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY;
+import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY_DEFAULT;
+import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT;
+import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
+import static org.onosproject.k8snode.util.K8sNodeUtil.getBooleanProperty;
+import static org.onosproject.k8snode.util.K8sNodeUtil.getOvsdbClient;
+import static org.onosproject.k8snode.util.K8sNodeUtil.isOvsdbConnected;
+import static org.onosproject.net.AnnotationKeys.PORT_NAME;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Service bootstraps kubernetes node based on its type.
+ */
+@Component(immediate = true,
+        property = {
+                OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT,
+                AUTO_RECOVERY + ":Boolean=" + AUTO_RECOVERY_DEFAULT
+        }
+)
+public class DefaultK8sNodeHandler implements K8sNodeHandler {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final String DEFAULT_OF_PROTO = "tcp";
+    private static final int DEFAULT_OFPORT = 6653;
+    private static final int DPID_BEGIN = 3;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceAdminService deviceAdminService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected OvsdbController ovsdbController;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sNodeService k8sNodeService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sNodeAdminService k8sNodeAdminService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService componentConfigService;
+
+    /** OVSDB server listen port. */
+    private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
+
+    /** Indicates whether auto-recover kubernetes node status on switch re-conn event. */
+    private boolean autoRecovery = AUTO_RECOVERY_DEFAULT;
+
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
+    private final DeviceListener ovsdbListener = new InternalOvsdbListener();
+    private final DeviceListener bridgeListener = new InternalBridgeListener();
+    private final K8sNodeListener k8sNodeListener = new InternalK8sNodeListener();
+
+    private ApplicationId appId;
+    private NodeId localNode;
+
+    @Activate
+    protected void activate() {
+        appId = coreService.getAppId(APP_ID);
+        localNode = clusterService.getLocalNode().id();
+
+        componentConfigService.registerProperties(getClass());
+        leadershipService.runForLeadership(appId.name());
+        deviceService.addListener(ovsdbListener);
+        deviceService.addListener(bridgeListener);
+        k8sNodeService.addListener(k8sNodeListener);
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        k8sNodeService.removeListener(k8sNodeListener);
+        deviceService.removeListener(bridgeListener);
+        deviceService.removeListener(ovsdbListener);
+        componentConfigService.unregisterProperties(getClass(), false);
+        leadershipService.withdraw(appId.name());
+        eventExecutor.shutdown();
+
+        log.info("Stopped");
+    }
+
+    @Modified
+    protected void modified(ComponentContext context) {
+        readComponentConfiguration(context);
+
+        log.info("Modified");
+    }
+
+    @Override
+    public void processInitState(K8sNode k8sNode) {
+        if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) {
+            ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum));
+            return;
+        }
+        if (!deviceService.isAvailable(k8sNode.intgBridge())) {
+            createBridge(k8sNode, INTEGRATION_BRIDGE, k8sNode.intgBridge());
+        }
+    }
+
+    @Override
+    public void processDeviceCreatedState(K8sNode k8sNode) {
+        try {
+            if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) {
+                ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum));
+                return;
+            }
+
+            if (k8sNode.dataIp() != null &&
+                    !isIntfEnabled(k8sNode, VXLAN_TUNNEL)) {
+                createVxlanTunnelInterface(k8sNode);
+            }
+
+            if (k8sNode.dataIp() != null &&
+                    !isIntfEnabled(k8sNode, GRE_TUNNEL)) {
+                createGreTunnelInterface(k8sNode);
+            }
+
+            if (k8sNode.dataIp() != null &&
+                    !isIntfEnabled(k8sNode, GENEVE_TUNNEL)) {
+                createGeneveTunnelInterface(k8sNode);
+            }
+        } catch (Exception e) {
+            log.error("Exception occurred because of {}", e);
+        }
+    }
+
+    @Override
+    public void processCompleteState(K8sNode k8sNode) {
+        // do something if needed
+    }
+
+    @Override
+    public void processIncompleteState(K8sNode k8sNode) {
+        // do something if needed
+    }
+
+    /**
+     * Extracts properties from the component configuration context.
+     *
+     * @param context the component context
+     */
+    private void readComponentConfiguration(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+
+        Integer ovsdbPortConfigured = Tools.getIntegerProperty(properties, OVSDB_PORT);
+        if (ovsdbPortConfigured == null) {
+            ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
+            log.info("OVSDB port is NOT configured, default value is {}", ovsdbPortNum);
+        } else {
+            ovsdbPortNum = ovsdbPortConfigured;
+            log.info("Configured. OVSDB port is {}", ovsdbPortNum);
+        }
+
+        Boolean autoRecoveryConfigured =
+                getBooleanProperty(properties, AUTO_RECOVERY);
+        if (autoRecoveryConfigured == null) {
+            autoRecovery = AUTO_RECOVERY_DEFAULT;
+            log.info("Auto recovery flag is NOT " +
+                    "configured, default value is {}", autoRecovery);
+        } else {
+            autoRecovery = autoRecoveryConfigured;
+            log.info("Configured. Auto recovery flag is {}", autoRecovery);
+        }
+    }
+
+    /**
+     * Creates a bridge with a given name on a given kubernetes node.
+     *
+     * @param k8sNode       kubernetes node
+     * @param bridgeName    bridge name
+     * @param devId         device identifier
+     */
+    private void createBridge(K8sNode k8sNode, String bridgeName, DeviceId devId) {
+        Device device = deviceService.getDevice(k8sNode.ovsdb());
+
+        List<ControllerInfo> controllers = clusterService.getNodes().stream()
+                .map(n -> new ControllerInfo(n.ip(), DEFAULT_OFPORT, DEFAULT_OF_PROTO))
+                .collect(Collectors.toList());
+
+        String dpid = devId.toString().substring(DPID_BEGIN);
+
+        BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
+                .name(bridgeName)
+                .failMode(BridgeDescription.FailMode.SECURE)
+                .datapathId(dpid)
+                .disableInBand()
+                .controllers(controllers);
+
+        BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
+        bridgeConfig.addBridge(builder.build());
+    }
+
+    /**
+     * Creates a VXLAN tunnel interface in a given kubernetes node.
+     *
+     * @param k8sNode       kubernetes node
+     */
+    private void createVxlanTunnelInterface(K8sNode k8sNode) {
+        createTunnelInterface(k8sNode, VXLAN, VXLAN_TUNNEL);
+    }
+
+    /**
+     * Creates a GRE tunnel interface in a given kubernetes node.
+     *
+     * @param k8sNode       kubernetes node
+     */
+    private void createGreTunnelInterface(K8sNode k8sNode) {
+        createTunnelInterface(k8sNode, GRE, GRE_TUNNEL);
+    }
+
+    /**
+     * Creates a GENEVE tunnel interface in a given kubernetes node.
+     *
+     * @param k8sNode       kubernetes node
+     */
+    private void createGeneveTunnelInterface(K8sNode k8sNode) {
+        createTunnelInterface(k8sNode, GENEVE, GENEVE_TUNNEL);
+    }
+
+    /**
+     * Creates a tunnel interface in a given kubernetes node.
+     *
+     * @param k8sNode       kubernetes node
+     */
+    private void createTunnelInterface(K8sNode k8sNode,
+                                       String type, String intfName) {
+        if (isIntfEnabled(k8sNode, intfName)) {
+            return;
+        }
+
+        Device device = deviceService.getDevice(k8sNode.ovsdb());
+        if (device == null || !device.is(InterfaceConfig.class)) {
+            log.error("Failed to create tunnel interface on {}", k8sNode.ovsdb());
+            return;
+        }
+
+        TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName);
+
+        InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
+        ifaceConfig.addTunnelMode(intfName, tunnelDesc);
+    }
+
+    /**
+     * Builds tunnel description according to the network type.
+     *
+     * @param type      network type
+     * @return tunnel description
+     */
+    private TunnelDescription buildTunnelDesc(String type, String intfName) {
+        if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
+            TunnelDescription.Builder tdBuilder =
+                    DefaultTunnelDescription.builder()
+                            .deviceId(INTEGRATION_BRIDGE)
+                            .ifaceName(intfName)
+                            .remote(TunnelEndPoints.flowTunnelEndpoint())
+                            .key(TunnelKeys.flowTunnelKey());
+
+            switch (type) {
+                case VXLAN:
+                    tdBuilder.type(TunnelDescription.Type.VXLAN);
+                    break;
+                case GRE:
+                    tdBuilder.type(TunnelDescription.Type.GRE);
+                    break;
+                case GENEVE:
+                    tdBuilder.type(TunnelDescription.Type.GENEVE);
+                    break;
+                default:
+                    return null;
+            }
+
+            return tdBuilder.build();
+        }
+        return null;
+    }
+
+    /**
+     * Checks whether a given network interface in a given kubernetes node
+     * is enabled or not.
+     *
+     * @param k8sNode       kubernetes node
+     * @param intf          network interface name
+     * @return true if the given interface is enabled, false otherwise
+     */
+    private boolean isIntfEnabled(K8sNode k8sNode, String intf) {
+        return deviceService.isAvailable(k8sNode.intgBridge()) &&
+                deviceService.getPorts(k8sNode.intgBridge()).stream()
+                        .anyMatch(port -> Objects.equals(
+                                port.annotations().value(PORT_NAME), intf) &&
+                                port.isEnabled());
+    }
+
+    /**
+     * Checks whether all requirements for this state are fulfilled or not.
+     *
+     * @param k8sNode       kubernetes node
+     * @return true if all requirements are fulfilled, false otherwise
+     */
+    private boolean isCurrentStateDone(K8sNode k8sNode) {
+        switch (k8sNode.state()) {
+            case INIT:
+                if (!isOvsdbConnected(k8sNode, ovsdbPortNum,
+                        ovsdbController, deviceService)) {
+                    return false;
+                }
+
+                return deviceService.isAvailable(k8sNode.intgBridge());
+            case DEVICE_CREATED:
+                if (k8sNode.dataIp() != null &&
+                        !isIntfEnabled(k8sNode, VXLAN_TUNNEL)) {
+                    return false;
+                }
+                if (k8sNode.dataIp() != null &&
+                        !isIntfEnabled(k8sNode, GRE_TUNNEL)) {
+                    return false;
+                }
+                if (k8sNode.dataIp() != null &&
+                        !isIntfEnabled(k8sNode, GENEVE_TUNNEL)) {
+                    return false;
+                }
+
+                return true;
+            case COMPLETE:
+            case INCOMPLETE:
+                // always return false
+                // run init CLI to re-trigger node bootstrap
+                return false;
+            default:
+                return true;
+        }
+    }
+
+    /**
+     * Configures the kubernetes node with new state.
+     *
+     * @param k8sNode       kubernetes node
+     * @param newState      a new state
+     */
+    private void setState(K8sNode k8sNode, K8sNodeState newState) {
+        if (k8sNode.state() == newState) {
+            return;
+        }
+        K8sNode updated = k8sNode.updateState(newState);
+        k8sNodeAdminService.updateNode(updated);
+        log.info("Changed {} state: {}", k8sNode.hostname(), newState);
+    }
+
+    /**
+     * Bootstraps a new kubernetes node.
+     *
+     * @param k8sNode kubernetes node
+     */
+    private void bootstrapNode(K8sNode k8sNode) {
+        if (isCurrentStateDone(k8sNode)) {
+            setState(k8sNode, k8sNode.state().nextState());
+        } else {
+            log.trace("Processing {} state for {}", k8sNode.state(),
+                    k8sNode.hostname());
+            k8sNode.state().process(this, k8sNode);
+        }
+    }
+
+    private void processK8sNodeRemoved(K8sNode k8sNode) {
+        OvsdbClientService client = getOvsdbClient(k8sNode, ovsdbPortNum, ovsdbController);
+        if (client == null) {
+            log.info("Failed to get ovsdb client");
+            return;
+        }
+
+        // delete integration bridge from the node
+        client.dropBridge(INTEGRATION_BRIDGE);
+
+        // disconnect ovsdb
+        client.disconnect();
+    }
+
+    /**
+     * An internal OVSDB listener. This listener is used for listening the
+     * network facing events from OVSDB device. If a new OVSDB device is detected,
+     * ONOS tries to bootstrap the kubernetes node.
+     */
+    private class InternalOvsdbListener implements DeviceListener {
+
+        @Override
+        public boolean isRelevant(DeviceEvent event) {
+            return event.subject().type() == Device.Type.CONTROLLER;
+        }
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(DeviceEvent event) {
+            Device device = event.subject();
+
+            switch (event.type()) {
+                case DEVICE_AVAILABILITY_CHANGED:
+                case DEVICE_ADDED:
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        K8sNode k8sNode = k8sNodeService.node(device.id());
+
+                        if (k8sNode == null) {
+                            return;
+                        }
+
+                        if (deviceService.isAvailable(device.id())) {
+                            log.debug("OVSDB {} detected", device.id());
+                            bootstrapNode(k8sNode);
+                        }
+                    });
+                    break;
+                case PORT_ADDED:
+                case PORT_REMOVED:
+                case DEVICE_REMOVED:
+                default:
+                    // do nothing
+                    break;
+            }
+        }
+    }
+
+    /**
+     * An internal integration bridge listener. This listener is used for
+     * listening the events from integration bridge. To listen the events from
+     * other types of bridge such as provider bridge or tunnel bridge, we need
+     * to augment K8sNodeService.node() method.
+     */
+    private class InternalBridgeListener implements DeviceListener {
+
+        @Override
+        public boolean isRelevant(DeviceEvent event) {
+            return event.subject().type() == Device.Type.SWITCH;
+        }
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(DeviceEvent event) {
+            Device device = event.subject();
+
+            switch (event.type()) {
+                case DEVICE_AVAILABILITY_CHANGED:
+                case DEVICE_ADDED:
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        K8sNode k8sNode = k8sNodeService.node(device.id());
+
+                        if (k8sNode == null) {
+                            return;
+                        }
+
+                        if (deviceService.isAvailable(device.id())) {
+                            log.debug("Integration bridge created on {}", k8sNode.hostname());
+                            bootstrapNode(k8sNode);
+                        } else if (k8sNode.state() == COMPLETE) {
+                            log.info("Device {} disconnected", device.id());
+                            setState(k8sNode, INCOMPLETE);
+                        }
+
+                        if (autoRecovery) {
+                            if (k8sNode.state() == INCOMPLETE ||
+                                    k8sNode.state() == DEVICE_CREATED) {
+                                log.info("Device {} is reconnected", device.id());
+                                k8sNodeAdminService.updateNode(
+                                        k8sNode.updateState(K8sNodeState.INIT));
+                            }
+                        }
+                    });
+                    break;
+                case PORT_UPDATED:
+                case PORT_ADDED:
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        K8sNode k8sNode = k8sNodeService.node(device.id());
+
+                        if (k8sNode == null) {
+                            return;
+                        }
+
+                        Port port = event.port();
+                        String portName = port.annotations().value(PORT_NAME);
+                        if (k8sNode.state() == DEVICE_CREATED && (
+                                Objects.equals(portName, VXLAN_TUNNEL) ||
+                                        Objects.equals(portName, GRE_TUNNEL) ||
+                                        Objects.equals(portName, GENEVE_TUNNEL))) {
+                            log.info("Interface {} added or updated to {}",
+                                    portName, device.id());
+                            bootstrapNode(k8sNode);
+                        }
+                    });
+                    break;
+                case PORT_REMOVED:
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        K8sNode k8sNode = k8sNodeService.node(device.id());
+
+                        if (k8sNode == null) {
+                            return;
+                        }
+
+                        Port port = event.port();
+                        String portName = port.annotations().value(PORT_NAME);
+                        if (k8sNode.state() == COMPLETE && (
+                                Objects.equals(portName, VXLAN_TUNNEL) ||
+                                        Objects.equals(portName, GRE_TUNNEL) ||
+                                        Objects.equals(portName, GENEVE_TUNNEL))) {
+                            log.warn("Interface {} removed from {}",
+                                    portName, event.subject().id());
+                            setState(k8sNode, INCOMPLETE);
+                        }
+                    });
+                    break;
+                case DEVICE_REMOVED:
+                default:
+                    // do nothing
+                    break;
+            }
+        }
+    }
+
+    /**
+     * An internal kubernetes node listener.
+     * The notification is triggered by KubernetesNodeStore.
+     */
+    private class InternalK8sNodeListener implements K8sNodeListener {
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(K8sNodeEvent event) {
+            switch (event.type()) {
+                case K8S_NODE_CREATED:
+                case K8S_NODE_UPDATED:
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        bootstrapNode(event.subject());
+                    });
+                    break;
+                case K8S_NODE_REMOVED:
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        processK8sNodeRemoved(event.subject());
+                    });
+                    break;
+                case K8S_NODE_INCOMPLETE:
+                default:
+                    break;
+            }
+        }
+    }
+}
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/util/K8sNodeUtil.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/util/K8sNodeUtil.java
new file mode 100644
index 0000000..c0a158e
--- /dev/null
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/util/K8sNodeUtil.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snode.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.net.Device;
+import org.onosproject.net.behaviour.BridgeConfig;
+import org.onosproject.net.behaviour.BridgeName;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.ovsdb.controller.OvsdbClientService;
+import org.onosproject.ovsdb.controller.OvsdbController;
+import org.onosproject.ovsdb.controller.OvsdbNodeId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Dictionary;
+
+import static org.onlab.util.Tools.get;
+
+/**
+ * An utility that used in kubernetes node app.
+ */
+public final class K8sNodeUtil {
+    private static final Logger log = LoggerFactory.getLogger(K8sNodeUtil.class);
+
+    /**
+     * Prevents object installation from external.
+     */
+    private K8sNodeUtil() {
+    }
+
+    /**
+     * Checks whether the controller has a connection with an OVSDB that resides
+     * inside the given kubernetes node.
+     *
+     * @param node              kubernetes node
+     * @param ovsdbPort         OVSDB port
+     * @param ovsdbController   OVSDB controller
+     * @param deviceService     device service
+     * @return true if the controller is connected to the OVSDB, false otherwise
+     */
+    public static boolean isOvsdbConnected(K8sNode node,
+                                           int ovsdbPort,
+                                           OvsdbController ovsdbController,
+                                           DeviceService deviceService) {
+        OvsdbClientService client = getOvsdbClient(node, ovsdbPort, ovsdbController);
+        return deviceService.isAvailable(node.ovsdb()) &&
+                client != null &&
+                client.isConnected();
+    }
+
+    /**
+     * Gets the ovsdb client with supplied openstack node.
+     *
+     * @param node              kubernetes node
+     * @param ovsdbPort         ovsdb port
+     * @param ovsdbController   ovsdb controller
+     * @return ovsdb client
+     */
+    public static OvsdbClientService getOvsdbClient(K8sNode node,
+                                                    int ovsdbPort,
+                                                    OvsdbController ovsdbController) {
+        OvsdbNodeId ovsdb = new OvsdbNodeId(node.managementIp(), ovsdbPort);
+        return ovsdbController.getOvsdbClient(ovsdb);
+    }
+
+    /**
+     * Adds or removes a network interface (aka port) into a given bridge of kubernetes node.
+     *
+     * @param k8sNode       kubernetes node
+     * @param bridgeName    bridge name
+     * @param intfName      interface name
+     * @param deviceService device service
+     * @param addOrRemove   add port is true, remove it otherwise
+     */
+    public static synchronized void addOrRemoveSystemInterface(K8sNode k8sNode,
+                                                               String bridgeName,
+                                                               String intfName,
+                                                               DeviceService deviceService,
+                                                               boolean addOrRemove) {
+
+
+        Device device = deviceService.getDevice(k8sNode.ovsdb());
+        if (device == null || !device.is(BridgeConfig.class)) {
+            log.info("device is null or this device if not ovsdb device");
+            return;
+        }
+        BridgeConfig bridgeConfig =  device.as(BridgeConfig.class);
+
+        if (addOrRemove) {
+            bridgeConfig.addPort(BridgeName.bridgeName(bridgeName), intfName);
+        } else {
+            bridgeConfig.deletePort(BridgeName.bridgeName(bridgeName), intfName);
+        }
+    }
+
+    /**
+     * Gets Boolean property from the propertyName
+     * Return null if propertyName is not found.
+     *
+     * @param properties   properties to be looked up
+     * @param propertyName the name of the property to look up
+     * @return value when the propertyName is defined or return null
+     */
+    public static Boolean getBooleanProperty(Dictionary<?, ?> properties,
+                                             String propertyName) {
+        Boolean value;
+        try {
+            String s = get(properties, propertyName);
+            value = Strings.isNullOrEmpty(s) ? null : Boolean.valueOf(s);
+        } catch (ClassCastException e) {
+            value = null;
+        }
+        return value;
+    }
+
+    /**
+     * Prints out the JSON string in pretty format.
+     *
+     * @param mapper        Object mapper
+     * @param jsonString    JSON string
+     * @return pretty formatted JSON string
+     */
+    public static String prettyJson(ObjectMapper mapper, String jsonString) {
+        try {
+            Object jsonObject = mapper.readValue(jsonString, Object.class);
+            return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(jsonObject);
+        } catch (IOException e) {
+            log.debug("Json string parsing exception caused by {}", e);
+        }
+        return null;
+    }
+}
diff --git a/apps/k8s-node/app/src/main/resources/definitions/K8sNode.json b/apps/k8s-node/app/src/main/resources/definitions/K8sNode.json
index 34823d2..4351616 100644
--- a/apps/k8s-node/app/src/main/resources/definitions/K8sNode.json
+++ b/apps/k8s-node/app/src/main/resources/definitions/K8sNode.json
@@ -13,7 +13,7 @@
           "type",
           "managementIp",
           "dataIp",
-          "integrationBridge",
+          "integrationBridge"
         ],
         "properties": {
           "hostname": {