[ONOS-7902] Add node handler and a set of CLIs for kubernetes node
Change-Id: Iee4a88e4af437d551a38342de339455387389f61
diff --git a/apps/k8s-node/app/BUILD b/apps/k8s-node/app/BUILD
index fd859b9..d740f68 100644
--- a/apps/k8s-node/app/BUILD
+++ b/apps/k8s-node/app/BUILD
@@ -1,5 +1,7 @@
COMPILE_DEPS = CORE_DEPS + JACKSON + KRYO + CLI + REST + [
"//core/store/serializers:onos-core-serializers",
+ "//protocols/ovsdb/api:onos-protocols-ovsdb-api",
+ "//protocols/ovsdb/rfc:onos-protocols-ovsdb-rfc",
"//apps/k8s-node/api:onos-apps-k8s-node-api",
]
@@ -10,7 +12,12 @@
]
osgi_jar_with_tests(
+ api_description = "REST API for Kubernetes Node",
+ api_package = "org.onosproject.k8snode.web",
+ api_title = "Kubernetes Node API",
+ api_version = "1.0",
karaf_command_packages = ["org.onosproject.k8snode.cli"],
test_deps = TEST_DEPS,
+ web_context = "/onos/k8snode",
deps = COMPILE_DEPS,
)
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sHostnameCompleter.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sHostnameCompleter.java
new file mode 100644
index 0000000..54c66fa
--- /dev/null
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sHostnameCompleter.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snode.cli;
+
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.karaf.shell.api.console.CommandLine;
+import org.apache.karaf.shell.api.console.Completer;
+import org.apache.karaf.shell.api.console.Session;
+import org.apache.karaf.shell.support.completers.StringsCompleter;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeService;
+
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.stream.Collectors;
+
+import static org.onosproject.cli.AbstractShellCommand.get;
+
+/**
+ * Karaf shell completer that proposes the hostnames of all kubernetes nodes
+ * currently registered in the {@link K8sNodeService}.
+ */
+@Service
+public class K8sHostnameCompleter implements Completer {
+    @Override
+    public int complete(Session session, CommandLine commandLine, List<String> candidates) {
+        StringsCompleter completer = new StringsCompleter();
+        SortedSet<String> pool = completer.getStrings();
+
+        // seed the delegate with the hostname of every known node
+        Set<String> known = get(K8sNodeService.class).nodes().stream()
+                .map(K8sNode::hostname)
+                .collect(Collectors.toSet());
+        pool.addAll(known);
+
+        return completer.complete(session, commandLine, candidates);
+    }
+}
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeCheckCommand.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeCheckCommand.java
new file mode 100644
index 0000000..9879415
--- /dev/null
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeCheckCommand.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snode.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Completion;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.device.DeviceService;
+
+import static org.onosproject.k8snode.api.Constants.GENEVE_TUNNEL;
+import static org.onosproject.k8snode.api.Constants.GRE_TUNNEL;
+import static org.onosproject.k8snode.api.Constants.INTEGRATION_BRIDGE;
+import static org.onosproject.k8snode.api.Constants.VXLAN_TUNNEL;
+import static org.onosproject.net.AnnotationKeys.PORT_NAME;
+
+/**
+ * Checks detailed node init state.
+ */
+@Service
+@Command(scope = "onos", name = "k8s-node-check",
+ description = "Shows detailed kubernetes node init state")
+public class K8sNodeCheckCommand extends AbstractShellCommand {
+
+ @Argument(index = 0, name = "hostname", description = "Hostname",
+ required = true, multiValued = false)
+ @Completion(K8sHostnameCompleter.class)
+ private String hostname = null;
+
+ private static final String MSG_OK = "OK";
+ private static final String MSG_ERROR = "ERROR";
+
+ /**
+  * Prints the state of the integration bridge of the requested node and,
+  * if the node has a data IP, the state of its tunnel ports.
+  */
+ @Override
+ protected void doExecute() {
+     K8sNodeService nodeService = get(K8sNodeService.class);
+     DeviceService deviceService = get(DeviceService.class);
+
+     K8sNode node = nodeService.node(hostname);
+     if (node == null) {
+         print("Cannot find %s from registered nodes", hostname);
+         return;
+     }
+
+     print("[Integration Bridge Status]");
+     Device device = deviceService.getDevice(node.intgBridge());
+     if (device == null) {
+         print("%s %s=%s is not available", MSG_ERROR, INTEGRATION_BRIDGE, node.intgBridge());
+         return;
+     }
+     // look the availability up once so that the OK/ERROR tag and the
+     // printed "available" value can never contradict each other
+     boolean available = deviceService.isAvailable(device.id());
+     print("%s %s=%s available=%s %s", available ? MSG_OK : MSG_ERROR,
+             INTEGRATION_BRIDGE, device.id(), available, device.annotations());
+     if (node.dataIp() != null) {
+         printPortState(deviceService, node.intgBridge(), VXLAN_TUNNEL);
+         printPortState(deviceService, node.intgBridge(), GRE_TUNNEL);
+         printPortState(deviceService, node.intgBridge(), GENEVE_TUNNEL);
+     }
+ }
+
+ /**
+  * Prints the state of the port with the given name on the given device.
+  * Ports lacking a name annotation are skipped instead of raising an NPE,
+  * and a disabled port is reported as disabled rather than as missing.
+  * @param deviceService device service used to look the port up
+  * @param deviceId identifier of the device owning the port
+  * @param portName value of the port name annotation to look for
+  */
+ private void printPortState(DeviceService deviceService, DeviceId deviceId, String portName) {
+     Port port = deviceService.getPorts(deviceId).stream()
+             .filter(p -> portName.equals(p.annotations().value(PORT_NAME))).findAny().orElse(null);
+     if (port == null) {
+         print("%s %s does not exist", MSG_ERROR, portName);
+         return;
+     }
+     print("%s %s portNum=%s enabled=%s %s", port.isEnabled() ? MSG_OK : MSG_ERROR,
+             portName, port.number(), port.isEnabled(), port.annotations());
+ }
+}
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeInitCommand.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeInitCommand.java
new file mode 100644
index 0000000..a414e15
--- /dev/null
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeInitCommand.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snode.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Completion;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeAdminService;
+import org.onosproject.k8snode.api.K8sNodeService;
+
+import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
+import static org.onosproject.k8snode.api.K8sNodeState.INIT;
+
+/**
+ * Initializes nodes for node service.
+ */
+@Service
+@Command(scope = "onos", name = "k8s-node-init",
+ description = "Initializes nodes for kubernetes node service")
+public class K8sNodeInitCommand extends AbstractShellCommand {
+
+ @Option(name = "-a", aliases = "--all", description = "Apply this command to all nodes",
+ required = false, multiValued = false)
+ private boolean isAll = false;
+
+ @Option(name = "-i", aliases = "--incomplete",
+ description = "Apply this command to incomplete nodes",
+ required = false, multiValued = false)
+ private boolean isIncomplete = false;
+
+ @Argument(index = 0, name = "hostnames", description = "Hostname(s) to apply this command",
+ required = false, multiValued = true)
+ @Completion(K8sHostnameCompleter.class)
+ private String[] hostnames = null;
+
+ @Override
+ protected void doExecute() {
+     K8sNodeService nodeService = get(K8sNodeService.class);
+     K8sNodeAdminService nodeAdminService = get(K8sNodeAdminService.class);
+
+     // exactly one selector is accepted: explicit hostnames, --all or --incomplete
+     int selectors = hostnames != null ? 1 : 0;
+     selectors += isAll ? 1 : 0;
+     selectors += isIncomplete ? 1 : 0;
+     if (selectors != 1) {
+         print("Please specify one of hostname, --all, and --incomplete options.");
+         return;
+     }
+
+     if (isAll) {
+         hostnames = nodeService.nodes().stream()
+                 .map(K8sNode::hostname).toArray(String[]::new);
+     } else if (isIncomplete) {
+         hostnames = nodeService.nodes().stream()
+                 .filter(node -> node.state() != COMPLETE)
+                 .map(K8sNode::hostname).toArray(String[]::new);
+     }
+
+     for (String name : hostnames) {
+         K8sNode target = nodeService.node(name);
+         if (target != null) {
+             print("Initializing %s", name);
+             nodeAdminService.updateNode(target.updateState(INIT));
+         } else {
+             print("Unable to find %s", name);
+         }
+     }
+     print("Done.");
+ }
+}
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeListCommand.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeListCommand.java
new file mode 100644
index 0000000..04601cf
--- /dev/null
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/cli/K8sNodeListCommand.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snode.cli;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.collect.Lists;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeService;
+
+import java.util.Comparator;
+import java.util.List;
+
+import static org.onosproject.k8snode.util.K8sNodeUtil.prettyJson;
+
+/**
+ * Lists all nodes registered to the service.
+ */
+@Service
+@Command(scope = "onos", name = "k8s-nodes",
+ description = "Lists all nodes registered in kubernetes node service")
+public class K8sNodeListCommand extends AbstractShellCommand {
+
+ private static final String FORMAT = "%-20s%-15s%-24s%-24s%-20s%-15s";
+
+ /**
+  * Prints all registered nodes sorted by hostname, either as JSON or as
+  * a table followed by the number of listed nodes.
+  */
+ @Override
+ protected void doExecute() {
+     K8sNodeService nodeService = get(K8sNodeService.class);
+     List<K8sNode> nodes = Lists.newArrayList(nodeService.nodes());
+     nodes.sort(Comparator.comparing(K8sNode::hostname));
+
+     if (outputJson()) {
+         print("%s", json(nodes));
+         return;
+     }
+
+     print(FORMAT, "Hostname", "Type", "Integration Bridge", "Management IP", "Data IP", "State");
+     for (K8sNode node : nodes) {
+         print(FORMAT, node.hostname(), node.type(), node.intgBridge(),
+                 node.managementIp(), node.dataIp() != null ? node.dataIp() : "", node.state());
+     }
+     // count the snapshot that was printed rather than querying the service again
+     print("Total %s nodes", nodes.size());
+ }
+
+ /** Renders the given nodes as a JSON array string via prettyJson. */
+ private String json(List<K8sNode> nodes) {
+     ObjectMapper objectMapper = new ObjectMapper();
+     ArrayNode encoded = objectMapper.createArrayNode();
+     // append the JSON form of each node in list order
+     nodes.forEach(n -> encoded.add(jsonForEntity(n, K8sNode.class)));
+     return prettyJson(objectMapper, encoded.toString());
+ }
+}
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sNodeHandler.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sNodeHandler.java
new file mode 100644
index 0000000..6fb20b7
--- /dev/null
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/impl/DefaultK8sNodeHandler.java
@@ -0,0 +1,674 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snode.impl;
+
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeAdminService;
+import org.onosproject.k8snode.api.K8sNodeEvent;
+import org.onosproject.k8snode.api.K8sNodeHandler;
+import org.onosproject.k8snode.api.K8sNodeListener;
+import org.onosproject.k8snode.api.K8sNodeService;
+import org.onosproject.k8snode.api.K8sNodeState;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.behaviour.BridgeConfig;
+import org.onosproject.net.behaviour.BridgeDescription;
+import org.onosproject.net.behaviour.ControllerInfo;
+import org.onosproject.net.behaviour.DefaultBridgeDescription;
+import org.onosproject.net.behaviour.DefaultTunnelDescription;
+import org.onosproject.net.behaviour.InterfaceConfig;
+import org.onosproject.net.behaviour.TunnelDescription;
+import org.onosproject.net.behaviour.TunnelEndPoints;
+import org.onosproject.net.behaviour.TunnelKeys;
+import org.onosproject.net.device.DeviceAdminService;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.ovsdb.controller.OvsdbClientService;
+import org.onosproject.ovsdb.controller.OvsdbController;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.packet.TpPort.tpPort;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snode.api.Constants.GENEVE;
+import static org.onosproject.k8snode.api.Constants.GENEVE_TUNNEL;
+import static org.onosproject.k8snode.api.Constants.GRE;
+import static org.onosproject.k8snode.api.Constants.GRE_TUNNEL;
+import static org.onosproject.k8snode.api.Constants.INTEGRATION_BRIDGE;
+import static org.onosproject.k8snode.api.Constants.VXLAN;
+import static org.onosproject.k8snode.api.Constants.VXLAN_TUNNEL;
+import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
+import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
+import static org.onosproject.k8snode.api.K8sNodeState.DEVICE_CREATED;
+import static org.onosproject.k8snode.api.K8sNodeState.INCOMPLETE;
+import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY;
+import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY_DEFAULT;
+import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT;
+import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
+import static org.onosproject.k8snode.util.K8sNodeUtil.getBooleanProperty;
+import static org.onosproject.k8snode.util.K8sNodeUtil.getOvsdbClient;
+import static org.onosproject.k8snode.util.K8sNodeUtil.isOvsdbConnected;
+import static org.onosproject.net.AnnotationKeys.PORT_NAME;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Service bootstraps kubernetes node based on its type.
+ */
+@Component(immediate = true,
+ property = {
+ OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT,
+ AUTO_RECOVERY + ":Boolean=" + AUTO_RECOVERY_DEFAULT
+ }
+)
+public class DefaultK8sNodeHandler implements K8sNodeHandler {
+
+ private final Logger log = getLogger(getClass());
+
+ private static final String DEFAULT_OF_PROTO = "tcp";
+ private static final int DEFAULT_OFPORT = 6653;
+ private static final int DPID_BEGIN = 3;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceAdminService deviceAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected OvsdbController ovsdbController;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sNodeService k8sNodeService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sNodeAdminService k8sNodeAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ComponentConfigService componentConfigService;
+
+ /** OVSDB server listen port. */
+ private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
+
+ /** Indicates whether auto-recover kubernetes node status on switch re-conn event. */
+ private boolean autoRecovery = AUTO_RECOVERY_DEFAULT;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
+ private final DeviceListener ovsdbListener = new InternalOvsdbListener();
+ private final DeviceListener bridgeListener = new InternalBridgeListener();
+ private final K8sNodeListener k8sNodeListener = new InternalK8sNodeListener();
+
+ private ApplicationId appId;
+ private NodeId localNode;
+
+ @Activate
+ protected void activate() {
+ appId = coreService.getAppId(APP_ID);
+ localNode = clusterService.getLocalNode().id();
+
+ componentConfigService.registerProperties(getClass());
+ leadershipService.runForLeadership(appId.name());
+ deviceService.addListener(ovsdbListener);
+ deviceService.addListener(bridgeListener);
+ k8sNodeService.addListener(k8sNodeListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ k8sNodeService.removeListener(k8sNodeListener);
+ deviceService.removeListener(bridgeListener);
+ deviceService.removeListener(ovsdbListener);
+ componentConfigService.unregisterProperties(getClass(), false);
+ leadershipService.withdraw(appId.name());
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+ @Modified
+ protected void modified(ComponentContext context) {
+ readComponentConfiguration(context);
+
+ log.info("Modified");
+ }
+
+ @Override
+ public void processInitState(K8sNode k8sNode) {
+ if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) {
+ ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum));
+ return;
+ }
+ if (!deviceService.isAvailable(k8sNode.intgBridge())) {
+ createBridge(k8sNode, INTEGRATION_BRIDGE, k8sNode.intgBridge());
+ }
+ }
+
+ @Override
+ public void processDeviceCreatedState(K8sNode k8sNode) {
+     try {
+         // without an OVSDB connection only the connection attempt is made;
+         // tunnel interfaces are left for a later invocation
+         if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) {
+             ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum));
+             return;
+         }
+
+         // tunnel interfaces are only created on nodes that have a data IP
+         if (k8sNode.dataIp() == null) {
+             return;
+         }
+
+         // each helper skips an interface that is already enabled
+         createVxlanTunnelInterface(k8sNode);
+         createGreTunnelInterface(k8sNode);
+         createGeneveTunnelInterface(k8sNode);
+     } catch (Exception e) {
+         // the exception goes last and gets no placeholder of its own, so
+         // that SLF4J logs its stack trace instead of a literal "{}"
+         log.error("Failed to process DEVICE_CREATED state of {}",
+                 k8sNode.hostname(), e);
+     }
+ }
+
+ @Override
+ public void processCompleteState(K8sNode k8sNode) {
+ // do something if needed
+ }
+
+ @Override
+ public void processIncompleteState(K8sNode k8sNode) {
+ // do something if needed
+ }
+
+ /**
+ * Extracts properties from the component configuration context.
+ *
+ * @param context the component context
+ */
+ private void readComponentConfiguration(ComponentContext context) {
+ Dictionary<?, ?> properties = context.getProperties();
+
+ Integer ovsdbPortConfigured = Tools.getIntegerProperty(properties, OVSDB_PORT);
+ if (ovsdbPortConfigured == null) {
+ ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
+ log.info("OVSDB port is NOT configured, default value is {}", ovsdbPortNum);
+ } else {
+ ovsdbPortNum = ovsdbPortConfigured;
+ log.info("Configured. OVSDB port is {}", ovsdbPortNum);
+ }
+
+ Boolean autoRecoveryConfigured =
+ getBooleanProperty(properties, AUTO_RECOVERY);
+ if (autoRecoveryConfigured == null) {
+ autoRecovery = AUTO_RECOVERY_DEFAULT;
+ log.info("Auto recovery flag is NOT " +
+ "configured, default value is {}", autoRecovery);
+ } else {
+ autoRecovery = autoRecoveryConfigured;
+ log.info("Configured. Auto recovery flag is {}", autoRecovery);
+ }
+ }
+
+ /**
+ * Creates a bridge with a given name on a given kubernetes node.
+ *
+ * @param k8sNode kubernetes node
+ * @param bridgeName bridge name
+ * @param devId device identifier
+ */
+ private void createBridge(K8sNode k8sNode, String bridgeName, DeviceId devId) {
+ Device device = deviceService.getDevice(k8sNode.ovsdb());
+
+ List<ControllerInfo> controllers = clusterService.getNodes().stream()
+ .map(n -> new ControllerInfo(n.ip(), DEFAULT_OFPORT, DEFAULT_OF_PROTO))
+ .collect(Collectors.toList());
+
+ String dpid = devId.toString().substring(DPID_BEGIN);
+
+ BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
+ .name(bridgeName)
+ .failMode(BridgeDescription.FailMode.SECURE)
+ .datapathId(dpid)
+ .disableInBand()
+ .controllers(controllers);
+
+ BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
+ bridgeConfig.addBridge(builder.build());
+ }
+
+ /**
+ * Creates a VXLAN tunnel interface in a given kubernetes node.
+ *
+ * @param k8sNode kubernetes node
+ */
+ private void createVxlanTunnelInterface(K8sNode k8sNode) {
+ createTunnelInterface(k8sNode, VXLAN, VXLAN_TUNNEL);
+ }
+
+ /**
+ * Creates a GRE tunnel interface in a given kubernetes node.
+ *
+ * @param k8sNode kubernetes node
+ */
+ private void createGreTunnelInterface(K8sNode k8sNode) {
+ createTunnelInterface(k8sNode, GRE, GRE_TUNNEL);
+ }
+
+ /**
+ * Creates a GENEVE tunnel interface in a given kubernetes node.
+ *
+ * @param k8sNode kubernetes node
+ */
+ private void createGeneveTunnelInterface(K8sNode k8sNode) {
+ createTunnelInterface(k8sNode, GENEVE, GENEVE_TUNNEL);
+ }
+
+ /**
+  * Creates a tunnel interface in a given kubernetes node, unless an enabled
+  * interface with the same name already exists on its integration bridge.
+  *
+  * @param k8sNode kubernetes node
+  * @param type tunnel type
+  * @param intfName name of the tunnel interface
+  */
+ private void createTunnelInterface(K8sNode k8sNode, String type, String intfName) {
+     if (isIntfEnabled(k8sNode, intfName)) {
+         return;
+     }
+
+     Device device = deviceService.getDevice(k8sNode.ovsdb());
+     TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName);
+     // give up if the device cannot configure interfaces or the type is unsupported
+     if (device == null || !device.is(InterfaceConfig.class) || tunnelDesc == null) {
+         log.error("Failed to create tunnel interface on {}", k8sNode.ovsdb());
+         return;
+     }
+     device.as(InterfaceConfig.class).addTunnelMode(intfName, tunnelDesc);
+ }
+
+ /**
+ * Builds tunnel description according to the network type.
+ *
+ * @param type network type
+ * @return tunnel description
+ */
+ private TunnelDescription buildTunnelDesc(String type, String intfName) {
+ if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
+ TunnelDescription.Builder tdBuilder =
+ DefaultTunnelDescription.builder()
+ .deviceId(INTEGRATION_BRIDGE)
+ .ifaceName(intfName)
+ .remote(TunnelEndPoints.flowTunnelEndpoint())
+ .key(TunnelKeys.flowTunnelKey());
+
+ switch (type) {
+ case VXLAN:
+ tdBuilder.type(TunnelDescription.Type.VXLAN);
+ break;
+ case GRE:
+ tdBuilder.type(TunnelDescription.Type.GRE);
+ break;
+ case GENEVE:
+ tdBuilder.type(TunnelDescription.Type.GENEVE);
+ break;
+ default:
+ return null;
+ }
+
+ return tdBuilder.build();
+ }
+ return null;
+ }
+
+ /**
+  * Tells whether the integration bridge of the given node is available and
+  * owns an enabled port whose name annotation equals the given name.
+  * @param k8sNode kubernetes node
+  * @param intf network interface name
+  * @return true if the given interface is enabled, false otherwise
+  */
+ private boolean isIntfEnabled(K8sNode k8sNode, String intf) {
+     if (!deviceService.isAvailable(k8sNode.intgBridge())) {
+         return false;
+     }
+     return deviceService.getPorts(k8sNode.intgBridge()).stream()
+             .filter(p -> Objects.equals(p.annotations().value(PORT_NAME), intf))
+             .anyMatch(Port::isEnabled);
+ }
+
+ /**
+  * Checks whether all requirements for this state are fulfilled or not.
+  * <ul>
+  * <li>INIT: the OVSDB connection is up and the integration bridge is
+  * available</li>
+  * <li>DEVICE_CREATED: the node has no data IP, or its VXLAN, GRE and
+  * GENEVE tunnel interfaces are all enabled</li>
+  * <li>COMPLETE and INCOMPLETE: never fulfilled</li>
+  * <li>any other state: always fulfilled</li>
+  * </ul>
+  *
+  * @param k8sNode kubernetes node
+  * @return true if all requirements are fulfilled, false otherwise
+  */
+ private boolean isCurrentStateDone(K8sNode k8sNode) {
+     switch (k8sNode.state()) {
+         case INIT:
+             // both the OVSDB session and the integration bridge are required
+             boolean ovsdbUp = isOvsdbConnected(k8sNode, ovsdbPortNum,
+                     ovsdbController, deviceService);
+             return ovsdbUp && deviceService.isAvailable(k8sNode.intgBridge());
+         case DEVICE_CREATED:
+             if (k8sNode.dataIp() == null) {
+                 // no tunnel interface is created for such a node
+                 return true;
+             }
+             return isIntfEnabled(k8sNode, VXLAN_TUNNEL) &&
+                     isIntfEnabled(k8sNode, GRE_TUNNEL) &&
+                     isIntfEnabled(k8sNode, GENEVE_TUNNEL);
+         case COMPLETE:
+         case INCOMPLETE:
+             // always return false
+             // run init CLI to re-trigger node bootstrap
+             return false;
+         default:
+             // no requirement is defined for any other state
+             return true;
+     }
+ }
+
+ /**
+ * Configures the kubernetes node with new state.
+ *
+ * @param k8sNode kubernetes node
+ * @param newState a new state
+ */
+ private void setState(K8sNode k8sNode, K8sNodeState newState) {
+ if (k8sNode.state() == newState) {
+ return;
+ }
+ K8sNode updated = k8sNode.updateState(newState);
+ k8sNodeAdminService.updateNode(updated);
+ log.info("Changed {} state: {}", k8sNode.hostname(), newState);
+ }
+
+ /**
+ * Bootstraps a new kubernetes node.
+ *
+ * @param k8sNode kubernetes node
+ */
+ private void bootstrapNode(K8sNode k8sNode) {
+ if (isCurrentStateDone(k8sNode)) {
+ setState(k8sNode, k8sNode.state().nextState());
+ } else {
+ log.trace("Processing {} state for {}", k8sNode.state(),
+ k8sNode.hostname());
+ k8sNode.state().process(this, k8sNode);
+ }
+ }
+
+ /** Drops the integration bridge of a removed node and disconnects its OVSDB client. */
+ private void processK8sNodeRemoved(K8sNode k8sNode) {
+ OvsdbClientService client = getOvsdbClient(k8sNode, ovsdbPortNum, ovsdbController);
+ if (client == null) {
+ log.info("Failed to get ovsdb client");
+ return;
+ }
+
+ // delete integration bridge from the node
+ client.dropBridge(INTEGRATION_BRIDGE);
+ // disconnect ovsdb
+ client.disconnect();
+ }
+
+ /**
+ * An internal OVSDB listener. This listener is used for listening to the
+ * network facing events from OVSDB device. If a new OVSDB device is detected,
+ * ONOS tries to bootstrap the kubernetes node.
+ */
+ private class InternalOvsdbListener implements DeviceListener {
+
+ @Override
+ public boolean isRelevant(DeviceEvent event) {
+ return event.subject().type() == Device.Type.CONTROLLER;
+ }
+ // tells whether the local node is the current leader of the app's topic
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(DeviceEvent event) {
+ Device device = event.subject();
+
+ switch (event.type()) {
+ case DEVICE_AVAILABILITY_CHANGED:
+ case DEVICE_ADDED:
+ eventExecutor.execute(() -> {
+ // runs on the event executor; only the leader handles the event
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ K8sNode k8sNode = k8sNodeService.node(device.id());
+ // ignore devices for which the node service knows no node
+ if (k8sNode == null) {
+ return;
+ }
+ // (re)run the bootstrap only while the device is available
+ if (deviceService.isAvailable(device.id())) {
+ log.debug("OVSDB {} detected", device.id());
+ bootstrapNode(k8sNode);
+ }
+ });
+ break;
+ case PORT_ADDED:
+ case PORT_REMOVED:
+ case DEVICE_REMOVED:
+ default:
+ // do nothing
+ break;
+ }
+ }
+ }
+
+ /**
+ * An internal integration bridge listener. This listener is used for
+ * listening to the events from integration bridge. To listen to the events
+ * from other types of bridge such as provider bridge or tunnel bridge, we
+ * need to augment K8sNodeService.node() method.
+ */
+ private class InternalBridgeListener implements DeviceListener {
+
+ @Override
+ public boolean isRelevant(DeviceEvent event) {
+ return event.subject().type() == Device.Type.SWITCH;
+ }
+ // tells whether the local node is the current leader of the app's topic
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(DeviceEvent event) {
+ Device device = event.subject();
+
+ switch (event.type()) {
+ case DEVICE_AVAILABILITY_CHANGED:
+ case DEVICE_ADDED:
+ eventExecutor.execute(() -> {
+ // runs on the event executor; only the leader handles the event
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ K8sNode k8sNode = k8sNodeService.node(device.id());
+ // ignore bridges for which the node service knows no node
+ if (k8sNode == null) {
+ return;
+ }
+
+ if (deviceService.isAvailable(device.id())) {
+ log.debug("Integration bridge created on {}", k8sNode.hostname());
+ bootstrapNode(k8sNode);
+ } else if (k8sNode.state() == COMPLETE) {
+ log.info("Device {} disconnected", device.id());
+ setState(k8sNode, INCOMPLETE);
+ }
+ // NOTE(review): k8sNode is the snapshot read above; presumably it does not see the state change made just before
+ if (autoRecovery) {
+ if (k8sNode.state() == INCOMPLETE ||
+ k8sNode.state() == DEVICE_CREATED) {
+ log.info("Device {} is reconnected", device.id());
+ k8sNodeAdminService.updateNode(
+ k8sNode.updateState(K8sNodeState.INIT));
+ }
+ }
+ });
+ break;
+ case PORT_UPDATED:
+ case PORT_ADDED:
+ eventExecutor.execute(() -> {
+ // runs on the event executor; only the leader handles the event
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ K8sNode k8sNode = k8sNodeService.node(device.id());
+
+ if (k8sNode == null) {
+ return;
+ }
+ // only a tunnel port on a node in DEVICE_CREATED state re-triggers the bootstrap
+ Port port = event.port();
+ String portName = port.annotations().value(PORT_NAME);
+ if (k8sNode.state() == DEVICE_CREATED && (
+ Objects.equals(portName, VXLAN_TUNNEL) ||
+ Objects.equals(portName, GRE_TUNNEL) ||
+ Objects.equals(portName, GENEVE_TUNNEL))) {
+ log.info("Interface {} added or updated to {}",
+ portName, device.id());
+ bootstrapNode(k8sNode);
+ }
+ });
+ break;
+ case PORT_REMOVED:
+ eventExecutor.execute(() -> {
+ // runs on the event executor; only the leader handles the event
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ K8sNode k8sNode = k8sNodeService.node(device.id());
+
+ if (k8sNode == null) {
+ return;
+ }
+ // losing a tunnel port moves a COMPLETE node to INCOMPLETE
+ Port port = event.port();
+ String portName = port.annotations().value(PORT_NAME);
+ if (k8sNode.state() == COMPLETE && (
+ Objects.equals(portName, VXLAN_TUNNEL) ||
+ Objects.equals(portName, GRE_TUNNEL) ||
+ Objects.equals(portName, GENEVE_TUNNEL))) {
+ log.warn("Interface {} removed from {}",
+ portName, event.subject().id());
+ setState(k8sNode, INCOMPLETE);
+ }
+ });
+ break;
+ case DEVICE_REMOVED:
+ default:
+ // do nothing
+ break;
+ }
+ }
+ }
+
+ /**
+ * An internal kubernetes node listener.
+ * The notification is triggered by KubernetesNodeStore.
+ */
+ private class InternalK8sNodeListener implements K8sNodeListener {
+ // tells whether the local node is the current leader of the app's topic
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(K8sNodeEvent event) {
+ switch (event.type()) {
+ case K8S_NODE_CREATED:
+ case K8S_NODE_UPDATED:
+ eventExecutor.execute(() -> {
+ // only the leader bootstraps the created or updated node
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ bootstrapNode(event.subject());
+ });
+ break;
+ case K8S_NODE_REMOVED:
+ eventExecutor.execute(() -> {
+ // only the leader cleans up after the removed node
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ processK8sNodeRemoved(event.subject());
+ });
+ break;
+ case K8S_NODE_INCOMPLETE:
+ default:
+ break;
+ }
+ }
+ }
+}
diff --git a/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/util/K8sNodeUtil.java b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/util/K8sNodeUtil.java
new file mode 100644
index 0000000..c0a158e
--- /dev/null
+++ b/apps/k8s-node/app/src/main/java/org/onosproject/k8snode/util/K8sNodeUtil.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snode.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.net.Device;
+import org.onosproject.net.behaviour.BridgeConfig;
+import org.onosproject.net.behaviour.BridgeName;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.ovsdb.controller.OvsdbClientService;
+import org.onosproject.ovsdb.controller.OvsdbController;
+import org.onosproject.ovsdb.controller.OvsdbNodeId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Dictionary;
+
+import static org.onlab.util.Tools.get;
+
+/**
+ * An utility that used in kubernetes node app.
+ */
+public final class K8sNodeUtil {
+ private static final Logger log = LoggerFactory.getLogger(K8sNodeUtil.class);
+
+ /**
+ * Prevents object installation from external.
+ */
+ private K8sNodeUtil() {
+ }
+
+ /**
+ * Checks whether the controller has a connection with an OVSDB that resides
+ * inside the given kubernetes node.
+ *
+ * @param node kubernetes node
+ * @param ovsdbPort OVSDB port
+ * @param ovsdbController OVSDB controller
+ * @param deviceService device service
+ * @return true if the controller is connected to the OVSDB, false otherwise
+ */
+ public static boolean isOvsdbConnected(K8sNode node,
+ int ovsdbPort,
+ OvsdbController ovsdbController,
+ DeviceService deviceService) {
+ OvsdbClientService client = getOvsdbClient(node, ovsdbPort, ovsdbController);
+ return deviceService.isAvailable(node.ovsdb()) &&
+ client != null &&
+ client.isConnected();
+ }
+
+ /**
+ * Gets the ovsdb client with supplied openstack node.
+ *
+ * @param node kubernetes node
+ * @param ovsdbPort ovsdb port
+ * @param ovsdbController ovsdb controller
+ * @return ovsdb client
+ */
+ public static OvsdbClientService getOvsdbClient(K8sNode node,
+ int ovsdbPort,
+ OvsdbController ovsdbController) {
+ OvsdbNodeId ovsdb = new OvsdbNodeId(node.managementIp(), ovsdbPort);
+ return ovsdbController.getOvsdbClient(ovsdb);
+ }
+
+ /**
+ * Adds or removes a network interface (aka port) into a given bridge of kubernetes node.
+ *
+ * @param k8sNode kubernetes node
+ * @param bridgeName bridge name
+ * @param intfName interface name
+ * @param deviceService device service
+ * @param addOrRemove add port is true, remove it otherwise
+ */
+ public static synchronized void addOrRemoveSystemInterface(K8sNode k8sNode,
+ String bridgeName,
+ String intfName,
+ DeviceService deviceService,
+ boolean addOrRemove) {
+
+
+ Device device = deviceService.getDevice(k8sNode.ovsdb());
+ if (device == null || !device.is(BridgeConfig.class)) {
+ log.info("device is null or this device if not ovsdb device");
+ return;
+ }
+ BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
+
+ if (addOrRemove) {
+ bridgeConfig.addPort(BridgeName.bridgeName(bridgeName), intfName);
+ } else {
+ bridgeConfig.deletePort(BridgeName.bridgeName(bridgeName), intfName);
+ }
+ }
+
+ /**
+ * Gets Boolean property from the propertyName
+ * Return null if propertyName is not found.
+ *
+ * @param properties properties to be looked up
+ * @param propertyName the name of the property to look up
+ * @return value when the propertyName is defined or return null
+ */
+ public static Boolean getBooleanProperty(Dictionary<?, ?> properties,
+ String propertyName) {
+ Boolean value;
+ try {
+ String s = get(properties, propertyName);
+ value = Strings.isNullOrEmpty(s) ? null : Boolean.valueOf(s);
+ } catch (ClassCastException e) {
+ value = null;
+ }
+ return value;
+ }
+
+ /**
+ * Prints out the JSON string in pretty format.
+ *
+ * @param mapper Object mapper
+ * @param jsonString JSON string
+ * @return pretty formatted JSON string
+ */
+ public static String prettyJson(ObjectMapper mapper, String jsonString) {
+ try {
+ Object jsonObject = mapper.readValue(jsonString, Object.class);
+ return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(jsonObject);
+ } catch (IOException e) {
+ log.debug("Json string parsing exception caused by {}", e);
+ }
+ return null;
+ }
+}
diff --git a/apps/k8s-node/app/src/main/resources/definitions/K8sNode.json b/apps/k8s-node/app/src/main/resources/definitions/K8sNode.json
index 34823d2..4351616 100644
--- a/apps/k8s-node/app/src/main/resources/definitions/K8sNode.json
+++ b/apps/k8s-node/app/src/main/resources/definitions/K8sNode.json
@@ -13,7 +13,7 @@
"type",
"managementIp",
"dataIp",
- "integrationBridge",
+ "integrationBridge"
],
"properties": {
"hostname": {