Provides CLI command to synchronize states of all kubevirt nodes

Change-Id: Iec3df65d7e0177ebb8beb3d05508851bad67f824
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtSyncStateCommand.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtSyncStateCommand.java
new file mode 100644
index 0000000..91ba18e
--- /dev/null
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/cli/KubevirtSyncStateCommand.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnode.cli;
+
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
+
+import java.util.Set;
+
+import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
+import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
+import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.buildKubevirtNode;
+import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.k8sClient;
+
+/**
+ * Synchronizes kubevirt node states.
+ */
+@Service
+@Command(scope = "onos", name = "kubevirt-sync-state",
+        description = "Synchronizes kubevirt node states.")
+public class KubevirtSyncStateCommand extends AbstractShellCommand {
+    @Override
+    protected void doExecute() throws Exception {
+        KubevirtApiConfigService apiConfigService = get(KubevirtApiConfigService.class);
+
+        print("Re-synchronizing Kubevirt node states..");
+        KubevirtApiConfig config = apiConfigService.apiConfig();
+        bootstrapKubevirtNodes(config);
+        print("Done.");
+
+    }
+
+    private void bootstrapKubevirtNodes(KubevirtApiConfig config) {
+        KubevirtNodeAdminService nodeAdminService = get(KubevirtNodeAdminService.class);
+
+        Set<KubevirtNode> completeNodeSet = nodeAdminService.completeNodes();
+        KubernetesClient k8sClient = k8sClient(config);
+
+        if (k8sClient == null) {
+            log.warn("Failed to connect to kubernetes API server");
+            return;
+        }
+
+        for (Node node : k8sClient.nodes().list().getItems()) {
+            KubevirtNode kubevirtNode = buildKubevirtNode(node);
+            // we always provision VMs to worker nodes, so only need to install
+            // flow rules in worker nodes
+            if (kubevirtNode.type() == WORKER) {
+                if (completeNodeSet.stream().map(KubevirtNode::hostname)
+                        .filter(name -> name.equals(kubevirtNode.hostname()))
+                        .findAny().isPresent()) {
+                    print("Initializing %s because the node was COMPLETE state.",
+                            kubevirtNode.hostname());
+                    KubevirtNode updated = kubevirtNode.updateState(INIT);
+                    nodeAdminService.updateNode(updated);
+                } else {
+                    nodeAdminService.updateNode(kubevirtNode);
+                }
+            }
+        }
+    }
+}
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtApiConfigHandler.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtApiConfigHandler.java
index 0e7de69..49e346b 100644
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtApiConfigHandler.java
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtApiConfigHandler.java
@@ -16,27 +16,18 @@
 package org.onosproject.kubevirtnode.impl;
 
 import io.fabric8.kubernetes.api.model.Node;
-import io.fabric8.kubernetes.api.model.NodeAddress;
 import io.fabric8.kubernetes.client.KubernetesClient;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.onlab.packet.IpAddress;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.LeadershipService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
-import org.onosproject.kubevirtnode.api.DefaultKubevirtNode;
-import org.onosproject.kubevirtnode.api.DefaultKubevirtPhyInterface;
 import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
 import org.onosproject.kubevirtnode.api.KubevirtApiConfigAdminService;
 import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
 import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
 import org.onosproject.kubevirtnode.api.KubevirtNode;
 import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
-import org.onosproject.kubevirtnode.api.KubevirtNodeState;
-import org.onosproject.kubevirtnode.api.KubevirtPhyInterface;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -44,19 +35,15 @@
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
-import java.util.HashSet;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
 
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.kubevirtnode.api.KubevirtApiConfig.State.CONNECTED;
 import static org.onosproject.kubevirtnode.api.KubevirtApiConfigService.APP_ID;
-import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
 import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
+import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.buildKubevirtNode;
 import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.k8sClient;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -68,12 +55,6 @@
 
     private final Logger log = getLogger(getClass());
 
-    private static final String INTERNAL_IP = "InternalIP";
-    private static final String K8S_ROLE = "node-role.kubernetes.io";
-    private static final String PHYSNET_CONFIG_KEY = "physnet-config";
-    private static final String NETWORK_KEY = "network";
-    private static final String INTERFACE_KEY = "interface";
-
     private static final long SLEEP_MS = 10000; // we wait 10s
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -147,66 +128,6 @@
         }
     }
 
-    private KubevirtNode buildKubevirtNode(Node node) {
-        String hostname = node.getMetadata().getName();
-        IpAddress managementIp = null;
-        IpAddress dataIp = null;
-
-        for (NodeAddress nodeAddress:node.getStatus().getAddresses()) {
-            if (nodeAddress.getType().equals(INTERNAL_IP)) {
-                managementIp = IpAddress.valueOf(nodeAddress.getAddress());
-                dataIp = IpAddress.valueOf(nodeAddress.getAddress());
-            }
-        }
-
-        Set<String> rolesFull = node.getMetadata().getLabels().keySet().stream()
-                .filter(l -> l.contains(K8S_ROLE))
-                .collect(Collectors.toSet());
-
-        KubevirtNode.Type nodeType = WORKER;
-
-        for (String roleStr : rolesFull) {
-            String role = roleStr.split("/")[1];
-            if (MASTER.name().equalsIgnoreCase(role)) {
-                nodeType = MASTER;
-                break;
-            }
-        }
-
-        // start to parse kubernetes annotation
-        Map<String, String> annots = node.getMetadata().getAnnotations();
-        String physnetConfig = annots.get(PHYSNET_CONFIG_KEY);
-        Set<KubevirtPhyInterface> phys = new HashSet<>();
-        try {
-            if (physnetConfig != null) {
-                JSONArray configJson = new JSONArray(physnetConfig);
-
-                for (int i = 0; i < configJson.length(); i++) {
-                    JSONObject object = configJson.getJSONObject(i);
-                    String network = object.getString(NETWORK_KEY);
-                    String intf = object.getString(INTERFACE_KEY);
-
-                    if (network != null && intf != null) {
-                        phys.add(DefaultKubevirtPhyInterface.builder()
-                                .network(network).intf(intf).build());
-                    }
-
-                }
-            }
-        } catch (JSONException e) {
-            log.error("Failed to parse network status object", e);
-        }
-
-        return DefaultKubevirtNode.builder()
-                .hostname(hostname)
-                .managementIp(managementIp)
-                .dataIp(dataIp)
-                .type(nodeType)
-                .state(KubevirtNodeState.ON_BOARDED)
-                .phyIntfs(phys)
-                .build();
-    }
-
     private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
 
         private boolean isRelevantHelper() {
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/util/KubevirtNodeUtil.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/util/KubevirtNodeUtil.java
index 1c51336..da53f08 100644
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/util/KubevirtNodeUtil.java
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/util/KubevirtNodeUtil.java
@@ -17,13 +17,22 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.api.model.NodeAddress;
 import io.fabric8.kubernetes.client.ConfigBuilder;
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import org.apache.commons.lang.StringUtils;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
 import org.onlab.packet.IpAddress;
+import org.onosproject.kubevirtnode.api.DefaultKubevirtNode;
+import org.onosproject.kubevirtnode.api.DefaultKubevirtPhyInterface;
 import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
 import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeState;
+import org.onosproject.kubevirtnode.api.KubevirtPhyInterface;
 import org.onosproject.net.Device;
 import org.onosproject.net.behaviour.BridgeConfig;
 import org.onosproject.net.behaviour.BridgeName;
@@ -36,9 +45,15 @@
 
 import java.io.IOException;
 import java.util.Dictionary;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.onlab.util.Tools.get;
+import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
+import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
 
 /**
  * An utility that used in KubeVirt node app.
@@ -53,6 +68,11 @@
     private static final int HEX_LENGTH = 16;
     private static final String OF_PREFIX = "of:";
     private static final String ZERO = "0";
+    private static final String INTERNAL_IP = "InternalIP";
+    private static final String K8S_ROLE = "node-role.kubernetes.io";
+    private static final String PHYSNET_CONFIG_KEY = "physnet-config";
+    private static final String NETWORK_KEY = "network";
+    private static final String INTERFACE_KEY = "interface";
 
     private static final int PORT_NAME_MAX_LENGTH = 15;
 
@@ -278,4 +298,70 @@
         }
         return value;
     }
+
+    /**
+     * Returns the kubevirt node from the node.
+     *
+     * @param node a raw node object returned from a k8s client
+     * @return kubevirt node
+     */
+    public static KubevirtNode buildKubevirtNode(Node node) {
+        String hostname = node.getMetadata().getName();
+        IpAddress managementIp = null;
+        IpAddress dataIp = null;
+
+        for (NodeAddress nodeAddress:node.getStatus().getAddresses()) {
+            if (nodeAddress.getType().equals(INTERNAL_IP)) {
+                managementIp = IpAddress.valueOf(nodeAddress.getAddress());
+                dataIp = IpAddress.valueOf(nodeAddress.getAddress());
+            }
+        }
+
+        Set<String> rolesFull = node.getMetadata().getLabels().keySet().stream()
+                .filter(l -> l.contains(K8S_ROLE))
+                .collect(Collectors.toSet());
+
+        KubevirtNode.Type nodeType = WORKER;
+
+        for (String roleStr : rolesFull) {
+            String role = roleStr.split("/")[1];
+            if (MASTER.name().equalsIgnoreCase(role)) {
+                nodeType = MASTER;
+                break;
+            }
+        }
+
+        // start to parse kubernetes annotation
+        Map<String, String> annots = node.getMetadata().getAnnotations();
+        String physnetConfig = annots.get(PHYSNET_CONFIG_KEY);
+        Set<KubevirtPhyInterface> phys = new HashSet<>();
+        try {
+            if (physnetConfig != null) {
+                JSONArray configJson = new JSONArray(physnetConfig);
+
+                for (int i = 0; i < configJson.length(); i++) {
+                    JSONObject object = configJson.getJSONObject(i);
+                    String network = object.getString(NETWORK_KEY);
+                    String intf = object.getString(INTERFACE_KEY);
+
+                    if (network != null && intf != null) {
+                        phys.add(DefaultKubevirtPhyInterface.builder()
+                                .network(network).intf(intf).build());
+                    }
+
+                }
+            }
+        } catch (JSONException e) {
+            log.error("Failed to parse network status object", e);
+        }
+
+        return DefaultKubevirtNode.builder()
+                .hostname(hostname)
+                .managementIp(managementIp)
+                .dataIp(dataIp)
+                .type(nodeType)
+                .state(KubevirtNodeState.ON_BOARDED)
+                .phyIntfs(phys)
+                .build();
+    }
 }