Support to add/update/remove kubevirt node, and add node watcher

Change-Id: I3cae33625b2dd6deb88f7bca2d0252d65667f2c9
diff --git a/apps/kubevirt-node/api/src/main/java/org/onosproject/kubevirtnode/api/Constants.java b/apps/kubevirt-node/api/src/main/java/org/onosproject/kubevirtnode/api/Constants.java
index 0ec60a8..39ca033 100644
--- a/apps/kubevirt-node/api/src/main/java/org/onosproject/kubevirtnode/api/Constants.java
+++ b/apps/kubevirt-node/api/src/main/java/org/onosproject/kubevirtnode/api/Constants.java
@@ -46,4 +46,6 @@
     public static final String FLOW_KEY = "flow";
 
     public static final String DEFAULT_CLUSTER_NAME = "default";
+
+    public static final String SONA_PROJECT_DOMAIN = "sonaproject.github.io";
 }
diff --git a/apps/kubevirt-node/api/src/main/java/org/onosproject/kubevirtnode/api/KubevirtNodeService.java b/apps/kubevirt-node/api/src/main/java/org/onosproject/kubevirtnode/api/KubevirtNodeService.java
index 480969d..41e90a3 100644
--- a/apps/kubevirt-node/api/src/main/java/org/onosproject/kubevirtnode/api/KubevirtNodeService.java
+++ b/apps/kubevirt-node/api/src/main/java/org/onosproject/kubevirtnode/api/KubevirtNodeService.java
@@ -84,6 +84,14 @@
     KubevirtNode node(IpAddress mgmtIp);
 
     /**
+     * Checks whether has the node with the given hostname.
+     *
+     * @param hostname hostname
+     * @return true if it has the node, false otherwise
+     */
+    boolean hasNode(String hostname);
+
+    /**
      * Returns the node with the specified tunnel device ID.
      * The device ID tunnel bridge.
      *
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtApiConfigHandler.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtApiConfigHandler.java
index 49e346b..7208cd6 100644
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtApiConfigHandler.java
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtApiConfigHandler.java
@@ -123,7 +123,9 @@
             // we always provision VMs to worker nodes, so only need to install
             // flow rules in worker nodes
             if (kubevirtNode.type() == WORKER) {
-                nodeAdminService.createNode(kubevirtNode);
+                if (!nodeAdminService.hasNode(kubevirtNode.hostname())) {
+                    nodeAdminService.createNode(kubevirtNode);
+                }
             }
         }
     }
@@ -153,8 +155,6 @@
             if (checkApiServerConfig(config)) {
                 KubevirtApiConfig newConfig = config.updateState(CONNECTED);
                 configAdminService.updateApiConfig(newConfig);
-
-                bootstrapKubevirtNodes(config);
             }
         }
     }
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtNodeHandler.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtNodeHandler.java
index 72d1ec0..4756cde 100644
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtNodeHandler.java
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/DefaultKubevirtNodeHandler.java
@@ -79,11 +79,9 @@
 import static org.onosproject.kubevirtnode.api.Constants.GRE;
 import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_BRIDGE;
 import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_PHYSICAL_PREFIX;
-import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_TUNNEL;
 import static org.onosproject.kubevirtnode.api.Constants.PHYSICAL_TO_INTEGRATION_SUFFIX;
 import static org.onosproject.kubevirtnode.api.Constants.TENANT_BRIDGE_PREFIX;
 import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_BRIDGE;
-import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_TO_INTEGRATION;
 import static org.onosproject.kubevirtnode.api.Constants.VXLAN;
 import static org.onosproject.kubevirtnode.api.KubevirtNodeService.APP_ID;
 import static org.onosproject.kubevirtnode.api.KubevirtNodeState.COMPLETE;
@@ -120,7 +118,8 @@
     private static final int DPID_BEGIN = 3;
     private static final int NETWORK_BEGIN = 3;
     private static final long SLEEP_SHORT_MS = 1000; // we wait 1s
-    private static final long SLEEP_LONG_MS = 2000; // we wait 2s
+    private static final long SLEEP_MID_MS = 2000; // we wait 2s
+    private static final long SLEEP_LONG_MS = 5000; // we wait 5s
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
@@ -237,12 +236,6 @@
             if (node.dataIp() != null && !isIntfEnabled(node, GENEVE)) {
                 createGeneveTunnelInterface(node);
             }
-
-            // provision new physical interfaces on the given node
-            // this includes creating physical bridge, attaching physical port
-            // to physical bridge, adding patch ports to both physical bridge and br-int
-            provisionPhysicalInterfaces(node);
-
         } catch (Exception e) {
             log.error("Exception occurred because of {}", e);
         }
@@ -350,6 +343,8 @@
      * Creates a tunnel interface in a given kubernetes node.
      *
      * @param node       kubevirt node
+     * @param type       kubevirt type
+     * @param intfName   tunnel interface name
      */
     private void createTunnelInterface(KubevirtNode node,
                                        String type, String intfName) {
@@ -373,6 +368,7 @@
      * Builds tunnel description according to the network type.
      *
      * @param type      network type
+     * @param intfName  tunnel interface
      * @return tunnel description
      */
     private TunnelDescription buildTunnelDesc(String type, String intfName) {
@@ -420,36 +416,6 @@
                                 port.isEnabled());
     }
 
-    private void createPatchInterfaces(KubevirtNode node) {
-        Device device = deviceService.getDevice(node.ovsdb());
-        if (device == null || !device.is(InterfaceConfig.class)) {
-            log.error("Failed to create patch interface on {}", node.ovsdb());
-            return;
-        }
-
-        InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
-
-        // integration bridge -> tunnel bridge
-        PatchDescription brIntTunPatchDesc =
-                DefaultPatchDescription.builder()
-                        .deviceId(INTEGRATION_BRIDGE)
-                        .ifaceName(INTEGRATION_TO_TUNNEL)
-                        .peer(TUNNEL_TO_INTEGRATION)
-                        .build();
-
-        ifaceConfig.addPatchMode(INTEGRATION_TO_TUNNEL, brIntTunPatchDesc);
-
-        // tunnel bridge -> integration bridge
-        PatchDescription brTunIntPatchDesc =
-                DefaultPatchDescription.builder()
-                        .deviceId(TUNNEL_BRIDGE)
-                        .ifaceName(TUNNEL_TO_INTEGRATION)
-                        .peer(INTEGRATION_TO_TUNNEL)
-                        .build();
-
-        ifaceConfig.addPatchMode(TUNNEL_TO_INTEGRATION, brTunIntPatchDesc);
-    }
-
     /**
      * Bootstraps a new kubevirt node.
      *
@@ -527,6 +493,11 @@
 
         cleanPhysicalInterfaces(node);
 
+        // provision new physical interfaces on the given node
+        // this includes creating physical bridge, attaching physical port
+        // to physical bridge, adding patch ports to both physical bridge and br-int
+        provisionPhysicalInterfaces(node);
+
         return node.intgBridge() != null && node.tunBridge() != null &&
                 deviceService.isAvailable(node.intgBridge()) &&
                 deviceService.isAvailable(node.tunBridge());
@@ -537,26 +508,38 @@
         try {
             // we need to wait a while, in case tunneling ports
             // creation requires some time
-            sleep(SLEEP_LONG_MS);
+            sleep(SLEEP_MID_MS);
         } catch (InterruptedException e) {
             log.error("Exception caused during init state checking...");
         }
 
         if (node.dataIp() != null && !isIntfEnabled(node, VXLAN)) {
+            log.warn("VXLAN interface is not enabled!");
             return false;
         }
         if (node.dataIp() != null && !isIntfEnabled(node, GRE)) {
+            log.warn("GRE interface is not enabled!");
             return false;
         }
         if (node.dataIp() != null && !isIntfEnabled(node, GENEVE)) {
+            log.warn("GENEVE interface is not enabled!");
             return false;
         }
 
         for (KubevirtPhyInterface phyIntf : node.phyIntfs()) {
             if (phyIntf == null) {
+                log.warn("Physnet interface is invalid");
                 return false;
             }
 
+            try {
+                // we need to wait a while, in case tunneling ports
+                // creation requires some time
+                sleep(SLEEP_LONG_MS);
+            } catch (InterruptedException e) {
+                log.error("Exception caused during init state checking...");
+            }
+
             String bridgeName = BRIDGE_PREFIX + phyIntf.network();
             String patchPortName = structurePortName(
                     INTEGRATION_TO_PHYSICAL_PREFIX + phyIntf.network());
@@ -564,6 +547,9 @@
             if (!(hasPhyBridge(node, bridgeName) &&
                     hasPhyPatchPort(node, patchPortName) &&
                     hasPhyIntf(node, phyIntf.intf()))) {
+                log.warn("PhyBridge {}", hasPhyBridge(node, bridgeName));
+                log.warn("hasPhyPatchPort {}", hasPhyPatchPort(node, patchPortName));
+                log.warn("hasPhyIntf {}", hasPhyIntf(node, phyIntf.intf()));
                 return false;
             }
         }
@@ -596,11 +582,16 @@
                 createPhysicalBridge(node, pi);
                 createPhysicalPatchPorts(node, pi);
                 attachPhysicalPort(node, pi);
+
+                log.info("Creating physnet bridge {}", bridgeName);
+                log.info("Creating patch ports for physnet {}", bridgeName);
             } else {
                 // in case physical bridge exists, but patch port is missing on br-int,
                 // we will add patch port to connect br-int with physical bridge
                 if (!hasPhyPatchPort(node, patchPortName)) {
                     createPhysicalPatchPorts(node, pi);
+
+                    log.info("Creating patch ports for physnet {}", bridgeName);
                 }
             }
         });
@@ -620,16 +611,18 @@
         // we remove existing physical bridges and patch ports, if the physical
         // bridges are not defined in kubevirt node
         for (String brName : bridgeNames) {
+            // integration bridge and tunnel bridge should NOT be treated as
+            // physical bridges
+            if (brName.equals(INTEGRATION_BRIDGE) ||
+                    brName.equals(TUNNEL_BRIDGE) ||
+                    brName.startsWith(TENANT_BRIDGE_PREFIX)) {
+                continue;
+            }
+
             if (!phyNetworkNames.contains(brName)) {
-                // integration bridge and tunnel bridge should NOT be treated as
-                // physical bridges
-                if (brName.equals(INTEGRATION_BRIDGE) ||
-                        brName.equals(TUNNEL_BRIDGE) ||
-                        brName.startsWith(TENANT_BRIDGE_PREFIX)) {
-                    continue;
-                }
                 removePhysicalPatchPorts(node, brName.substring(NETWORK_BEGIN));
                 removePhysicalBridge(node, brName.substring(NETWORK_BEGIN));
+                log.info("Removing physical bridge {}...", brName);
             }
         }
     }
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/KubevirtNodeManager.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/KubevirtNodeManager.java
index 99c8244..4e7e39e 100644
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/KubevirtNodeManager.java
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/KubevirtNodeManager.java
@@ -57,9 +57,10 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_BRIDGE;
+import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_BRIDGE;
 import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT;
 import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
-import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.genDpid;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -91,6 +92,8 @@
 
     private static final String NOT_DUPLICATED_MSG = "% cannot be duplicated";
 
+    private static final String OF_PREFIX = "of:";
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected KubevirtNodeStore nodeStore;
 
@@ -167,7 +170,7 @@
         KubevirtNode tunNode;
 
         if (node.intgBridge() == null) {
-            String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
+            String deviceIdStr = genDpidFromName(INTEGRATION_BRIDGE + "-" + node.hostname());
             checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
             intNode = node.updateIntgBridge(DeviceId.deviceId(deviceIdStr));
             checkArgument(!hasIntgBridge(intNode.intgBridge(), intNode.hostname()),
@@ -179,7 +182,7 @@
         }
 
         if (node.tunBridge() == null) {
-            String deviceIdStr = genDpid(deviceIdCounter.incrementAndGet());
+            String deviceIdStr = genDpidFromName(TUNNEL_BRIDGE + "-" + node.hostname());
             checkNotNull(deviceIdStr, ERR_NULL_DEVICE_ID);
             tunNode = intNode.updateTunBridge(DeviceId.deviceId(deviceIdStr));
             checkArgument(!hasTunBridge(tunNode.tunBridge(), tunNode.hostname()),
@@ -291,6 +294,11 @@
     }
 
     @Override
+    public boolean hasNode(String hostname) {
+        return nodeStore.nodes().stream().anyMatch(n -> n.hostname().equals(hostname));
+    }
+
+    @Override
     public KubevirtNode nodeByTunBridge(DeviceId deviceId) {
         return nodeStore.nodes().stream()
                 .filter(node -> Objects.equals(node.tunBridge(), deviceId))
@@ -315,6 +323,15 @@
         return existNode.isPresent();
     }
 
+    private String genDpidFromName(String name) {
+        if (name != null) {
+            String hexString = Integer.toHexString(name.hashCode());
+            return OF_PREFIX + Strings.padStart(hexString, 16, '0');
+        }
+
+        return null;
+    }
+
     private class InternalNodeStoreDelegate implements KubevirtNodeStoreDelegate {
 
         @Override
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/KubevirtNodeWatcher.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/KubevirtNodeWatcher.java
new file mode 100644
index 0000000..89086bf
--- /dev/null
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/impl/KubevirtNodeWatcher.java
@@ -0,0 +1,240 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnode.impl;
+
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
+import org.onosproject.mastership.MastershipService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
+import static org.onosproject.kubevirtnode.api.KubevirtNodeService.APP_ID;
+import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
+import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.buildKubevirtNode;
+import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.k8sClient;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Kubernetes node watcher used for feeding node information.
+ */
+@Component(immediate = true)
+public class KubevirtNodeWatcher {
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtNodeAdminService kubevirtNodeAdminService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtApiConfigService kubevirtApiConfigService;
+
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+    private final Watcher<Node> internalKubevirtNodeWatcher = new InternalKubevirtNodeWatcher();
+    private final InternalKubevirtApiConfigListener
+            internalKubevirtApiConfigListener = new InternalKubevirtApiConfigListener();
+
+    private ApplicationId appId;
+    private NodeId localNodeId;
+
+    @Activate
+    protected void activate() {
+        appId = coreService.registerApplication(APP_ID);
+        localNodeId = clusterService.getLocalNode().id();
+        leadershipService.runForLeadership(appId.name());
+        kubevirtApiConfigService.addListener(internalKubevirtApiConfigListener);
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        kubevirtApiConfigService.removeListener(internalKubevirtApiConfigListener);
+        leadershipService.withdraw(appId.name());
+        eventExecutor.shutdown();
+
+        log.info("Stopped");
+    }
+
+    private void instantiateNodeWatcher() {
+        KubevirtApiConfig config = kubevirtApiConfigService.apiConfig();
+        if (config == null) {
+            return;
+        }
+        KubernetesClient client = k8sClient(config);
+
+        if (client != null) {
+            client.nodes().watch(internalKubevirtNodeWatcher);
+        }
+    }
+
+    private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(KubevirtApiConfigEvent event) {
+
+            switch (event.type()) {
+                case KUBEVIRT_API_CONFIG_UPDATED:
+                    eventExecutor.execute(this::processConfigUpdating);
+                    break;
+                case KUBEVIRT_API_CONFIG_CREATED:
+                case KUBEVIRT_API_CONFIG_REMOVED:
+                default:
+                    // do nothing
+                    break;
+            }
+        }
+
+        private void processConfigUpdating() {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            instantiateNodeWatcher();
+        }
+    }
+
+    private class InternalKubevirtNodeWatcher implements Watcher<Node> {
+
+        @Override
+        public void eventReceived(Action action, Node node) {
+            switch (action) {
+                case ADDED:
+                    eventExecutor.execute(() -> processAddition(node));
+                    break;
+                case MODIFIED:
+                    eventExecutor.execute(() -> processModification(node));
+                    break;
+                case DELETED:
+                    eventExecutor.execute(() -> processDeletion(node));
+                    break;
+                case ERROR:
+                    log.warn("Failures processing node manipulation.");
+                    break;
+                default:
+                    // do nothing
+                    break;
+            }
+        }
+
+        @Override
+        public void onClose(WatcherException e) {
+            // due to the bugs in fabric8, node watcher might be closed,
+            // we will re-instantiate the node watcher in this case
+            // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
+            log.warn("Node watcher OnClose, re-instantiate the node watcher...");
+            instantiateNodeWatcher();
+        }
+
+        private void processAddition(Node node) {
+            if (!isMaster()) {
+                return;
+            }
+
+            log.trace("Process node {} creating event from API server.",
+                    node.getMetadata().getName());
+
+            KubevirtNode kubevirtNode = buildKubevirtNode(node);
+            if (kubevirtNode.type() == WORKER) {
+                if (!kubevirtNodeAdminService.hasNode(kubevirtNode.hostname())) {
+                    kubevirtNodeAdminService.createNode(kubevirtNode);
+                }
+            }
+        }
+
+        private void processModification(Node node) {
+            if (!isMaster()) {
+                return;
+            }
+
+            log.trace("Process node {} updating event from API server.",
+                    node.getMetadata().getName());
+
+            KubevirtNode existing = kubevirtNodeAdminService.node(node.getMetadata().getName());
+
+            if (existing != null) {
+                KubevirtNode kubevirtNode = buildKubevirtNode(node);
+
+                // we update the kubevirt node and re-run bootstrapping,
+                // only if the updated node has different phyInts and data IP
+                // this means we assume that the node's hostname, type and mgmt IP
+                // are immutable
+                if (!kubevirtNode.phyIntfs().equals(existing.phyIntfs()) ||
+                        !kubevirtNode.dataIp().equals(existing.dataIp())) {
+                    kubevirtNodeAdminService.updateNode(kubevirtNode.updateState(INIT));
+                }
+            }
+        }
+
+        private void processDeletion(Node node) {
+            if (!isMaster()) {
+                return;
+            }
+
+            log.trace("Process node {} removal event from API server.",
+                    node.getMetadata().getName());
+
+            KubevirtNode existing = kubevirtNodeAdminService.node(node.getMetadata().getName());
+
+            if (existing != null) {
+                kubevirtNodeAdminService.removeNode(node.getMetadata().getName());
+            }
+        }
+
+        private boolean isMaster() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+    }
+}
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/util/KubevirtNodeUtil.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/util/KubevirtNodeUtil.java
index da53f08..06ccc31 100644
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/util/KubevirtNodeUtil.java
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/util/KubevirtNodeUtil.java
@@ -52,6 +52,7 @@
 import java.util.stream.Collectors;
 
 import static org.onlab.util.Tools.get;
+import static org.onosproject.kubevirtnode.api.Constants.SONA_PROJECT_DOMAIN;
 import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
 import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
 
@@ -70,7 +71,8 @@
     private static final String ZERO = "0";
     private static final String INTERNAL_IP = "InternalIP";
     private static final String K8S_ROLE = "node-role.kubernetes.io";
-    private static final String PHYSNET_CONFIG_KEY = "physnet-config";
+    private static final String PHYSNET_CONFIG_KEY = SONA_PROJECT_DOMAIN + "/physnet-config";
+    private static final String DATA_IP_KEY = SONA_PROJECT_DOMAIN + "/data-ip";
     private static final String NETWORK_KEY = "network";
     private static final String INTERFACE_KEY = "interface";
 
@@ -334,6 +336,7 @@
         // start to parse kubernetes annotation
         Map<String, String> annots = node.getMetadata().getAnnotations();
         String physnetConfig = annots.get(PHYSNET_CONFIG_KEY);
+        String dataIpStr = annots.get(DATA_IP_KEY);
         Set<KubevirtPhyInterface> phys = new HashSet<>();
         try {
             if (physnetConfig != null) {
@@ -351,6 +354,10 @@
 
                 }
             }
+
+            if (dataIpStr != null) {
+                dataIp = IpAddress.valueOf(dataIpStr);
+            }
         } catch (JSONException e) {
             log.error("Failed to parse network status object", e);
         }