Add the logic of generating OVS bridges when bootstrapping a kubevirt node
Change-Id: Id691738ee31b509a143143103152111dfb47a606
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtCheckNodeCommand.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtCheckNodeCommand.java
new file mode 100644
index 0000000..06297c2
--- /dev/null
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtCheckNodeCommand.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnode.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Completion;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.device.DeviceService;
+
+import static org.onosproject.kubevirtnode.api.Constants.GENEVE;
+import static org.onosproject.kubevirtnode.api.Constants.GRE;
+import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_BRIDGE;
+import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_BRIDGE;
+import static org.onosproject.kubevirtnode.api.Constants.VXLAN;
+import static org.onosproject.net.AnnotationKeys.PORT_NAME;
+
+/**
+ * Checks the detailed init state (bridges and tunnel ports) of a kubevirt node.
+ */
+@Service
+@Command(scope = "onos", name = "kubevirt-check-node",
+        description = "Shows detailed kubevirt nodes status")
+public class KubevirtCheckNodeCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "hostname", description = "Hostname",
+            required = true, multiValued = false)
+    @Completion(KubevirtHostnameCompleter.class)
+    private String hostname = null;
+
+    private static final String MSG_PASS = "PASS";
+    private static final String MSG_FAIL = "FAIL";
+
+    @Override
+    protected void doExecute() throws Exception {
+        KubevirtNodeService nodeService = get(KubevirtNodeService.class);
+        DeviceService deviceService = get(DeviceService.class);
+
+        KubevirtNode node = nodeService.node(hostname);
+        if (node == null) {
+            print("Cannot find %s from registered nodes", hostname);
+            return;
+        }
+
+        print("[Integration Bridge Status]");
+        Device intgBridge = deviceService.getDevice(node.intgBridge());
+        if (intgBridge != null) {
+            print("%s %s=%s available=%s %s",
+                    deviceService.isAvailable(intgBridge.id()) ? MSG_PASS : MSG_FAIL,
+                    INTEGRATION_BRIDGE,
+                    intgBridge.id(),
+                    deviceService.isAvailable(intgBridge.id()),
+                    intgBridge.annotations());
+        }
+
+        print("");
+        print("[Tunnel Bridge Status]");
+        Device tunBridge = deviceService.getDevice(node.tunBridge());
+        if (tunBridge != null) {
+            print("%s %s=%s available=%s %s",
+                    deviceService.isAvailable(tunBridge.id()) ? MSG_PASS : MSG_FAIL,
+                    TUNNEL_BRIDGE,
+                    tunBridge.id(),
+                    deviceService.isAvailable(tunBridge.id()),
+                    tunBridge.annotations());
+
+            if (node.dataIp() != null) {
+                printPortState(deviceService, node.tunBridge(), VXLAN);
+                printPortState(deviceService, node.tunBridge(), GRE);
+                printPortState(deviceService, node.tunBridge(), GENEVE);
+            }
+        }
+    }
+
+    /**
+     * Prints PASS with the details of the enabled port carrying the given name, or
+     * FAIL if there is none; ports without a PORT_NAME annotation never match.
+     */
+    private void printPortState(DeviceService deviceService, DeviceId deviceId, String portName) {
+        Port port = deviceService.getPorts(deviceId).stream()
+                .filter(p -> portName.equals(p.annotations().value(PORT_NAME)) &&
+                        p.isEnabled())
+                .findAny().orElse(null);
+
+        if (port != null) {
+            print("%s %s portNum=%s enabled=%s %s",
+                    port.isEnabled() ? MSG_PASS : MSG_FAIL, portName, port.number(),
+                    port.isEnabled() ? Boolean.TRUE : Boolean.FALSE, port.annotations());
+        } else {
+            print("%s %s does not exist", MSG_FAIL, portName);
+        }
+    }
+}
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtHostnameCompleter.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtHostnameCompleter.java
new file mode 100644
index 0000000..b05ca5f
--- /dev/null
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtHostnameCompleter.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnode.cli;
+
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.karaf.shell.api.console.CommandLine;
+import org.apache.karaf.shell.api.console.Completer;
+import org.apache.karaf.shell.api.console.Session;
+import org.apache.karaf.shell.support.completers.StringsCompleter;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeService;
+
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.stream.Collectors;
+
+import static org.onosproject.cli.AbstractShellCommand.get;
+
+/**
+ * CLI completer offering the hostnames of all registered kubevirt nodes.
+ */
+@Service
+public class KubevirtHostnameCompleter implements Completer {
+    @Override
+    public int complete(Session session, CommandLine commandLine, List<String> candidates) {
+        // gather the hostname of every node known to the node service
+        Set<String> nodeNames = get(KubevirtNodeService.class).nodes().stream()
+                .map(KubevirtNode::hostname)
+                .collect(Collectors.toSet());
+
+        // hand them to a strings completer, which performs the actual matching
+        StringsCompleter completer = new StringsCompleter();
+        SortedSet<String> completions = completer.getStrings();
+        completions.addAll(nodeNames);
+
+        return completer.complete(session, commandLine, candidates);
+    }
+}
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtInitNodeCommand.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtInitNodeCommand.java
new file mode 100644
index 0000000..fade52e
--- /dev/null
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtInitNodeCommand.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnode.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Completion;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
+
+import static org.onosproject.kubevirtnode.api.KubevirtNodeState.COMPLETE;
+import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
+
+/**
+ * Puts the selected kubevirt nodes back into the INIT state.
+ */
+@Service
+@Command(scope = "onos", name = "kubevirt-init-node",
+        description = "Initializes nodes for KubeVirt node service")
+public class KubevirtInitNodeCommand extends AbstractShellCommand {
+
+    @Option(name = "-a", aliases = "--all", description = "Apply this command to all nodes",
+            required = false, multiValued = false)
+    private boolean isAll = false;
+
+    @Option(name = "-i", aliases = "--incomplete",
+            description = "Apply this command to incomplete nodes",
+            required = false, multiValued = false)
+    private boolean isIncomplete = false;
+
+    @Argument(index = 0, name = "hostnames", description = "Hostname(s) to apply this command",
+            required = false, multiValued = true)
+    @Completion(KubevirtHostnameCompleter.class)
+    private String[] hostnames = null;
+
+    @Override
+    protected void doExecute() throws Exception {
+        KubevirtNodeAdminService service = get(KubevirtNodeAdminService.class);
+
+        // exactly one selector (hostnames, --all or --incomplete) has to be given
+        int selectors = (isAll ? 1 : 0) + (isIncomplete ? 1 : 0) +
+                (hostnames != null ? 1 : 0);
+        if (selectors != 1) {
+            print("Please specify one of hostname, --all, and --incomplete options.");
+            return;
+        }
+
+        if (isAll || isIncomplete) {
+            // --all keeps every registered node,
+            // --incomplete keeps only the nodes that are not COMPLETE yet
+            hostnames = service.nodes().stream()
+                    .filter(n -> isAll || n.state() != COMPLETE)
+                    .map(KubevirtNode::hostname)
+                    .toArray(String[]::new);
+        }
+
+        for (String name : hostnames) {
+            KubevirtNode target = service.node(name);
+            if (target != null) {
+                print("Initializing %s", name);
+                // hand the node, with its state set to INIT, back to the service
+                service.updateNode(target.updateState(INIT));
+            } else {
+                print("Unable to find %s", name);
+            }
+        }
+        print("Done.");
+    }
+}
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtApiConfigListCommand.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtListApiConfigsCommand.java
similarity index 96%
rename from apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtApiConfigListCommand.java
rename to apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtListApiConfigsCommand.java
index ce64d1e..d01c33a 100644
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtApiConfigListCommand.java
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtListApiConfigsCommand.java
@@ -31,7 +31,7 @@
@Service
@Command(scope = "onos", name = "kubevirt-api-configs",
description = "Lists all KubeVirt API server configs registered to the service")
-public class KubevirtApiConfigListCommand extends AbstractShellCommand {
+public class KubevirtListApiConfigsCommand extends AbstractShellCommand {
private static final String FORMAT = "%-10s%-25s%-10s%-10s";
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtNodeListCommand.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtListNodesCommand.java
similarity index 97%
rename from apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtNodeListCommand.java
rename to apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtListNodesCommand.java
index 8503821..fc4555a 100644
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtNodeListCommand.java
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtListNodesCommand.java
@@ -38,7 +38,7 @@
@Service
@Command(scope = "onos", name = "kubevirt-nodes",
description = "Lists all nodes registered in KubeVirt node service")
-public class KubevirtNodeListCommand extends AbstractShellCommand {
+public class KubevirtListNodesCommand extends AbstractShellCommand {
private static final int HOSTNAME_LENGTH = 35;
private static final int TYPE_LENGTH = 15;
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/codec/KubevirtNodeCodec.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/codec/KubevirtNodeCodec.java
index 17ed505..4a7e77c 100644
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/codec/KubevirtNodeCodec.java
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/codec/KubevirtNodeCodec.java
@@ -48,6 +48,7 @@
private static final String TYPE = "type";
private static final String INTEGRATION_BRIDGE = "integrationBridge";
+ private static final String TUNNEL_BRIDGE = "tunnelBridge";
private static final String STATE = "state";
private static final String PHYSICAL_INTERFACES = "phyIntfs";
@@ -68,6 +69,11 @@
result.put(INTEGRATION_BRIDGE, node.intgBridge().toString());
}
+ // serialize tunnel bridge config
+ if (node.tunBridge() != null) {
+ result.put(TUNNEL_BRIDGE, node.tunBridge().toString());
+ }
+
// serialize data IP only if it presents
if (node.dataIp() != null) {
result.put(DATA_IP, node.dataIp().toString());
@@ -115,6 +121,11 @@
nodeBuilder.intgBridge(DeviceId.deviceId(intBridgeJson.asText()));
}
+ JsonNode tunBridgeJson = json.get(TUNNEL_BRIDGE);
+ if (tunBridgeJson != null) {
+ nodeBuilder.tunBridge(DeviceId.deviceId(tunBridgeJson.asText()));
+ }
+
// parse physical interfaces
List<KubevirtPhyInterface> phyIntfs = new ArrayList<>();
JsonNode phyIntfsJson = json.get(PHYSICAL_INTERFACES);
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtApiConfigHandler.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtApiConfigHandler.java
index 699c41b..cfd95b6 100644
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtApiConfigHandler.java
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtApiConfigHandler.java
@@ -25,6 +25,7 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.kubevirtnode.api.DefaultKubevirtNode;
+import org.onosproject.kubevirtnode.api.DefaultKubevirtPhyInterface;
import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
import org.onosproject.kubevirtnode.api.KubevirtApiConfigAdminService;
import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
@@ -32,6 +33,7 @@
import org.onosproject.kubevirtnode.api.KubevirtNode;
import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
import org.onosproject.kubevirtnode.api.KubevirtNodeState;
+import org.onosproject.kubevirtnode.api.KubevirtPhyInterface;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -39,7 +41,10 @@
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
@@ -61,6 +66,8 @@
private static final String INTERNAL_IP = "InternalIP";
private static final String K8S_ROLE = "node-role.kubernetes.io";
+ private static final String DEFAULT_PHY_NETWORK = "physical.network";
+ private static final String DEFAULT_PHY_INTERFACE = "physical.interface";
private static final long SLEEP_MS = 10000; // we wait 10s
@@ -157,13 +164,23 @@
}
}
- // TODO: need to config the physnet obtained from node's annotation
+ // start to parse kubernetes annotation
+ Map<String, String> annots = node.getMetadata().getAnnotations();
+ String physnet = annots.get(DEFAULT_PHY_NETWORK);
+ String physintf = annots.get(DEFAULT_PHY_INTERFACE);
+
+ Set<KubevirtPhyInterface> phys = new HashSet<>();
+ if (physnet != null && physintf != null) {
+ phys.add(DefaultKubevirtPhyInterface.builder().network(physnet).intf(physintf).build());
+ }
+
return DefaultKubevirtNode.builder()
.hostname(hostname)
.managementIp(managementIp)
.dataIp(dataIp)
.type(nodeType)
.state(KubevirtNodeState.ON_BOARDED)
+ .phyIntfs(phys)
.build();
}
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtNodeHandler.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtNodeHandler.java
new file mode 100644
index 0000000..c6f67a3
--- /dev/null
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtNodeHandler.java
@@ -0,0 +1,948 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnode.impl;
+
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
+import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
+import org.onosproject.kubevirtnode.api.KubevirtNodeHandler;
+import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
+import org.onosproject.kubevirtnode.api.KubevirtNodeState;
+import org.onosproject.kubevirtnode.api.KubevirtPhyInterface;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.behaviour.BridgeConfig;
+import org.onosproject.net.behaviour.BridgeDescription;
+import org.onosproject.net.behaviour.BridgeName;
+import org.onosproject.net.behaviour.ControllerInfo;
+import org.onosproject.net.behaviour.DefaultBridgeDescription;
+import org.onosproject.net.behaviour.DefaultPatchDescription;
+import org.onosproject.net.behaviour.DefaultTunnelDescription;
+import org.onosproject.net.behaviour.InterfaceConfig;
+import org.onosproject.net.behaviour.PatchDescription;
+import org.onosproject.net.behaviour.TunnelDescription;
+import org.onosproject.net.behaviour.TunnelEndPoints;
+import org.onosproject.net.behaviour.TunnelKey;
+import org.onosproject.net.device.DeviceAdminService;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.ovsdb.controller.OvsdbClientService;
+import org.onosproject.ovsdb.controller.OvsdbController;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static java.lang.Thread.sleep;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.packet.TpPort.tpPort;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kubevirtnode.api.Constants.BRIDGE_PREFIX;
+import static org.onosproject.kubevirtnode.api.Constants.FLOW_KEY;
+import static org.onosproject.kubevirtnode.api.Constants.GENEVE;
+import static org.onosproject.kubevirtnode.api.Constants.GRE;
+import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_BRIDGE;
+import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_PHYSICAL_PREFIX;
+import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_TUNNEL;
+import static org.onosproject.kubevirtnode.api.Constants.PHYSICAL_TO_INTEGRATION_SUFFIX;
+import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_BRIDGE;
+import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_TO_INTEGRATION;
+import static org.onosproject.kubevirtnode.api.Constants.VXLAN;
+import static org.onosproject.kubevirtnode.api.KubevirtNodeService.APP_ID;
+import static org.onosproject.kubevirtnode.api.KubevirtNodeState.COMPLETE;
+import static org.onosproject.kubevirtnode.api.KubevirtNodeState.DEVICE_CREATED;
+import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INCOMPLETE;
+import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
+import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.AUTO_RECOVERY;
+import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.AUTO_RECOVERY_DEFAULT;
+import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT;
+import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
+import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.addOrRemoveSystemInterface;
+import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.getBooleanProperty;
+import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.getOvsdbClient;
+import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.isOvsdbConnected;
+import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.structurePortName;
+import static org.onosproject.net.AnnotationKeys.PORT_NAME;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Service bootstraps kubevirt node based on its type.
+ */
+@Component(immediate = true,
+ property = {
+ OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT,
+ AUTO_RECOVERY + ":Boolean=" + AUTO_RECOVERY_DEFAULT
+ }
+)
+public class DefaultKubevirtNodeHandler implements KubevirtNodeHandler {
+
+ private final Logger log = getLogger(getClass());
+
+ private static final String DEFAULT_OF_PROTO = "tcp";
+ private static final int DEFAULT_OFPORT = 6653;
+ private static final int DPID_BEGIN = 3;
+ private static final int NETWORK_BEGIN = 3;
+ private static final long SLEEP_SHORT_MS = 1000; // we wait 1s
+ private static final long SLEEP_LONG_MS = 2000; // we wait 2s
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceAdminService deviceAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected OvsdbController ovsdbController;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtNodeAdminService nodeAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ComponentConfigService componentConfigService;
+
+ /** OVSDB server listen port. */
+ private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
+
+ /** Indicates whether to auto-recover the kubevirt node status on a switch re-connection event. */
+ private boolean autoRecovery = AUTO_RECOVERY_DEFAULT;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
+ private final DeviceListener ovsdbListener = new InternalOvsdbListener();
+ private final DeviceListener bridgeListener = new InternalBridgeListener();
+ private final KubevirtNodeListener kubevirtNodeListener = new InternalKubevirtNodeListener();
+
+ private ApplicationId appId;
+ private NodeId localNode;
+
+ @Activate
+ protected void activate() {
+ appId = coreService.getAppId(APP_ID);
+ localNode = clusterService.getLocalNode().id();
+ // register config properties, run for leadership, then attach device and node listeners
+ componentConfigService.registerProperties(getClass());
+ leadershipService.runForLeadership(appId.name());
+ deviceService.addListener(ovsdbListener);
+ deviceService.addListener(bridgeListener);
+ nodeAdminService.addListener(kubevirtNodeListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ nodeAdminService.removeListener(kubevirtNodeListener); // listeners are detached in the reverse order of activate()
+ deviceService.removeListener(bridgeListener);
+ deviceService.removeListener(ovsdbListener);
+ componentConfigService.unregisterProperties(getClass(), false);
+ leadershipService.withdraw(appId.name());
+ eventExecutor.shutdown(); // no new event-handling tasks are accepted from here on
+
+ log.info("Stopped");
+ }
+
+ @Modified
+ protected void modified(ComponentContext context) {
+ readComponentConfiguration(context); // re-read the OVSDB port and the auto recovery flag
+
+ log.info("Modified");
+ }
+
+ @Override
+ public void processInitState(KubevirtNode node) {
+ if (!isOvsdbConnected(node, ovsdbPortNum, ovsdbController, deviceService)) {
+ ovsdbController.connect(node.managementIp(), tpPort(ovsdbPortNum));
+ return; // no OVSDB connection yet: one was just requested, nothing else is done now
+ }
+ if (!deviceService.isAvailable(node.intgBridge())) { // integration bridge device is not available
+ createBridge(node, INTEGRATION_BRIDGE, node.intgBridge());
+ }
+
+ if (!deviceService.isAvailable(node.tunBridge())) { // tunnel bridge device is not available
+ createBridge(node, TUNNEL_BRIDGE, node.tunBridge());
+ }
+ }
+
+ @Override
+ public void processDeviceCreatedState(KubevirtNode node) {
+     try {
+         if (!isOvsdbConnected(node, ovsdbPortNum, ovsdbController, deviceService)) {
+             ovsdbController.connect(node.managementIp(), tpPort(ovsdbPortNum));
+             return;
+         }
+
+         // create patch ports between the integration bridge and the tunnel bridge
+         createPatchInterfaces(node);
+
+         if (node.dataIp() != null && !isIntfEnabled(node, VXLAN)) {
+             createVxlanTunnelInterface(node);
+         }
+
+         if (node.dataIp() != null && !isIntfEnabled(node, GRE)) {
+             createGreTunnelInterface(node);
+         }
+
+         if (node.dataIp() != null && !isIntfEnabled(node, GENEVE)) {
+             createGeneveTunnelInterface(node);
+         }
+
+         // provision new physical interfaces on the given node
+         // this includes creating physical bridge, attaching physical port
+         // to physical bridge, adding patch ports to both physical bridge and br-int
+         provisionPhysicalInterfaces(node);
+     } catch (Exception e) {
+         // the throwable is the trailing argument (no placeholder) so its stack trace is logged
+         log.error("Exception occurred while processing the DEVICE_CREATED state", e);
+     }
+ }
+
+ @Override
+ public void processCompleteState(KubevirtNode node) {
+ // intentionally empty: this handler takes no action for the COMPLETE state
+ }
+
+ @Override
+ public void processIncompleteState(KubevirtNode node) {
+ // intentionally empty: this handler takes no action for the INCOMPLETE state
+ }
+
+ @Override
+ public void processOnBoardedState(KubevirtNode node) {
+ // intentionally empty: this handler takes no action for the ON_BOARDED state
+ }
+
+ /**
+  * Reads the OVSDB port and the auto recovery flag from the component
+  * configuration, falling back to the defaults for unset properties.
+  *
+  * @param context the component context
+  */
+ private void readComponentConfiguration(ComponentContext context) {
+     Dictionary<?, ?> props = context.getProperties();
+
+     Integer portConfigured = Tools.getIntegerProperty(props, OVSDB_PORT);
+     if (portConfigured != null) {
+         ovsdbPortNum = portConfigured;
+         log.info("Configured. OVSDB port is {}", ovsdbPortNum);
+     } else {
+         ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
+         log.info("OVSDB port is NOT configured, default value is {}", ovsdbPortNum);
+     }
+
+     Boolean recoveryConfigured = getBooleanProperty(props, AUTO_RECOVERY);
+     if (recoveryConfigured != null) {
+         autoRecovery = recoveryConfigured;
+         log.info("Configured. Auto recovery flag is {}", autoRecovery);
+     } else {
+         autoRecovery = AUTO_RECOVERY_DEFAULT;
+         log.info("Auto recovery flag is NOT configured, default value is {}",
+                 autoRecovery);
+     }
+ }
+
+ /**
+  * Creates a bridge on a given kubevirt node, or logs an error if its OVSDB device is unusable.
+  *
+  * @param node kubevirt node
+  * @param bridgeName bridge name
+  * @param devId device identifier
+  */
+ private void createBridge(KubevirtNode node, String bridgeName, DeviceId devId) {
+     Device device = deviceService.getDevice(node.ovsdb());
+     if (device == null || !device.is(BridgeConfig.class)) {
+         log.error("Failed to create bridge {} on {}", bridgeName, node.ovsdb());
+         return;
+     }
+
+     List<ControllerInfo> controllers = clusterService.getNodes().stream()
+             .map(n -> new ControllerInfo(n.ip(), DEFAULT_OFPORT, DEFAULT_OF_PROTO))
+             .collect(Collectors.toList());
+     String dpid = devId.toString().substring(DPID_BEGIN); // drops the first DPID_BEGIN characters
+     BridgeDescription bridgeDesc = DefaultBridgeDescription.builder()
+             .name(bridgeName)
+             .failMode(BridgeDescription.FailMode.SECURE)
+             .datapathId(dpid)
+             .disableInBand()
+             .controllers(controllers).build();
+     device.as(BridgeConfig.class).addBridge(bridgeDesc);
+ }
+
+ /**
+ * Creates a VXLAN tunnel interface, named after its tunnel type, in a given kubevirt node.
+ *
+ * @param node kubevirt node
+ */
+ private void createVxlanTunnelInterface(KubevirtNode node) {
+ createTunnelInterface(node, VXLAN, VXLAN);
+ }
+
+ /**
+ * Creates a GRE tunnel interface, named after its tunnel type, in a given kubevirt node.
+ *
+ * @param node kubevirt node
+ */
+ private void createGreTunnelInterface(KubevirtNode node) {
+ createTunnelInterface(node, GRE, GRE);
+ }
+
+ /**
+ * Creates a GENEVE tunnel interface, named after its tunnel type, in a given kubevirt node.
+ *
+ * @param node kubevirt node
+ */
+ private void createGeneveTunnelInterface(KubevirtNode node) {
+ createTunnelInterface(node, GENEVE, GENEVE);
+ }
+
+ /**
+  * Creates a tunnel interface in a given kubevirt node, unless it is already enabled.
+  *
+  * @param node kubevirt node
+  * @param type tunnel type
+  * @param intfName tunnel interface name
+  */
+ private void createTunnelInterface(KubevirtNode node, String type, String intfName) {
+     if (isIntfEnabled(node, intfName)) {
+         return;
+     }
+
+     Device device = deviceService.getDevice(node.ovsdb());
+     TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName);
+     // buildTunnelDesc() returns null for an unsupported tunnel type
+     if (device == null || !device.is(InterfaceConfig.class) || tunnelDesc == null) {
+         log.error("Failed to create tunnel interface on {}", node.ovsdb());
+         return;
+     }
+
+     device.as(InterfaceConfig.class).addTunnelMode(intfName, tunnelDesc);
+ }
+
+ /**
+  * Builds tunnel description according to the network type. The tunnel is bound
+  * to the tunnel bridge and uses a flow-based remote endpoint and tunnel key.
+  *
+  * @param type network type
+  * @param intfName tunnel interface name
+  * @return tunnel description, or null if the network type is not supported
+  */
+ private TunnelDescription buildTunnelDesc(String type, String intfName) {
+     // map the network type name onto the tunnel type, giving up on unknown names
+     TunnelDescription.Type tunnelType;
+     if (VXLAN.equals(type)) {
+         tunnelType = TunnelDescription.Type.VXLAN;
+     } else if (GRE.equals(type)) {
+         tunnelType = TunnelDescription.Type.GRE;
+     } else if (GENEVE.equals(type)) {
+         tunnelType = TunnelDescription.Type.GENEVE;
+     } else {
+         return null;
+     }
+
+     // all supported types share the same bridge, endpoint and key settings
+     TunnelKey<String> flowKey = new TunnelKey<>(FLOW_KEY);
+     TunnelDescription.Builder descBuilder =
+             DefaultTunnelDescription.builder()
+                     .deviceId(TUNNEL_BRIDGE)
+                     .ifaceName(intfName)
+                     .remote(TunnelEndPoints.flowTunnelEndpoint())
+                     .key(flowKey);
+
+     // the tunnel type is the only per-type setting
+     descBuilder.type(tunnelType);
+     return descBuilder.build();
+ }
+
+ /**
+  * Tells whether the tunnel bridge of a given kubevirt node is available and
+  * owns an enabled port whose name matches the given network interface.
+  *
+  * @param node kubevirt node
+  * @param intf network interface name
+  * @return true if the given interface is enabled, false otherwise
+  */
+ private boolean isIntfEnabled(KubevirtNode node, String intf) {
+     // an unavailable tunnel bridge short-circuits, so its ports are not queried
+     boolean bridgeAvailable = deviceService.isAvailable(node.tunBridge());
+     return bridgeAvailable && deviceService.getPorts(node.tunBridge()).stream()
+             .filter(port -> Objects.equals(port.annotations().value(PORT_NAME), intf))
+             .anyMatch(Port::isEnabled);
+ }
+
+ /**
+  * Creates the pair of patch ports linking the integration and tunnel bridges.
+  *
+  * @param node kubevirt node
+  */
+ private void createPatchInterfaces(KubevirtNode node) {
+     Device ovsdbDevice = deviceService.getDevice(node.ovsdb());
+     if (ovsdbDevice == null || !ovsdbDevice.is(InterfaceConfig.class)) {
+         log.error("Failed to create patch interface on {}", node.ovsdb());
+         return;
+     }
+     InterfaceConfig patchConfig = ovsdbDevice.as(InterfaceConfig.class);
+
+     // integration bridge -> tunnel bridge
+     PatchDescription intgToTun = DefaultPatchDescription.builder()
+             .deviceId(INTEGRATION_BRIDGE)
+             .ifaceName(INTEGRATION_TO_TUNNEL)
+             .peer(TUNNEL_TO_INTEGRATION)
+             .build();
+     // tunnel bridge -> integration bridge
+     PatchDescription tunToIntg = DefaultPatchDescription.builder()
+             .deviceId(TUNNEL_BRIDGE)
+             .ifaceName(TUNNEL_TO_INTEGRATION)
+             .peer(INTEGRATION_TO_TUNNEL)
+             .build();
+
+     patchConfig.addPatchMode(INTEGRATION_TO_TUNNEL, intgToTun);
+     patchConfig.addPatchMode(TUNNEL_TO_INTEGRATION, tunToIntg);
+ }
+
+ /**
+  * Drives the bootstrap of a kubevirt node one step forward: when the
+  * requirements of the current state are met the node moves to the next
+  * state, otherwise the current state is processed.
+  *
+  * @param node kubevirt node
+  */
+ private void bootstrapNode(KubevirtNode node) {
+     if (!isCurrentStateDone(node)) {
+         log.trace("Processing {} state for {}", node.state(), node.hostname());
+         node.state().process(this, node);
+         return;
+     }
+
+     setState(node, node.state().nextState());
+ }
+
+ /**
+  * Tears down the bridges of a removed kubevirt node. Nothing is done when
+  * no OVSDB client can be obtained for the node.
+  *
+  * @param node kubevirt node
+  */
+ private void removeNode(KubevirtNode node) {
+     OvsdbClientService ovsdbClient =
+             getOvsdbClient(node, ovsdbPortNum, ovsdbController);
+     if (ovsdbClient == null) {
+         log.info("Failed to get ovsdb client");
+         return;
+     }
+
+     // physical side first: detach the physical ports, remove the patch
+     // ports on br-int and drop the physical bridges
+     unprovisionPhysicalInterfaces(node);
+
+     // then the tunnel bridge, and finally the integration bridge
+     ovsdbClient.dropBridge(TUNNEL_BRIDGE);
+     ovsdbClient.dropBridge(INTEGRATION_BRIDGE);
+ }
+
+ /**
+  * Tells whether the requirements of the node's current state are met.
+  *
+  * @param node kubevirt node
+  * @return true if all requirements are fulfilled, false otherwise
+  */
+ private boolean isCurrentStateDone(KubevirtNode node) {
+     boolean done;
+
+     switch (node.state()) {
+         case INIT:
+             done = isInitStateDone(node);
+             break;
+         case DEVICE_CREATED:
+             done = isDeviceCreatedStateDone(node);
+             break;
+         case COMPLETE:
+         case INCOMPLETE:
+         case ON_BOARDED:
+             // never considered done here;
+             // the init CLI has to be run to re-trigger node bootstrap
+             done = false;
+             break;
+         default:
+             done = true;
+             break;
+     }
+
+     return done;
+ }
+
+ /**
+  * Checks whether the INIT state of the given node is done: the OVSDB
+  * connection must be up, and both the integration bridge and the tunnel
+  * bridge must be known and available. Physical bridges that are not
+  * declared in the node get cleaned up on the way.
+  *
+  * @param node kubevirt node
+  * @return true if the INIT state is done, false otherwise
+  */
+ private boolean isInitStateDone(KubevirtNode node) {
+     if (!isOvsdbConnected(node, ovsdbPortNum,
+             ovsdbController, deviceService)) {
+         return false;
+     }
+
+     try {
+         // we need to wait a while, in case interfaces and bridges
+         // creation requires some time
+         sleep(SLEEP_SHORT_MS);
+     } catch (InterruptedException e) {
+         log.error("Exception caused during init state checking...");
+         // keep the interruption visible to the executor running this task
+         Thread.currentThread().interrupt();
+     }
+
+     cleanPhysicalInterfaces(node);
+
+     return node.intgBridge() != null && node.tunBridge() != null &&
+             deviceService.isAvailable(node.intgBridge()) &&
+             deviceService.isAvailable(node.tunBridge());
+ }
+
+ /**
+  * Checks whether the DEVICE_CREATED state of the given node is done: when
+  * the node has a data IP, the VXLAN, GRE and GENEVE interfaces must all be
+  * enabled; and for every physical interface of the node, its physical
+  * bridge, its patch port on the integration bridge and the physical port
+  * itself must exist.
+  *
+  * @param node kubevirt node
+  * @return true if the DEVICE_CREATED state is done, false otherwise
+  */
+ private boolean isDeviceCreatedStateDone(KubevirtNode node) {
+
+     try {
+         // we need to wait a while, in case tunneling ports
+         // creation requires some time
+         sleep(SLEEP_LONG_MS);
+     } catch (InterruptedException e) {
+         log.error("Exception caused during device created state checking...");
+         // keep the interruption visible to the executor running this task
+         Thread.currentThread().interrupt();
+     }
+
+     if (node.dataIp() != null) {
+         boolean tunnelsReady = isIntfEnabled(node, VXLAN) &&
+                 isIntfEnabled(node, GRE) &&
+                 isIntfEnabled(node, GENEVE);
+         if (!tunnelsReady) {
+             return false;
+         }
+     }
+
+     for (KubevirtPhyInterface phyIntf : node.phyIntfs()) {
+         if (phyIntf == null) {
+             return false;
+         }
+
+         String bridgeName = BRIDGE_PREFIX + phyIntf.network();
+         String patchPortName = structurePortName(
+                 INTEGRATION_TO_PHYSICAL_PREFIX + phyIntf.network());
+
+         if (!(hasPhyBridge(node, bridgeName) &&
+                 hasPhyPatchPort(node, patchPortName) &&
+                 hasPhyIntf(node, phyIntf.intf()))) {
+             return false;
+         }
+     }
+
+     return true;
+ }
+
+ /**
+  * Moves the given node to a new state and stores the updated node.
+  * Nothing happens when the node is already in that state.
+  *
+  * @param node kubevirt node
+  * @param newState a new state
+  */
+ private void setState(KubevirtNode node, KubevirtNodeState newState) {
+     if (node.state() != newState) {
+         nodeAdminService.updateNode(node.updateState(newState));
+         log.info("Changed {} state: {}", node.hostname(), newState);
+     }
+ }
+
+ /**
+  * Provisions the physical bridge, the patch ports and the physical port
+  * for every physical interface of the given node.
+  *
+  * @param node kubevirt node
+  */
+ private void provisionPhysicalInterfaces(KubevirtNode node) {
+     for (KubevirtPhyInterface phyIntf : node.phyIntfs()) {
+         String phyBridge = BRIDGE_PREFIX + phyIntf.network();
+         String intgPatchPort = structurePortName(
+                 INTEGRATION_TO_PHYSICAL_PREFIX + phyIntf.network());
+
+         if (!hasPhyBridge(node, phyBridge)) {
+             // nothing is there yet: bridge, patch ports and physical port
+             createPhysicalBridge(node, phyIntf);
+             createPhysicalPatchPorts(node, phyIntf);
+             attachPhysicalPort(node, phyIntf);
+             continue;
+         }
+
+         // the physical bridge exists; only re-create the patch ports when
+         // the one on br-int is missing
+         if (!hasPhyPatchPort(node, intgPatchPort)) {
+             createPhysicalPatchPorts(node, phyIntf);
+         }
+     }
+ }
+
+ private void cleanPhysicalInterfaces(KubevirtNode node) {
+ Device device = deviceService.getDevice(node.ovsdb());
+
+ BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
+
+ Set<String> bridgeNames = bridgeConfig.getBridges().stream()
+ .map(BridgeDescription::name).collect(Collectors.toSet());
+
+ Set<String> phyNetworkNames = node.phyIntfs().stream()
+ .map(pi -> BRIDGE_PREFIX + pi.network()).collect(Collectors.toSet());
+
+ // we remove existing physical bridges and patch ports, if the physical
+ // bridges are not defined in kubevirt node
+ for (String brName : bridgeNames) {
+ if (!phyNetworkNames.contains(brName)) {
+ // integration bridge and tunnel bridge should NOT be treated as
+ // physical bridges
+ if (brName.equals(INTEGRATION_BRIDGE) || brName.equals(TUNNEL_BRIDGE)) {
+ continue;
+ }
+ removePhysicalPatchPorts(node, brName.substring(NETWORK_BEGIN));
+ removePhysicalBridge(node, brName.substring(NETWORK_BEGIN));
+ }
+ }
+ }
+
+ /**
+  * Detaches the physical port, removes the patch port and removes the
+  * physical bridge for every physical interface of the given node.
+  *
+  * @param node kubevirt node
+  */
+ private void unprovisionPhysicalInterfaces(KubevirtNode node) {
+     for (KubevirtPhyInterface phyIntf : node.phyIntfs()) {
+         detachPhysicalPort(node, phyIntf.network(), phyIntf.intf());
+         removePhysicalPatchPorts(node, phyIntf.network());
+         removePhysicalBridge(node, phyIntf.network());
+     }
+ }
+
+ private boolean hasPhyBridge(KubevirtNode node, String bridgeName) {
+ BridgeConfig bridgeConfig =
+ deviceService.getDevice(node.ovsdb()).as(BridgeConfig.class);
+ return bridgeConfig.getBridges().stream()
+ .anyMatch(br -> br.name().equals(bridgeName));
+ }
+
+ /**
+  * Checks whether the integration bridge of the node has a port whose name
+  * annotation equals the given patch port name.
+  *
+  * @param node kubevirt node
+  * @param patchPortName patch port name
+  * @return true if such a port exists, false otherwise
+  */
+ private boolean hasPhyPatchPort(KubevirtNode node, String patchPortName) {
+     List<Port> ports = deviceService.getPorts(node.intgBridge());
+     // Objects.equals tolerates ports that carry no port name annotation
+     return ports.stream().anyMatch(p ->
+             Objects.equals(p.annotations().value(PORT_NAME), patchPortName));
+ }
+
+ private boolean hasPhyIntf(KubevirtNode node, String intfName) {
+ BridgeConfig bridgeConfig =
+ deviceService.getDevice(node.ovsdb()).as(BridgeConfig.class);
+ return bridgeConfig.getPorts().stream()
+ .anyMatch(p -> p.annotations().value(PORT_NAME).equals(intfName));
+ }
+
+ private void createPhysicalBridge(KubevirtNode osNode,
+ KubevirtPhyInterface phyInterface) {
+ Device device = deviceService.getDevice(osNode.ovsdb());
+
+ String bridgeName = BRIDGE_PREFIX + phyInterface.network();
+
+ BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
+ .name(bridgeName)
+ .mcastSnoopingEnable();
+
+ BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
+ bridgeConfig.addBridge(builder.build());
+ }
+
+ private void removePhysicalBridge(KubevirtNode node, String network) {
+ Device device = deviceService.getDevice(node.ovsdb());
+
+ BridgeName bridgeName = BridgeName.bridgeName(BRIDGE_PREFIX + network);
+
+ BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
+ bridgeConfig.deleteBridge(bridgeName);
+ }
+
+ /**
+  * Creates the pair of patch ports that connects the integration bridge
+  * with the physical bridge of the given physical interface, then adds the
+  * physical interface itself to the physical bridge.
+  *
+  * @param node kubevirt node
+  * @param phyInterface physical interface
+  */
+ private void createPhysicalPatchPorts(KubevirtNode node,
+                                       KubevirtPhyInterface phyInterface) {
+     Device device = deviceService.getDevice(node.ovsdb());
+
+     if (device == null || !device.is(InterfaceConfig.class)) {
+         log.error("Failed to create patch interface on {}", node.ovsdb());
+         return;
+     }
+
+     String physicalDeviceId = BRIDGE_PREFIX + phyInterface.network();
+
+     // port names are passed through structurePortName, the same way
+     // removePhysicalPatchPorts and the state checks derive them
+     String intToPhyPatchPort = structurePortName(
+             INTEGRATION_TO_PHYSICAL_PREFIX + phyInterface.network());
+     String phyToIntPatchPort = structurePortName(
+             phyInterface.network() + PHYSICAL_TO_INTEGRATION_SUFFIX);
+
+     // integration bridge -> physical bridge
+     PatchDescription intToPhyPatchDesc =
+             DefaultPatchDescription.builder()
+                     .deviceId(INTEGRATION_BRIDGE)
+                     .ifaceName(intToPhyPatchPort)
+                     .peer(phyToIntPatchPort)
+                     .build();
+
+     // physical bridge -> integration bridge
+     PatchDescription phyToIntPatchDesc =
+             DefaultPatchDescription.builder()
+                     .deviceId(physicalDeviceId)
+                     .ifaceName(phyToIntPatchPort)
+                     .peer(intToPhyPatchPort)
+                     .build();
+
+     // use the very names carried by the descriptions, so the interface
+     // name argument can never disagree with the description it comes with
+     InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
+     ifaceConfig.addPatchMode(intToPhyPatchPort, intToPhyPatchDesc);
+     ifaceConfig.addPatchMode(phyToIntPatchPort, phyToIntPatchDesc);
+
+     addOrRemoveSystemInterface(node, physicalDeviceId,
+             phyInterface.intf(), deviceService, true);
+ }
+
+ /**
+  * Removes the integration-bridge-side patch port that leads to the
+  * physical bridge of the given network.
+  *
+  * @param node kubevirt node
+  * @param network network name
+  */
+ private void removePhysicalPatchPorts(KubevirtNode node, String network) {
+     Device ovsdbDevice = deviceService.getDevice(node.ovsdb());
+
+     boolean configurable = ovsdbDevice != null &&
+             ovsdbDevice.is(InterfaceConfig.class);
+     if (!configurable) {
+         log.error("Failed to remove patch interface on {}", node.ovsdb());
+         return;
+     }
+
+     String patchPort = structurePortName(INTEGRATION_TO_PHYSICAL_PREFIX + network);
+     ovsdbDevice.as(InterfaceConfig.class).removePatchMode(patchPort);
+ }
+
+ /**
+  * Adds the given physical interface to its physical bridge.
+  *
+  * @param node kubevirt node
+  * @param phyInterface physical interface
+  */
+ private void attachPhysicalPort(KubevirtNode node,
+                                 KubevirtPhyInterface phyInterface) {
+     addOrRemoveSystemInterface(node, BRIDGE_PREFIX + phyInterface.network(),
+             phyInterface.intf(), deviceService, true);
+ }
+
+ /**
+  * Removes the given port from the physical bridge of the given network.
+  *
+  * @param node kubevirt node
+  * @param network network name
+  * @param portName name of the port to detach
+  */
+ private void detachPhysicalPort(KubevirtNode node, String network, String portName) {
+     addOrRemoveSystemInterface(node, BRIDGE_PREFIX + network,
+             portName, deviceService, false);
+ }
+
+ /**
+  * An internal OVSDB listener. It reacts to device events whose subject is
+  * of CONTROLLER type; when such a device is added or changes availability
+  * and is available, the matching kubevirt node gets bootstrapped.
+  */
+ private class InternalOvsdbListener implements DeviceListener {
+
+     @Override
+     public boolean isRelevant(DeviceEvent event) {
+         return Device.Type.CONTROLLER == event.subject().type();
+     }
+
+     private boolean isRelevantHelper() {
+         return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
+     }
+
+     @Override
+     public void event(DeviceEvent event) {
+         Device device = event.subject();
+
+         switch (event.type()) {
+             case DEVICE_AVAILABILITY_CHANGED:
+             case DEVICE_ADDED:
+                 eventExecutor.execute(() -> processOvsdbDevice(device));
+                 break;
+             default:
+                 // port events, device removal and anything else: do nothing
+                 break;
+         }
+     }
+
+     // Bootstraps the node behind the device; only the leader does so, and
+     // only for a known node whose OVSDB device is available.
+     private void processOvsdbDevice(Device device) {
+         if (!isRelevantHelper()) {
+             return;
+         }
+
+         KubevirtNode node = nodeAdminService.node(device.id());
+         if (node == null) {
+             return;
+         }
+
+         if (deviceService.isAvailable(device.id())) {
+             log.debug("OVSDB {} detected", device.id());
+             bootstrapNode(node);
+         }
+     }
+ }
+
+ /**
+ * An internal bridge listener. This listener handles device events whose
+ * subject is of SWITCH type: device addition / availability change, port
+ * addition / update, and port removal. Every handler runs on the event
+ * executor and returns early unless the local node is the leader of the
+ * application topic.
+ */
+ private class InternalBridgeListener implements DeviceListener {
+
+ @Override
+ public boolean isRelevant(DeviceEvent event) {
+ return event.subject().type() == Device.Type.SWITCH;
+ }
+
+ // true when the local node is the current leader for the app's topic
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(DeviceEvent event) {
+ Device device = event.subject();
+ Port port = event.port();
+
+ switch (event.type()) {
+ case DEVICE_AVAILABILITY_CHANGED:
+ case DEVICE_ADDED:
+ eventExecutor.execute(() -> processDeviceAddition(device));
+ break;
+ case PORT_UPDATED:
+ case PORT_ADDED:
+ eventExecutor.execute(() -> processPortAddition(device, port));
+ break;
+ case PORT_REMOVED:
+ eventExecutor.execute(() -> processPortRemoval(device, port));
+ break;
+ case DEVICE_REMOVED:
+ default:
+ // do nothing
+ break;
+ }
+ }
+
+ // Handles a bridge that was added or whose availability changed:
+ // an available bridge continues the node bootstrap, an unavailable one
+ // moves a COMPLETE node to INCOMPLETE.
+ void processDeviceAddition(Device device) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ KubevirtNode node = nodeAdminService.node(device.id());
+
+ if (node == null) {
+ return;
+ }
+
+ if (deviceService.isAvailable(device.id())) {
+ log.debug("Bridge created on {}", node.hostname());
+ bootstrapNode(node);
+ } else if (node.state() == COMPLETE) {
+ log.info("Device {} disconnected", device.id());
+ setState(node, INCOMPLETE);
+ }
+
+ // NOTE(review): `node` was looked up before bootstrapNode/setState ran,
+ // so the state tested below looks like the pre-update state; this block
+ // also runs when the device is NOT available, although the log message
+ // says "reconnected" - confirm both are intended.
+ if (autoRecovery) {
+ if (node.state() == INCOMPLETE || node.state() == DEVICE_CREATED) {
+ log.info("Device {} is reconnected", device.id());
+ nodeAdminService.updateNode(node.updateState(INIT));
+ }
+ }
+ }
+
+ // Handles a port that was added or updated: when the owning node is in
+ // DEVICE_CREATED state and the port is one of the tunnel interfaces
+ // (VXLAN, GRE, GENEVE), the node bootstrap continues.
+ void processPortAddition(Device device, Port port) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ // the node is resolved by its tunnel bridge here
+ KubevirtNode node = nodeAdminService.nodeByTunBridge(device.id());
+
+ if (node == null) {
+ return;
+ }
+
+ String portName = port.annotations().value(PORT_NAME);
+ if (node.state() == DEVICE_CREATED && (
+ Objects.equals(portName, VXLAN) ||
+ Objects.equals(portName, GRE) ||
+ Objects.equals(portName, GENEVE))) {
+ log.info("Interface {} added or updated to {}",
+ portName, device.id());
+ bootstrapNode(node);
+ }
+ }
+
+ // Handles a removed port: when the owning node is COMPLETE and the port
+ // is one of the tunnel interfaces (VXLAN, GRE, GENEVE), the node is
+ // moved to INCOMPLETE.
+ void processPortRemoval(Device device, Port port) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ // NOTE(review): port addition resolves the node with nodeByTunBridge(),
+ // whereas this lookup uses node(); confirm node() can resolve a tunnel
+ // bridge device id, otherwise removed tunnel ports may go unnoticed.
+ KubevirtNode node = nodeAdminService.node(device.id());
+
+ if (node == null) {
+ return;
+ }
+
+ String portName = port.annotations().value(PORT_NAME);
+ if (node.state() == COMPLETE && (
+ Objects.equals(portName, VXLAN) ||
+ Objects.equals(portName, GRE) ||
+ Objects.equals(portName, GENEVE))) {
+ log.warn("Interface {} removed from {}", portName, device.id());
+ setState(node, INCOMPLETE);
+ }
+ }
+ }
+
+ /**
+  * An internal kubevirt node listener. Created or updated nodes get
+  * bootstrapped and removed nodes get torn down; the work runs on the
+  * event executor and only when the local node is the leader.
+  */
+ private class InternalKubevirtNodeListener implements KubevirtNodeListener {
+
+     private boolean isRelevantHelper() {
+         return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
+     }
+
+     // Runs the given action only when the local node is the leader.
+     private void runIfLeader(Runnable action) {
+         if (isRelevantHelper()) {
+             action.run();
+         }
+     }
+
+     @Override
+     public void event(KubevirtNodeEvent event) {
+         switch (event.type()) {
+             case KUBEVIRT_NODE_CREATED:
+             case KUBEVIRT_NODE_UPDATED:
+                 eventExecutor.execute(() ->
+                         runIfLeader(() -> bootstrapNode(event.subject())));
+                 break;
+             case KUBEVIRT_NODE_REMOVED:
+                 eventExecutor.execute(() ->
+                         runIfLeader(() -> removeNode(event.subject())));
+                 break;
+             default:
+                 // KUBEVIRT_NODE_INCOMPLETE and any other type are ignored
+                 break;
+         }
+     }
+ }
+}
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/KubevirtNodeManager.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/KubevirtNodeManager.java
index c77efa3..99c8244 100644
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/KubevirtNodeManager.java
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/KubevirtNodeManager.java
@@ -164,6 +164,7 @@
checkNotNull(node, ERR_NULL_NODE);
KubevirtNode intNode;
+ KubevirtNode tunNode;
if (node.intgBridge() == null) {
String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
@@ -177,9 +178,21 @@
NOT_DUPLICATED_MSG, intNode.intgBridge());
}
- nodeStore.createNode(intNode);
+ if (node.tunBridge() == null) {
+ String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
+ checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
+ tunNode = intNode.updateTunBridge(DeviceId.deviceId(deviceIdStr));
+ checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
+ NOT_DUPLICATED_MSG, tunNode.tunBridge());
+ } else {
+ tunNode = intNode;
+ checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
+ NOT_DUPLICATED_MSG, tunNode.tunBridge());
+ }
- log.info(String.format(MSG_NODE, intNode.hostname(), MSG_CREATED));
+ nodeStore.createNode(tunNode);
+
+ log.info(String.format(MSG_NODE, tunNode.hostname(), MSG_CREATED));
}
@Override
@@ -187,6 +200,7 @@
checkNotNull(node, ERR_NULL_NODE);
KubevirtNode intNode;
+ KubevirtNode tunNode;
KubevirtNode existingNode = nodeStore.node(node.hostname());
checkNotNull(existingNode, ERR_NULL_NODE);
@@ -203,9 +217,19 @@
NOT_DUPLICATED_MSG, intNode.intgBridge());
}
- nodeStore.updateNode(intNode);
+ DeviceId existTunBridge = nodeStore.node(node.hostname()).tunBridge();
+ if (intNode.tunBridge() == null) {
+ tunNode = intNode.updateTunBridge(existTunBridge);
+ checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
+ NOT_DUPLICATED_MSG, tunNode.tunBridge());
+ } else {
+ tunNode = intNode;
+ checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
+ NOT_DUPLICATED_MSG, tunNode.tunBridge());
+ }
+ nodeStore.updateNode(tunNode);
- log.info(String.format(MSG_NODE, intNode.hostname(), MSG_UPDATED));
+ log.info(String.format(MSG_NODE, tunNode.hostname(), MSG_UPDATED));
}
@Override
@@ -266,6 +290,13 @@
.findFirst().orElse(null);
}
+ @Override
+ public KubevirtNode nodeByTunBridge(DeviceId deviceId) {
+     // first stored node whose tunnel bridge equals the given device ID
+     for (KubevirtNode candidate : nodeStore.nodes()) {
+         if (Objects.equals(candidate.tunBridge(), deviceId)) {
+             return candidate;
+         }
+     }
+     return null;
+ }
+
private boolean hasIntgBridge(DeviceId deviceId, String hostname) {
Optional<KubevirtNode> existNode = nodeStore.nodes().stream()
.filter(n -> !n.hostname().equals(hostname))
@@ -275,6 +306,15 @@
return existNode.isPresent();
}
+ // Tells whether a stored node other than the one with the given hostname
+ // already uses the given device ID as its tunnel bridge.
+ private boolean hasTunBridge(DeviceId deviceId, String hostname) {
+     return nodeStore.nodes().stream()
+             .anyMatch(other -> !other.hostname().equals(hostname) &&
+                     deviceId.equals(other.tunBridge()));
+ }
+
private class InternalNodeStoreDelegate implements KubevirtNodeStoreDelegate {
@Override
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/OsgiPropertyConstants.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/OsgiPropertyConstants.java
index ae1b04c..9c1b8f8 100644
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/OsgiPropertyConstants.java
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/OsgiPropertyConstants.java
@@ -24,7 +24,7 @@
}
static final String OVSDB_PORT = "ovsdbPortNum";
- static final int OVSDB_PORT_NUM_DEFAULT = 6640;
+ // 6640 is the IANA-assigned OVSDB management port; 6653 is the OpenFlow port
+ static final int OVSDB_PORT_NUM_DEFAULT = 6640;
static final String AUTO_RECOVERY = "autoRecovery";
static final boolean AUTO_RECOVERY_DEFAULT = true;
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/util/KubevirtNodeUtil.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/util/KubevirtNodeUtil.java
index 57f989e..1c51336 100644
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/util/KubevirtNodeUtil.java
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/util/KubevirtNodeUtil.java
@@ -16,18 +16,30 @@
package org.onosproject.kubevirtnode.util;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.commons.lang.StringUtils;
import org.onlab.packet.IpAddress;
import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.net.Device;
+import org.onosproject.net.behaviour.BridgeConfig;
+import org.onosproject.net.behaviour.BridgeName;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.ovsdb.controller.OvsdbClientService;
+import org.onosproject.ovsdb.controller.OvsdbController;
+import org.onosproject.ovsdb.controller.OvsdbNodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Dictionary;
import java.util.List;
+import static org.onlab.util.Tools.get;
+
/**
* An utility that used in KubeVirt node app.
*/
@@ -42,6 +54,8 @@
private static final String OF_PREFIX = "of:";
private static final String ZERO = "0";
+ private static final int PORT_NAME_MAX_LENGTH = 15;
+
/**
* Prevents object installation from external.
*/
@@ -162,4 +176,106 @@
return new DefaultKubernetesClient(configBuilder.build());
}
+
+ /**
+  * Looks up the OVSDB client for the given kubevirt node, identified by
+  * the node's management IP and the given OVSDB port.
+  *
+  * @param node kubevirt node
+  * @param ovsdbPort ovsdb port
+  * @param ovsdbController ovsdb controller
+  * @return ovsdb client as returned by the controller
+  */
+ public static OvsdbClientService getOvsdbClient(KubevirtNode node,
+                                                 int ovsdbPort,
+                                                 OvsdbController ovsdbController) {
+     return ovsdbController.getOvsdbClient(
+             new OvsdbNodeId(node.managementIp(), ovsdbPort));
+ }
+
+ /**
+  * Tells whether the controller is connected to the OVSDB of the given
+  * node: the OVSDB device must be available, and an OVSDB client must
+  * exist and report itself as connected.
+  *
+  * @param node kubevirt node
+  * @param ovsdbPort OVSDB port
+  * @param ovsdbController OVSDB controller
+  * @param deviceService device service
+  * @return true if the controller is connected to the OVSDB, false otherwise
+  */
+ public static boolean isOvsdbConnected(KubevirtNode node,
+                                        int ovsdbPort,
+                                        OvsdbController ovsdbController,
+                                        DeviceService deviceService) {
+     OvsdbClientService ovsdbClient = getOvsdbClient(node, ovsdbPort, ovsdbController);
+
+     if (!deviceService.isAvailable(node.ovsdb())) {
+         return false;
+     }
+     if (ovsdbClient == null) {
+         return false;
+     }
+     return ovsdbClient.isConnected();
+ }
+
+ /**
+  * Adds a network interface (aka port) to, or removes it from, the given
+  * bridge through the OVSDB device of the node. Nothing is done when the
+  * device is missing or does not support bridge configuration.
+  *
+  * @param k8sNode kubevirt node
+  * @param bridgeName bridge name
+  * @param intfName interface name
+  * @param deviceService device service
+  * @param addOrRemove true to add the port, false to remove it
+  */
+ public static synchronized void addOrRemoveSystemInterface(KubevirtNode k8sNode,
+                                                            String bridgeName,
+                                                            String intfName,
+                                                            DeviceService deviceService,
+                                                            boolean addOrRemove) {
+     Device ovsdbDevice = deviceService.getDevice(k8sNode.ovsdb());
+     boolean configurable = ovsdbDevice != null && ovsdbDevice.is(BridgeConfig.class);
+     if (!configurable) {
+         log.info("device is null or this device if not ovsdb device");
+         return;
+     }
+
+     BridgeConfig config = ovsdbDevice.as(BridgeConfig.class);
+     BridgeName targetBridge = BridgeName.bridgeName(bridgeName);
+
+     if (addOrRemove) {
+         config.addPort(targetBridge, intfName);
+     } else {
+         config.deletePort(targetBridge, intfName);
+     }
+ }
+
+ /**
+ * Re-structures the OVS port name.
+ * The length of OVS port name should be not large than 15.
+ *
+ * @param portName original port name
+ * @return re-structured OVS port name
+ */
+ public static String structurePortName(String portName) {
+
+ // The size of OVS port name should not be larger than 15
+ if (portName.length() > PORT_NAME_MAX_LENGTH) {
+ return StringUtils.substring(portName, 0, PORT_NAME_MAX_LENGTH);
+ }
+
+ return portName;
+ }
+
+ /**
+  * Reads a Boolean property from the given dictionary.
+  *
+  * @param properties properties to be looked up
+  * @param propertyName the name of the property to look up
+  * @return the parsed Boolean; null when the property is missing or empty,
+  *         or when reading it raises a ClassCastException
+  */
+ public static Boolean getBooleanProperty(Dictionary<?, ?> properties,
+                                          String propertyName) {
+     try {
+         String raw = get(properties, propertyName);
+         if (Strings.isNullOrEmpty(raw)) {
+             return null;
+         }
+         return Boolean.valueOf(raw);
+     } catch (ClassCastException e) {
+         return null;
+     }
+ }
}