Add a CLI to synchronize kubevirt state from kubernetes
Change-Id: I3c519f7707d386e3e488b1a7e4641b7316fec289
(cherry picked from commit 6a53ae6d2fea70ed13577acd177403921286347a)
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/cli/KubevirtSyncStateCommand.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/cli/KubevirtSyncStateCommand.java
new file mode 100644
index 0000000..5c51008
--- /dev/null
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/cli/KubevirtSyncStateCommand.java
@@ -0,0 +1,389 @@
+/*
+ * Copyright 2022-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnetworking.cli;
+
+import com.eclipsesource.json.JsonArray;
+import com.eclipsesource.json.JsonObject;
+import com.eclipsesource.json.JsonValue;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancer;
+import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerAdminService;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
+import org.onosproject.kubevirtnetworking.api.KubevirtPeerRouter;
+import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
+import org.onosproject.kubevirtnetworking.api.KubevirtRouterAdminService;
+import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroup;
+import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroupAdminService;
+import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroupRule;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.buildKubevirtNode;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.parseKubevirtNetwork;
+
+/**
+ * Synchronizes kubevirt states.
+ */
+@Service
+@Command(scope = "onos", name = "kubevirt-sync-state",
+ description = "Synchronizes kubevirt states.")
+public class KubevirtSyncStateCommand extends AbstractShellCommand {
+
+ private static final String ITEMS = "items";
+ private static final String SPEC = "spec";
+ private final CustomResourceDefinitionContext routerCrdCxt = new CustomResourceDefinitionContext
+ .Builder()
+ .withGroup("kubevirt.io")
+ .withScope("Cluster")
+ .withVersion("v1")
+ .withPlural("virtualrouters")
+ .build();
+
+ private final CustomResourceDefinitionContext nadCrdCxt = new CustomResourceDefinitionContext
+ .Builder()
+ .withGroup("k8s.cni.cncf.io")
+ .withScope("Namespaced")
+ .withVersion("v1")
+ .withPlural("network-attachment-definitions")
+ .build();
+
+ private final CustomResourceDefinitionContext securityGroupCrdCxt = new CustomResourceDefinitionContext
+ .Builder()
+ .withGroup("kubevirt.io")
+ .withScope("Cluster")
+ .withVersion("v1")
+ .withPlural("securitygroups")
+ .build();
+
+ private final CustomResourceDefinitionContext securityGroupRuleCrdCxt = new CustomResourceDefinitionContext
+ .Builder()
+ .withGroup("kubevirt.io")
+ .withScope("Cluster")
+ .withVersion("v1")
+ .withPlural("securitygrouprules")
+ .build();
+
+ private final CustomResourceDefinitionContext lbCrdCxt = new CustomResourceDefinitionContext
+ .Builder()
+ .withGroup("kubevirt.io")
+ .withScope("Cluster")
+ .withVersion("v1")
+ .withPlural("loadbalancers")
+ .build();
+
+ @Override
+ protected void doExecute() throws Exception {
+ KubevirtApiConfigService apiConfigService = get(KubevirtApiConfigService.class);
+
+ print("Re-synchronizing Kubevirt states..");
+ KubevirtApiConfig config = apiConfigService.apiConfig();
+ KubernetesClient k8sClient = k8sClient(config);
+
+ if (k8sClient == null) {
+ error("Failed to initialize Kubernetes client.");
+ return;
+ }
+
+ // try to sync nodes
+ syncNodes(k8sClient);
+
+ // try to sync networks
+ syncNetworks(k8sClient);
+
+ // try to sync routers
+ syncRouters(k8sClient);
+
+ // try to sync security groups
+ syncSecurityGroups(k8sClient);
+
+ // try to sync security group rules
+ syncSecurityGroupRules(k8sClient);
+
+ // try to sync load balancers
+ syncLoadBalancers(k8sClient);
+
+ print("Done.");
+ }
+
+ private void syncNodes(KubernetesClient client) {
+ KubevirtNodeAdminService nodeService = get(KubevirtNodeAdminService.class);
+ Set<KubevirtNode> existingNodes = nodeService.nodes();
+ Set<String> existingNodeNames = existingNodes.stream()
+ .map(KubevirtNode::hostname).collect(Collectors.toSet());
+ List<Node> refNodes = client.nodes().list().getItems();
+
+ for (Node node : refNodes) {
+ String nodeName = node.getMetadata().getName();
+ KubevirtNode builtNode = buildKubevirtNode(node);
+ if (existingNodeNames.contains(nodeName)) {
+ nodeService.updateNode(builtNode);
+ } else {
+ nodeService.createNode(builtNode);
+ }
+ }
+
+ print("Successfully synchronized nodes!");
+ }
+
+ private void syncRouters(KubernetesClient client) {
+ KubevirtRouterAdminService routerService = get(KubevirtRouterAdminService.class);
+ Set<KubevirtRouter> existingRouters = routerService.routers();
+ Set<String> existingRouterNames = existingRouters.stream()
+ .map(KubevirtRouter::name).collect(Collectors.toSet());
+ Map<String, Object> refRouters = client.customResource(routerCrdCxt).list();
+ ObjectMapper mapper = new ObjectMapper();
+
+ try {
+ String jsonString = mapper.writeValueAsString(refRouters);
+ JsonObject json = JsonObject.readFrom(jsonString);
+ JsonArray items = json.get(ITEMS).asArray();
+
+ for (JsonValue item : items) {
+ KubevirtRouter router = parseKubevirtRouter(routerService, item.toString());
+ if (router != null) {
+ if (existingRouterNames.contains(router.name())) {
+
+ KubevirtPeerRouter oldPeerRouter = routerService.router(router.name()).peerRouter();
+ if (oldPeerRouter != null
+ && Objects.equals(oldPeerRouter.ipAddress(), router.peerRouter().ipAddress())
+ && oldPeerRouter.macAddress() != null
+ && router.peerRouter().macAddress() == null) {
+
+ router = router.updatePeerRouter(oldPeerRouter);
+ }
+
+ routerService.updateRouter(router);
+ } else {
+ routerService.createRouter(router);
+ }
+ }
+ }
+ } catch (Exception e) {
+ error("Failed to synchronize routers! Reason: " + e.getMessage());
+ return;
+ }
+
+ print("Successfully synchronized routers!");
+ }
+
+ private void syncNetworks(KubernetesClient client) {
+ KubevirtNetworkAdminService networkService = get(KubevirtNetworkAdminService.class);
+ Set<KubevirtNetwork> existingNetworks = networkService.networks();
+ Set<String> existingNetworkNames = existingNetworks.stream()
+ .map(KubevirtNetwork::name).collect(Collectors.toSet());
+ Map<String, Object> refNetworks = client.customResource(nadCrdCxt).list();
+ ObjectMapper mapper = new ObjectMapper();
+
+ try {
+ String jsonString = mapper.writeValueAsString(refNetworks);
+ JsonObject json = JsonObject.readFrom(jsonString);
+ JsonArray items = json.get(ITEMS).asArray();
+ for (JsonValue item : items) {
+ KubevirtNetwork network = parseKubevirtNetwork(item.toString());
+ if (network != null) {
+ if (existingNetworkNames.contains(network.name())) {
+ networkService.updateNetwork(network);
+ } else {
+ networkService.createNetwork(network);
+ }
+ }
+ }
+ } catch (Exception e) {
+ error("Failed to synchronize networks! Reason: " + e.getMessage());
+ return;
+ }
+
+ print("Successfully synchronized networks!");
+ }
+
+ private void syncSecurityGroups(KubernetesClient client) {
+ KubevirtSecurityGroupAdminService sgService = get(KubevirtSecurityGroupAdminService.class);
+ Set<KubevirtSecurityGroup> existingSgs = sgService.securityGroups();
+ Set<String> existingSgNames = existingSgs.stream()
+ .map(KubevirtSecurityGroup::name).collect(Collectors.toSet());
+ Map<String, Object> refSgs = client.customResource(securityGroupCrdCxt).list();
+ ObjectMapper mapper = new ObjectMapper();
+
+ try {
+ String jsonString = mapper.writeValueAsString(refSgs);
+ JsonObject json = JsonObject.readFrom(jsonString);
+ JsonArray items = json.get(ITEMS).asArray();
+ for (JsonValue item : items) {
+ KubevirtSecurityGroup sg = parseSecurityGroup(item.toString());
+ if (sg != null) {
+ if (existingSgNames.contains(sg.name())) {
+ KubevirtSecurityGroup orig = sgService.securityGroup(sg.id());
+
+ if (orig != null) {
+ KubevirtSecurityGroup updated = sg.updateRules(orig.rules());
+ sgService.updateSecurityGroup(updated);
+ }
+ } else {
+ sgService.createSecurityGroup(sg);
+ }
+ }
+ }
+ } catch (Exception e) {
+ error("Failed to synchronize security groups! Reason: " + e.getMessage());
+ return;
+ }
+
+ print("Successfully synchronized security groups!");
+ }
+
+ private void syncSecurityGroupRules(KubernetesClient client) {
+ KubevirtSecurityGroupAdminService sgService = get(KubevirtSecurityGroupAdminService.class);
+ Set<KubevirtSecurityGroup> existingSgs = sgService.securityGroups();
+ Set<KubevirtSecurityGroupRule> existingSgrs = new HashSet<>();
+ existingSgs.forEach(sg -> existingSgrs.addAll(sg.rules()));
+ Set<String> existingSgrIds = existingSgrs.stream()
+ .map(KubevirtSecurityGroupRule::id).collect(Collectors.toSet());
+ Map<String, Object> refSgrs = client.customResource(securityGroupRuleCrdCxt).list();
+ ObjectMapper mapper = new ObjectMapper();
+
+ try {
+ String jsonString = mapper.writeValueAsString(refSgrs);
+ JsonObject json = JsonObject.readFrom(jsonString);
+ JsonArray items = json.get(ITEMS).asArray();
+
+ for (JsonValue item : items) {
+ KubevirtSecurityGroupRule sgr = parseSecurityGroupRule(item.toString());
+ if (sgr != null) {
+ if (!existingSgrIds.contains(sgr.id())) {
+ sgService.createSecurityGroupRule(sgr);
+ }
+ }
+ }
+ } catch (Exception e) {
+ error("Failed to synchronize security group rules! Reason: " + e.getMessage());
+ return;
+ }
+
+ print("Successfully synchronized security group rules!");
+ }
+
+ private void syncLoadBalancers(KubernetesClient client) {
+ KubevirtLoadBalancerAdminService lbService = get(KubevirtLoadBalancerAdminService.class);
+ Set<KubevirtLoadBalancer> existingLbs = lbService.loadBalancers();
+ Set<String> existingLbNames = existingLbs.stream()
+ .map(KubevirtLoadBalancer::name).collect(Collectors.toSet());
+ Map<String, Object> refLbs = client.customResource(lbCrdCxt).list();
+ ObjectMapper mapper = new ObjectMapper();
+
+ try {
+ String jsonString = mapper.writeValueAsString(refLbs);
+ JsonObject json = JsonObject.readFrom(jsonString);
+ JsonArray items = json.get(ITEMS).asArray();
+
+ for (JsonValue item : items) {
+ KubevirtLoadBalancer lb = parseKubevirtLoadBalancer(item.toString());
+ if (lb != null) {
+ if (existingLbNames.contains(lb.name())) {
+ lbService.updateLoadBalancer(lb);
+ } else {
+ lbService.createLoadBalancer(lb);
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ error("Failed to synchronize load balancers! Reason: " + e.getMessage());
+ return;
+ }
+
+ print("Successfully synchronized load balancers!");
+ }
+
+ private KubevirtRouter parseKubevirtRouter(KubevirtRouterAdminService service, String resource) {
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode json = mapper.readTree(resource);
+ ObjectNode spec = (ObjectNode) json.get(SPEC);
+
+ KubevirtRouter router = codec(KubevirtRouter.class).decode(spec, this);
+ KubevirtRouter existing = service.router(router.name());
+
+ if (existing == null) {
+ return router;
+ } else {
+ return router.updatedElectedGateway(existing.electedGateway());
+ }
+ } catch (IOException e) {
+ log.error("Failed to parse kubevirt router object");
+ }
+
+ return null;
+ }
+
+ private KubevirtSecurityGroup parseSecurityGroup(String resource) {
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode json = mapper.readTree(resource);
+ ObjectNode spec = (ObjectNode) json.get(SPEC);
+ return codec(KubevirtSecurityGroup.class).decode(spec, this);
+ } catch (IOException e) {
+ log.error("Failed to parse kubevirt security group object");
+ }
+
+ return null;
+ }
+
+ private KubevirtSecurityGroupRule parseSecurityGroupRule(String resource) {
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode json = mapper.readTree(resource);
+ ObjectNode spec = (ObjectNode) json.get(SPEC);
+ return codec(KubevirtSecurityGroupRule.class).decode(spec, this);
+ } catch (IOException e) {
+ log.error("Failed to parse kubevirt security group rule object");
+ }
+
+ return null;
+ }
+
+ private KubevirtLoadBalancer parseKubevirtLoadBalancer(String resource) {
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode json = mapper.readTree(resource);
+ ObjectNode spec = (ObjectNode) json.get(SPEC);
+ return codec(KubevirtLoadBalancer.class).decode(spec, this);
+ } catch (IOException e) {
+ log.error("Failed to parse kubevirt load balancer object");
+ }
+
+ return null;
+ }
+}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupManager.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupManager.java
index b92db05..8906452 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupManager.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupManager.java
@@ -60,6 +60,7 @@
private static final String MSG_SG_RULE = "Kubevirt security group rule %s %s";
private static final String MSG_CREATED = "created";
+ private static final String MSG_UPDATED = "updated";
private static final String MSG_REMOVED = "removed";
private static final String ERR_NULL_SG =
@@ -114,6 +115,7 @@
checkArgument(!Strings.isNullOrEmpty(sg.id()), ERR_NULL_SG_ID);
sgStore.updateSecurityGroup(sg);
+ log.info(String.format(MSG_SG, sg.name(), MSG_UPDATED));
}
@Override
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/NetworkAttachmentDefinitionWatcher.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/NetworkAttachmentDefinitionWatcher.java
index d82a043..3ef16f4 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/NetworkAttachmentDefinitionWatcher.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/NetworkAttachmentDefinitionWatcher.java
@@ -15,25 +15,16 @@
*/
package org.onosproject.kubevirtnetworking.impl;
-import com.eclipsesource.json.JsonArray;
-import com.eclipsesource.json.JsonObject;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
-import org.apache.commons.lang.StringUtils;
-import org.onlab.packet.IpAddress;
-import org.onlab.packet.IpPrefix;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
-import org.onosproject.kubevirtnetworking.api.DefaultKubevirtNetwork;
-import org.onosproject.kubevirtnetworking.api.KubevirtHostRoute;
-import org.onosproject.kubevirtnetworking.api.KubevirtIpPool;
import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
-import org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type;
import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
@@ -47,17 +38,14 @@
import org.slf4j.Logger;
import java.io.IOException;
-import java.util.HashSet;
-import java.util.Locale;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
-import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.FLAT;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.parseKubevirtNetwork;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.parseResourceName;
import static org.slf4j.LoggerFactory.getLogger;
@@ -69,21 +57,6 @@
private final Logger log = getLogger(getClass());
- private static final String NETWORK_CONFIG = "network-config";
- private static final String TYPE = "type";
- private static final String MTU = "mtu";
- private static final String SEGMENT_ID = "segmentId";
- private static final String GATEWAY_IP = "gatewayIp";
- private static final String DEFAULT_ROUTE = "defaultRoute";
- private static final String CIDR = "cidr";
- private static final String HOST_ROUTES = "hostRoutes";
- private static final String DESTINATION = "destination";
- private static final String NEXTHOP = "nexthop";
- private static final String IP_POOL = "ipPool";
- private static final String START = "start";
- private static final String END = "end";
- private static final String DNSES = "dnses";
-
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@@ -267,80 +240,5 @@
private boolean isMaster() {
return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
}
-
- private KubevirtNetwork parseKubevirtNetwork(String resource) {
- JsonObject json = JsonObject.readFrom(resource);
- String name = parseResourceName(resource);
- JsonObject annots = json.get("metadata").asObject().get("annotations").asObject();
- if (annots.get(NETWORK_CONFIG) == null) {
- // SR-IOV network does not contain network-config field
- return null;
- }
- String networkConfig = annots.get(NETWORK_CONFIG).asString();
- if (networkConfig != null) {
- KubevirtNetwork.Builder builder = DefaultKubevirtNetwork.builder();
-
- JsonObject configJson = JsonObject.readFrom(networkConfig);
- String type = configJson.get(TYPE).asString().toUpperCase(Locale.ROOT);
- Integer mtu = configJson.get(MTU).asInt();
- String gatewayIp = configJson.getString(GATEWAY_IP, "");
- boolean defaultRoute = configJson.getBoolean(DEFAULT_ROUTE, false);
-
- if (!type.equalsIgnoreCase(FLAT.name())) {
- builder.segmentId(configJson.getString(SEGMENT_ID, ""));
- }
-
- String cidr = configJson.getString(CIDR, "");
-
- JsonObject poolJson = configJson.get(IP_POOL).asObject();
- if (poolJson != null) {
- String start = poolJson.getString(START, "");
- String end = poolJson.getString(END, "");
- builder.ipPool(new KubevirtIpPool(
- IpAddress.valueOf(start), IpAddress.valueOf(end)));
- }
-
- if (configJson.get(HOST_ROUTES) != null) {
- JsonArray routesJson = configJson.get(HOST_ROUTES).asArray();
- Set<KubevirtHostRoute> hostRoutes = new HashSet<>();
- if (routesJson != null) {
- for (int i = 0; i < routesJson.size(); i++) {
- JsonObject route = routesJson.get(i).asObject();
- String destinationStr = route.getString(DESTINATION, "");
- String nexthopStr = route.getString(NEXTHOP, "");
-
- if (StringUtils.isNotEmpty(destinationStr) &&
- StringUtils.isNotEmpty(nexthopStr)) {
- hostRoutes.add(new KubevirtHostRoute(
- IpPrefix.valueOf(destinationStr),
- IpAddress.valueOf(nexthopStr)));
- }
- }
- }
- builder.hostRoutes(hostRoutes);
- }
-
- if (configJson.get(DNSES) != null) {
- JsonArray dnsesJson = configJson.get(DNSES).asArray();
- Set<IpAddress> dnses = new HashSet<>();
- if (dnsesJson != null) {
- for (int i = 0; i < dnsesJson.size(); i++) {
- String dns = dnsesJson.get(i).asString();
- if (StringUtils.isNotEmpty(dns)) {
- dnses.add(IpAddress.valueOf(dns));
- }
- }
- }
- builder.dnses(dnses);
- }
-
- builder.networkId(name).name(name).type(Type.valueOf(type))
- .mtu(mtu).gatewayIp(IpAddress.valueOf(gatewayIp))
- .defaultRoute(defaultRoute).cidr(cidr);
-
- return builder.build();
- }
- return null;
- }
}
}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java
index e9baecf..de71f29 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java
@@ -15,12 +15,18 @@
*/
package org.onosproject.kubevirtnetworking.util;
+import com.eclipsesource.json.JsonArray;
import com.eclipsesource.json.JsonObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.api.model.NodeAddress;
+import io.fabric8.kubernetes.api.model.NodeSpec;
+import io.fabric8.kubernetes.api.model.Taint;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -30,9 +36,13 @@
import org.onlab.packet.Ethernet;
import org.onlab.packet.Ip4Address;
import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onosproject.cfg.ConfigProperty;
+import org.onosproject.kubevirtnetworking.api.DefaultKubevirtNetwork;
import org.onosproject.kubevirtnetworking.api.DefaultKubevirtPort;
+import org.onosproject.kubevirtnetworking.api.KubevirtHostRoute;
+import org.onosproject.kubevirtnetworking.api.KubevirtIpPool;
import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancer;
import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerService;
import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
@@ -40,10 +50,14 @@
import org.onosproject.kubevirtnetworking.api.KubevirtPort;
import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
+import org.onosproject.kubevirtnode.api.DefaultKubevirtNode;
+import org.onosproject.kubevirtnode.api.DefaultKubevirtPhyInterface;
import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
import org.onosproject.kubevirtnode.api.KubevirtNode;
import org.onosproject.kubevirtnode.api.KubevirtNodeService;
+import org.onosproject.kubevirtnode.api.KubevirtNodeState;
+import org.onosproject.kubevirtnode.api.KubevirtPhyInterface;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
@@ -60,13 +74,20 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_TO_TENANT_PREFIX;
+import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.FLAT;
+import static org.onosproject.kubevirtnode.api.Constants.SONA_PROJECT_DOMAIN;
import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.GATEWAY;
+import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
+import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.OTHER;
+import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
import static org.onosproject.net.AnnotationKeys.PORT_NAME;
/**
@@ -92,6 +113,35 @@
private static final String INTERFACES = "interfaces";
private static final String NODE_NAME = "nodeName";
+ private static final String NETWORK_CONFIG = "network-config";
+ private static final String TYPE = "type";
+ private static final String MTU = "mtu";
+ private static final String SEGMENT_ID = "segmentId";
+ private static final String GATEWAY_IP = "gatewayIp";
+ private static final String DEFAULT_ROUTE = "defaultRoute";
+ private static final String CIDR = "cidr";
+ private static final String HOST_ROUTES = "hostRoutes";
+ private static final String DESTINATION = "destination";
+ private static final String NEXTHOP = "nexthop";
+ private static final String IP_POOL = "ipPool";
+ private static final String START = "start";
+ private static final String END = "end";
+ private static final String DNSES = "dnses";
+
+ private static final String INTERNAL_IP = "InternalIP";
+ private static final String K8S_ROLE = "node-role.kubernetes.io";
+ private static final String PHYSNET_CONFIG_KEY = SONA_PROJECT_DOMAIN + "/physnet-config";
+ private static final String DATA_IP_KEY = SONA_PROJECT_DOMAIN + "/data-ip";
+ private static final String GATEWAY_CONFIG_KEY = SONA_PROJECT_DOMAIN + "/gateway-config";
+ private static final String GATEWAY_BRIDGE_NAME = "gatewayBridgeName";
+ private static final String NETWORK_KEY = "network";
+ private static final String INTERFACE_KEY = "interface";
+ private static final String PHYS_BRIDGE_ID = "physBridgeId";
+
+ private static final String NO_SCHEDULE_EFFECT = "NoSchedule";
+ private static final String KUBEVIRT_IO_KEY = "kubevirt.io/drain";
+ private static final String DRAINING_VALUE = "draining";
+
/**
* Prevents object installation from external.
*/
@@ -691,4 +741,194 @@
log.error(e.toString());
}
}
+
+ /**
+ * Returns the kubevirt node from the node.
+ *
+ * @param node a raw node object returned from a k8s client
+ * @return kubevirt node
+ */
+ public static KubevirtNode buildKubevirtNode(Node node) {
+ String hostname = node.getMetadata().getName();
+ IpAddress managementIp = null;
+ IpAddress dataIp = null;
+
+ for (NodeAddress nodeAddress:node.getStatus().getAddresses()) {
+ if (nodeAddress.getType().equals(INTERNAL_IP)) {
+ managementIp = IpAddress.valueOf(nodeAddress.getAddress());
+ dataIp = IpAddress.valueOf(nodeAddress.getAddress());
+ }
+ }
+
+ Set<String> rolesFull = node.getMetadata().getLabels().keySet().stream()
+ .filter(l -> l.contains(K8S_ROLE))
+ .collect(Collectors.toSet());
+
+ KubevirtNode.Type nodeType = WORKER;
+
+ for (String roleStr : rolesFull) {
+ String role = roleStr.split("/")[1];
+ if (MASTER.name().equalsIgnoreCase(role)) {
+ nodeType = MASTER;
+ break;
+ }
+ }
+
+ // start to parse kubernetes annotation
+ Map<String, String> annots = node.getMetadata().getAnnotations();
+ String physnetConfig = annots.get(PHYSNET_CONFIG_KEY);
+ String gatewayConfig = annots.get(GATEWAY_CONFIG_KEY);
+ String dataIpStr = annots.get(DATA_IP_KEY);
+ Set<KubevirtPhyInterface> phys = new HashSet<>();
+ String gatewayBridgeName = null;
+ try {
+ if (physnetConfig != null) {
+ JsonArray configJson = JsonArray.readFrom(physnetConfig);
+
+ for (int i = 0; i < configJson.size(); i++) {
+ JsonObject object = configJson.get(i).asObject();
+ String network = object.get(NETWORK_KEY).asString();
+ String intf = object.get(INTERFACE_KEY).asString();
+
+ if (network != null && intf != null) {
+ String physBridgeId;
+ if (object.get(PHYS_BRIDGE_ID) != null) {
+ physBridgeId = object.get(PHYS_BRIDGE_ID).asString();
+ } else {
+ physBridgeId = genDpidFromName(network + intf + hostname);
+ log.trace("host {} physnet dpid for network {} intf {} is null so generate dpid {}",
+ hostname, network, intf, physBridgeId);
+ }
+
+ phys.add(DefaultKubevirtPhyInterface.builder()
+ .network(network)
+ .intf(intf)
+ .physBridge(DeviceId.deviceId(physBridgeId))
+ .build());
+ }
+ }
+ }
+
+ if (dataIpStr != null) {
+ dataIp = IpAddress.valueOf(dataIpStr);
+ }
+
+ if (gatewayConfig != null) {
+ JsonNode jsonNode = new ObjectMapper().readTree(gatewayConfig);
+
+ nodeType = GATEWAY;
+ gatewayBridgeName = jsonNode.get(GATEWAY_BRIDGE_NAME).asText();
+ }
+ } catch (JsonProcessingException e) {
+ log.error("Failed to parse physnet config or gateway config object", e);
+ }
+
+ // if the node is taint with kubevirt.io key configured,
+ // we mark this node as OTHER type, and do not add it into the cluster
+ NodeSpec spec = node.getSpec();
+ if (spec.getTaints() != null) {
+ for (Taint taint : spec.getTaints()) {
+ String effect = taint.getEffect();
+ String key = taint.getKey();
+ String value = taint.getValue();
+
+ if (StringUtils.equals(effect, NO_SCHEDULE_EFFECT) &&
+ StringUtils.equals(key, KUBEVIRT_IO_KEY) &&
+ StringUtils.equals(value, DRAINING_VALUE)) {
+ nodeType = OTHER;
+ }
+ }
+ }
+
+ return DefaultKubevirtNode.builder()
+ .hostname(hostname)
+ .managementIp(managementIp)
+ .dataIp(dataIp)
+ .type(nodeType)
+ .state(KubevirtNodeState.ON_BOARDED)
+ .phyIntfs(phys)
+ .gatewayBridgeName(gatewayBridgeName)
+ .build();
+ }
+
+ /**
+ * Parses kubevirt network resource.
+ *
+ * @param resource kubevirt network resource string
+ * @return kubevirt network object
+ */
+ public static KubevirtNetwork parseKubevirtNetwork(String resource) {
+ JsonObject json = JsonObject.readFrom(resource);
+ String name = parseResourceName(resource);
+ JsonObject annots = json.get("metadata").asObject().get("annotations").asObject();
+ if (annots.get(NETWORK_CONFIG) == null) {
+ // SR-IOV network does not contain network-config field
+ return null;
+ }
+ String networkConfig = annots.get(NETWORK_CONFIG).asString();
+ if (networkConfig != null) {
+ KubevirtNetwork.Builder builder = DefaultKubevirtNetwork.builder();
+
+ JsonObject configJson = JsonObject.readFrom(networkConfig);
+ String type = configJson.get(TYPE).asString().toUpperCase(Locale.ROOT);
+ Integer mtu = configJson.get(MTU).asInt();
+ String gatewayIp = configJson.getString(GATEWAY_IP, "");
+ boolean defaultRoute = configJson.getBoolean(DEFAULT_ROUTE, false);
+
+ if (!type.equalsIgnoreCase(FLAT.name())) {
+ builder.segmentId(configJson.getString(SEGMENT_ID, ""));
+ }
+
+ String cidr = configJson.getString(CIDR, "");
+
+ JsonObject poolJson = configJson.get(IP_POOL).asObject();
+ if (poolJson != null) {
+ String start = poolJson.getString(START, "");
+ String end = poolJson.getString(END, "");
+ builder.ipPool(new KubevirtIpPool(
+ IpAddress.valueOf(start), IpAddress.valueOf(end)));
+ }
+
+ if (configJson.get(HOST_ROUTES) != null) {
+ JsonArray routesJson = configJson.get(HOST_ROUTES).asArray();
+ Set<KubevirtHostRoute> hostRoutes = new HashSet<>();
+ if (routesJson != null) {
+ for (int i = 0; i < routesJson.size(); i++) {
+ JsonObject route = routesJson.get(i).asObject();
+ String destinationStr = route.getString(DESTINATION, "");
+ String nexthopStr = route.getString(NEXTHOP, "");
+
+ if (StringUtils.isNotEmpty(destinationStr) &&
+ StringUtils.isNotEmpty(nexthopStr)) {
+ hostRoutes.add(new KubevirtHostRoute(
+ IpPrefix.valueOf(destinationStr),
+ IpAddress.valueOf(nexthopStr)));
+ }
+ }
+ }
+ builder.hostRoutes(hostRoutes);
+ }
+
+ if (configJson.get(DNSES) != null) {
+ JsonArray dnsesJson = configJson.get(DNSES).asArray();
+ Set<IpAddress> dnses = new HashSet<>();
+ if (dnsesJson != null) {
+ for (int i = 0; i < dnsesJson.size(); i++) {
+ String dns = dnsesJson.get(i).asString();
+ if (StringUtils.isNotEmpty(dns)) {
+ dnses.add(IpAddress.valueOf(dns));
+ }
+ }
+ }
+ builder.dnses(dnses);
+ }
+
+ builder.networkId(name).name(name).type(KubevirtNetwork.Type.valueOf(type))
+ .mtu(mtu).gatewayIp(IpAddress.valueOf(gatewayIp))
+ .defaultRoute(defaultRoute).cidr(cidr);
+
+ return builder.build();
+ }
+ return null;
+ }
}
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtSyncStateCommand.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtSyncStateCommand.java
deleted file mode 100644
index 91ba18e..0000000
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtSyncStateCommand.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright 2021-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kubevirtnode.cli;
-
-import io.fabric8.kubernetes.api.model.Node;
-import io.fabric8.kubernetes.client.KubernetesClient;
-import org.apache.karaf.shell.api.action.Command;
-import org.apache.karaf.shell.api.action.lifecycle.Service;
-import org.onosproject.cli.AbstractShellCommand;
-import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
-import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
-import org.onosproject.kubevirtnode.api.KubevirtNode;
-import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
-
-import java.util.Set;
-
-import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
-import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
-import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.buildKubevirtNode;
-import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.k8sClient;
-
-/**
- * Synchronizes kubevirt node states.
- */
-@Service
-@Command(scope = "onos", name = "kubevirt-sync-state",
- description = "Synchronizes kubevirt node states.")
-public class KubevirtSyncStateCommand extends AbstractShellCommand {
- @Override
- protected void doExecute() throws Exception {
- KubevirtApiConfigService apiConfigService = get(KubevirtApiConfigService.class);
-
- print("Re-synchronizing Kubevirt node states..");
- KubevirtApiConfig config = apiConfigService.apiConfig();
- bootstrapKubevirtNodes(config);
- print("Done.");
-
- }
-
- private void bootstrapKubevirtNodes(KubevirtApiConfig config) {
- KubevirtNodeAdminService nodeAdminService = get(KubevirtNodeAdminService.class);
-
- Set<KubevirtNode> completeNodeSet = nodeAdminService.completeNodes();
- KubernetesClient k8sClient = k8sClient(config);
-
- if (k8sClient == null) {
- log.warn("Failed to connect to kubernetes API server");
- return;
- }
-
- for (Node node : k8sClient.nodes().list().getItems()) {
- KubevirtNode kubevirtNode = buildKubevirtNode(node);
- // we always provision VMs to worker nodes, so only need to install
- // flow rules in worker nodes
- if (kubevirtNode.type() == WORKER) {
- if (completeNodeSet.stream().map(KubevirtNode::hostname)
- .filter(name -> name.equals(kubevirtNode.hostname()))
- .findAny().isPresent()) {
- print("Initializing %s because the node was COMPLETE state.",
- kubevirtNode.hostname());
- KubevirtNode updated = kubevirtNode.updateState(INIT);
- nodeAdminService.updateNode(updated);
- } else {
- nodeAdminService.updateNode(kubevirtNode);
- }
- }
- }
- }
-}