Provides CLI command to synchronize states of all kubevirt nodes
Change-Id: Iec3df65d7e0177ebb8beb3d05508851bad67f824
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtSyncStateCommand.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtSyncStateCommand.java
new file mode 100644
index 0000000..91ba18e
--- /dev/null
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtSyncStateCommand.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnode.cli;
+
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
+
+import java.util.Set;
+
+import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
+import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
+import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.buildKubevirtNode;
+import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.k8sClient;
+
+/**
+ * Synchronizes kubevirt node states.
+ */
+@Service
+@Command(scope = "onos", name = "kubevirt-sync-state",
+ description = "Synchronizes kubevirt node states.")
+public class KubevirtSyncStateCommand extends AbstractShellCommand {
+ @Override
+ protected void doExecute() throws Exception {
+ KubevirtApiConfigService apiConfigService = get(KubevirtApiConfigService.class);
+
+ print("Re-synchronizing Kubevirt node states..");
+ KubevirtApiConfig config = apiConfigService.apiConfig();
+ bootstrapKubevirtNodes(config);
+ print("Done.");
+
+ }
+
+ private void bootstrapKubevirtNodes(KubevirtApiConfig config) {
+ KubevirtNodeAdminService nodeAdminService = get(KubevirtNodeAdminService.class);
+
+ Set<KubevirtNode> completeNodeSet = nodeAdminService.completeNodes();
+ KubernetesClient k8sClient = k8sClient(config);
+
+ if (k8sClient == null) {
+ log.warn("Failed to connect to kubernetes API server");
+ return;
+ }
+
+ for (Node node : k8sClient.nodes().list().getItems()) {
+ KubevirtNode kubevirtNode = buildKubevirtNode(node);
+ // we always provision VMs to worker nodes, so only need to install
+ // flow rules in worker nodes
+ if (kubevirtNode.type() == WORKER) {
+ if (completeNodeSet.stream().map(KubevirtNode::hostname)
+ .filter(name -> name.equals(kubevirtNode.hostname()))
+ .findAny().isPresent()) {
+ print("Initializing %s because the node was COMPLETE state.",
+ kubevirtNode.hostname());
+ KubevirtNode updated = kubevirtNode.updateState(INIT);
+ nodeAdminService.updateNode(updated);
+ } else {
+ nodeAdminService.updateNode(kubevirtNode);
+ }
+ }
+ }
+ }
+}
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtApiConfigHandler.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtApiConfigHandler.java
index 0e7de69..49e346b 100644
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtApiConfigHandler.java
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtApiConfigHandler.java
@@ -16,27 +16,18 @@
package org.onosproject.kubevirtnode.impl;
import io.fabric8.kubernetes.api.model.Node;
-import io.fabric8.kubernetes.api.model.NodeAddress;
import io.fabric8.kubernetes.client.KubernetesClient;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
-import org.onosproject.kubevirtnode.api.DefaultKubevirtNode;
-import org.onosproject.kubevirtnode.api.DefaultKubevirtPhyInterface;
import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
import org.onosproject.kubevirtnode.api.KubevirtApiConfigAdminService;
import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
import org.onosproject.kubevirtnode.api.KubevirtNode;
import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
-import org.onosproject.kubevirtnode.api.KubevirtNodeState;
-import org.onosproject.kubevirtnode.api.KubevirtPhyInterface;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -44,19 +35,15 @@
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
-import java.util.HashSet;
-import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.kubevirtnode.api.KubevirtApiConfig.State.CONNECTED;
import static org.onosproject.kubevirtnode.api.KubevirtApiConfigService.APP_ID;
-import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
+import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.buildKubevirtNode;
import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.k8sClient;
import static org.slf4j.LoggerFactory.getLogger;
@@ -68,12 +55,6 @@
private final Logger log = getLogger(getClass());
- private static final String INTERNAL_IP = "InternalIP";
- private static final String K8S_ROLE = "node-role.kubernetes.io";
- private static final String PHYSNET_CONFIG_KEY = "physnet-config";
- private static final String NETWORK_KEY = "network";
- private static final String INTERFACE_KEY = "interface";
-
private static final long SLEEP_MS = 10000; // we wait 10s
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -147,66 +128,6 @@
}
}
- private KubevirtNode buildKubevirtNode(Node node) {
- String hostname = node.getMetadata().getName();
- IpAddress managementIp = null;
- IpAddress dataIp = null;
-
- for (NodeAddress nodeAddress:node.getStatus().getAddresses()) {
- if (nodeAddress.getType().equals(INTERNAL_IP)) {
- managementIp = IpAddress.valueOf(nodeAddress.getAddress());
- dataIp = IpAddress.valueOf(nodeAddress.getAddress());
- }
- }
-
- Set<String> rolesFull = node.getMetadata().getLabels().keySet().stream()
- .filter(l -> l.contains(K8S_ROLE))
- .collect(Collectors.toSet());
-
- KubevirtNode.Type nodeType = WORKER;
-
- for (String roleStr : rolesFull) {
- String role = roleStr.split("/")[1];
- if (MASTER.name().equalsIgnoreCase(role)) {
- nodeType = MASTER;
- break;
- }
- }
-
- // start to parse kubernetes annotation
- Map<String, String> annots = node.getMetadata().getAnnotations();
- String physnetConfig = annots.get(PHYSNET_CONFIG_KEY);
- Set<KubevirtPhyInterface> phys = new HashSet<>();
- try {
- if (physnetConfig != null) {
- JSONArray configJson = new JSONArray(physnetConfig);
-
- for (int i = 0; i < configJson.length(); i++) {
- JSONObject object = configJson.getJSONObject(i);
- String network = object.getString(NETWORK_KEY);
- String intf = object.getString(INTERFACE_KEY);
-
- if (network != null && intf != null) {
- phys.add(DefaultKubevirtPhyInterface.builder()
- .network(network).intf(intf).build());
- }
-
- }
- }
- } catch (JSONException e) {
- log.error("Failed to parse network status object", e);
- }
-
- return DefaultKubevirtNode.builder()
- .hostname(hostname)
- .managementIp(managementIp)
- .dataIp(dataIp)
- .type(nodeType)
- .state(KubevirtNodeState.ON_BOARDED)
- .phyIntfs(phys)
- .build();
- }
-
private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
private boolean isRelevantHelper() {
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/util/KubevirtNodeUtil.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/util/KubevirtNodeUtil.java
index 1c51336..da53f08 100644
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/util/KubevirtNodeUtil.java
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/util/KubevirtNodeUtil.java
@@ -17,13 +17,22 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.api.model.NodeAddress;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.commons.lang.StringUtils;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
import org.onlab.packet.IpAddress;
+import org.onosproject.kubevirtnode.api.DefaultKubevirtNode;
+import org.onosproject.kubevirtnode.api.DefaultKubevirtPhyInterface;
import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeState;
+import org.onosproject.kubevirtnode.api.KubevirtPhyInterface;
import org.onosproject.net.Device;
import org.onosproject.net.behaviour.BridgeConfig;
import org.onosproject.net.behaviour.BridgeName;
@@ -36,9 +45,15 @@
import java.io.IOException;
import java.util.Dictionary;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
import static org.onlab.util.Tools.get;
+import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
+import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
/**
* An utility that used in KubeVirt node app.
@@ -53,6 +68,11 @@
private static final int HEX_LENGTH = 16;
private static final String OF_PREFIX = "of:";
private static final String ZERO = "0";
+ private static final String INTERNAL_IP = "InternalIP";
+ private static final String K8S_ROLE = "node-role.kubernetes.io";
+ private static final String PHYSNET_CONFIG_KEY = "physnet-config";
+ private static final String NETWORK_KEY = "network";
+ private static final String INTERFACE_KEY = "interface";
private static final int PORT_NAME_MAX_LENGTH = 15;
@@ -278,4 +298,70 @@
}
return value;
}
+
+ /**
+ * Returns the kubevirt node from the node.
+ *
+ * @param node a raw node object returned from a k8s client
+ * @return kubevirt node
+ */
+ public static KubevirtNode buildKubevirtNode(Node node) {
+ String hostname = node.getMetadata().getName();
+ IpAddress managementIp = null;
+ IpAddress dataIp = null;
+
+ for (NodeAddress nodeAddress:node.getStatus().getAddresses()) {
+ if (nodeAddress.getType().equals(INTERNAL_IP)) {
+ managementIp = IpAddress.valueOf(nodeAddress.getAddress());
+ dataIp = IpAddress.valueOf(nodeAddress.getAddress());
+ }
+ }
+
+ Set<String> rolesFull = node.getMetadata().getLabels().keySet().stream()
+ .filter(l -> l.contains(K8S_ROLE))
+ .collect(Collectors.toSet());
+
+ KubevirtNode.Type nodeType = WORKER;
+
+ for (String roleStr : rolesFull) {
+ String role = roleStr.split("/")[1];
+ if (MASTER.name().equalsIgnoreCase(role)) {
+ nodeType = MASTER;
+ break;
+ }
+ }
+
+ // start to parse kubernetes annotation
+ Map<String, String> annots = node.getMetadata().getAnnotations();
+ String physnetConfig = annots.get(PHYSNET_CONFIG_KEY);
+ Set<KubevirtPhyInterface> phys = new HashSet<>();
+ try {
+ if (physnetConfig != null) {
+ JSONArray configJson = new JSONArray(physnetConfig);
+
+ for (int i = 0; i < configJson.length(); i++) {
+ JSONObject object = configJson.getJSONObject(i);
+ String network = object.getString(NETWORK_KEY);
+ String intf = object.getString(INTERFACE_KEY);
+
+ if (network != null && intf != null) {
+ phys.add(DefaultKubevirtPhyInterface.builder()
+ .network(network).intf(intf).build());
+ }
+
+ }
+ }
+ } catch (JSONException e) {
+ log.error("Failed to parse network status object", e);
+ }
+
+ return DefaultKubevirtNode.builder()
+ .hostname(hostname)
+ .managementIp(managementIp)
+ .dataIp(dataIp)
+ .type(nodeType)
+ .state(KubevirtNodeState.ON_BOARDED)
+ .phyIntfs(phys)
+ .build();
+ }
}