Learn the devId where VM attached by listening VMI event

Refactor: extract wait logic into a common utility class

Change-Id: I96d60832770ee919a7632cd2665c82eb6f96bfba
(cherry picked from commit 0b90d45879020aab3a5fa2bac6a5b19b6bbe75a6)
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
index 466ae9a..6617128 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
@@ -16,6 +16,7 @@
 package org.onosproject.kubevirtnetworking.impl;
 
 import com.google.common.collect.Lists;
+import org.onlab.osgi.DefaultServiceDirectory;
 import org.onlab.packet.ARP;
 import org.onlab.packet.EthType;
 import org.onlab.packet.Ethernet;
@@ -52,6 +53,7 @@
 import org.onosproject.kubevirtnode.api.KubevirtNodeService;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.behaviour.BridgeConfig;
 import org.onosproject.net.behaviour.BridgeDescription;
@@ -62,6 +64,7 @@
 import org.onosproject.net.behaviour.InterfaceConfig;
 import org.onosproject.net.behaviour.PatchDescription;
 import org.onosproject.net.device.DeviceAdminService;
+import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.driver.DriverService;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
@@ -79,7 +82,6 @@
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
-import static java.lang.Thread.sleep;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.packet.ICMP.CODE_ECHO_REQEUST;
 import static org.onlab.packet.ICMP.TYPE_ECHO_REPLY;
@@ -118,6 +120,7 @@
 import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.segmentIdHex;
 import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
 import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelToTenantPort;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.waitFor;
 import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.NXM_NX_IP_TTL;
 import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.NXM_OF_ICMP_TYPE;
 import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
@@ -130,6 +133,7 @@
 import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_TO_INTEGRATION;
 import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.GATEWAY;
 import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
+import static org.onosproject.net.AnnotationKeys.PORT_NAME;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -141,8 +145,6 @@
     private static final String DEFAULT_OF_PROTO = "tcp";
     private static final int DEFAULT_OFPORT = 6653;
     private static final int DPID_BEGIN = 3;
-    private static final long SLEEP_MS = 3000;
-    private static final long SLEEP_LARGE_MS = 5000;
     private static final int DEFAULT_TTL = 0xff;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -256,15 +258,20 @@
         String dpid = network.tenantDeviceId(
                 node.hostname()).toString().substring(DPID_BEGIN);
 
-        BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
-                .name(network.tenantBridgeName())
-                .failMode(BridgeDescription.FailMode.SECURE)
-                .datapathId(dpid)
-                .disableInBand()
-                .controllers(controllers);
+        // if the bridge is already available, we skip creating a new bridge
+        if (!deviceService.isAvailable(DeviceId.deviceId(dpid))) {
+            BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
+                    .name(network.tenantBridgeName())
+                    .failMode(BridgeDescription.FailMode.SECURE)
+                    .datapathId(dpid)
+                    .disableInBand()
+                    .controllers(controllers);
 
-        BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
-        bridgeConfig.addBridge(builder.build());
+            BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
+            bridgeConfig.addBridge(builder.build());
+
+            waitFor(3);
+        }
     }
 
     private void removeBridge(KubevirtNode node, KubevirtNetwork network) {
@@ -292,24 +299,32 @@
         String tunToTenantIntf =
                 TUNNEL_TO_TENANT_PREFIX + segmentIdHex(network.segmentId());
 
-        // tenant bridge -> tunnel bridge
-        PatchDescription brTenantTunPatchDesc =
-                DefaultPatchDescription.builder()
-                        .deviceId(network.tenantBridgeName())
-                        .ifaceName(tenantToTunIntf)
-                        .peer(tunToTenantIntf)
-                        .build();
+        if (!hasPort(DeviceId.deviceId(network.tenantBridgeName()), tenantToTunIntf)) {
+            // tenant bridge -> tunnel bridge
+            PatchDescription brTenantTunPatchDesc =
+                    DefaultPatchDescription.builder()
+                            .deviceId(network.tenantBridgeName())
+                            .ifaceName(tenantToTunIntf)
+                            .peer(tunToTenantIntf)
+                            .build();
 
-        ifaceConfig.addPatchMode(tenantToTunIntf, brTenantTunPatchDesc);
+            ifaceConfig.addPatchMode(tenantToTunIntf, brTenantTunPatchDesc);
 
-        // tunnel bridge -> tenant bridge
-        PatchDescription brTunTenantPatchDesc =
-                DefaultPatchDescription.builder()
-                        .deviceId(TUNNEL_BRIDGE)
-                        .ifaceName(tunToTenantIntf)
-                        .peer(tenantToTunIntf)
-                        .build();
-        ifaceConfig.addPatchMode(tunToTenantIntf, brTunTenantPatchDesc);
+            waitFor(1);
+        }
+
+        if (!hasPort(DeviceId.deviceId(TUNNEL_BRIDGE), tunToTenantIntf)) {
+            // tunnel bridge -> tenant bridge
+            PatchDescription brTunTenantPatchDesc =
+                    DefaultPatchDescription.builder()
+                            .deviceId(TUNNEL_BRIDGE)
+                            .ifaceName(tunToTenantIntf)
+                            .peer(tenantToTunIntf)
+                            .build();
+            ifaceConfig.addPatchMode(tunToTenantIntf, brTunTenantPatchDesc);
+
+            waitFor(1);
+        }
     }
 
     private void removeAllFlows(KubevirtNode node, KubevirtNetwork network) {
@@ -387,12 +402,7 @@
 
         while (!deviceService.isAvailable(deviceId)) {
             log.warn("Device {} is not ready for installing rules", deviceId);
-
-            try {
-                sleep(SLEEP_MS);
-            } catch (InterruptedException e) {
-                log.error("Failed to check device availability", e);
-            }
+            waitFor(3);
         }
 
         flowService.connectTables(deviceId, TENANT_INBOUND_TABLE, TENANT_DHCP_TABLE);
@@ -927,6 +937,15 @@
                 install);
     }
 
+    private boolean hasPort(DeviceId deviceId, String portName) {
+        DeviceService deviceService = DefaultServiceDirectory.getService(DeviceService.class);
+        Port port = deviceService.getPorts(deviceId).stream()
+                .filter(p -> p.isEnabled() &&
+                        Objects.equals(p.annotations().value(PORT_NAME), portName))
+                .findAny().orElse(null);
+        return port != null;
+    }
+
     private class InternalRouterEventListener implements KubevirtRouterListener {
         private boolean isRelevantHelper() {
             return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
@@ -1240,11 +1259,7 @@
                 removeAllFlows(n, network);
                 removePatchInterface(n, network);
 
-                try {
-                    sleep(SLEEP_LARGE_MS);
-                } catch (InterruptedException e) {
-                    log.error("Sleep exception", e);
-                }
+                waitFor(5);
 
                 removeBridge(n, network);
             });
@@ -1440,7 +1455,6 @@
                             kubevirtPort, gwNode, false);
                 });
             }
-
         }
     }
 }
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java
deleted file mode 100644
index 669f22ff..0000000
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Copyright 2021-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kubevirtnetworking.impl;
-
-import io.fabric8.kubernetes.api.model.Pod;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.CoreService;
-import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
-import org.onosproject.kubevirtnetworking.api.KubevirtPodAdminService;
-import org.onosproject.kubevirtnetworking.api.KubevirtPodEvent;
-import org.onosproject.kubevirtnetworking.api.KubevirtPodListener;
-import org.onosproject.kubevirtnetworking.api.KubevirtPort;
-import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
-import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
-import org.onosproject.kubevirtnode.api.KubevirtNode;
-import org.onosproject.kubevirtnode.api.KubevirtNodeService;
-import org.onosproject.mastership.MastershipService;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.driver.DriverService;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-
-import static java.lang.Thread.sleep;
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
-import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPorts;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Associates the kubevirt container port and pod.
- */
-@Component(immediate = true)
-public class KubevirtPodPortMapper {
-
-    private final Logger log = getLogger(getClass());
-
-    private static final String NETWORK_STATUS_KEY = "k8s.v1.cni.cncf.io/network-status";
-    private static final long SLEEP_MS = 2000; // we wait 2s
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected CoreService coreService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected MastershipService mastershipService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected ClusterService clusterService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected LeadershipService leadershipService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected DeviceService deviceService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected DriverService driverService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected KubevirtPortAdminService kubevirtPortAdminService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected KubevirtNetworkAdminService kubevirtNetworkAdminService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected KubevirtPodAdminService kubevirtPodAdminService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected KubevirtApiConfigService kubevirtApiConfigService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected KubevirtNodeService kubevirtNodeService;
-
-    private final ExecutorService eventExecutor = newSingleThreadExecutor(
-            groupedThreads(this.getClass().getSimpleName(), "event-handler"));
-
-    private final InternalKubevirtPodListener kubevirtPodListener =
-            new InternalKubevirtPodListener();
-
-    private ApplicationId appId;
-    private NodeId localNodeId;
-
-    @Activate
-    protected void activate() {
-        appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
-        localNodeId = clusterService.getLocalNode().id();
-        leadershipService.runForLeadership(appId.name());
-        kubevirtPodAdminService.addListener(kubevirtPodListener);
-
-        log.info("Started");
-    }
-
-    @Deactivate
-    protected void deactivate() {
-        kubevirtPodAdminService.removeListener(kubevirtPodListener);
-        leadershipService.withdraw(appId.name());
-        eventExecutor.shutdown();
-
-        log.info("Stopped");
-    }
-
-    private class InternalKubevirtPodListener implements KubevirtPodListener {
-
-        private boolean isRelevantHelper() {
-            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
-        }
-
-        @Override
-        public void event(KubevirtPodEvent event) {
-            switch (event.type()) {
-                case KUBEVIRT_POD_UPDATED:
-                case KUBEVIRT_POD_CREATED:
-                    eventExecutor.execute(() -> processPodAddition(event.subject()));
-                case KUBEVIRT_POD_REMOVED:
-                default:
-                    // do nothing
-                    break;
-            }
-        }
-
-        private void processPodAddition(Pod pod) {
-            if (!isRelevantHelper()) {
-                return;
-            }
-
-            Map<String, String> annots = pod.getMetadata().getAnnotations();
-            if (annots == null) {
-                return;
-            }
-
-            if (!annots.containsKey(NETWORK_STATUS_KEY)) {
-                return;
-            }
-
-            KubevirtNode node = kubevirtNodeService.node(pod.getSpec().getNodeName());
-
-            if (node == null) {
-                log.warn("POD scheduled node name {} is not ready, " +
-                         "we wait for a while...", pod.getSpec().getNodeName());
-                try {
-                    // we wait until all k8s nodes are available
-                    sleep(SLEEP_MS);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-
-            Set<KubevirtPort> ports = getPorts(kubevirtNodeService,
-                                      kubevirtNetworkAdminService.networks(), pod);
-
-            if (ports.size() == 0) {
-                return;
-            }
-
-            ports.forEach(port -> {
-                KubevirtPort existing = kubevirtPortAdminService.port(port.macAddress());
-
-                if (existing != null) {
-                    if (port.deviceId() != null && existing.deviceId() == null) {
-                        KubevirtPort updated = existing.updateDeviceId(port.deviceId());
-                        // internal we update device ID of kubevirt port
-                        kubevirtPortAdminService.updatePort(updated);
-                    }
-                }
-            });
-        }
-    }
-}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java
index 09cc288..d97ac47 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java
@@ -78,7 +78,6 @@
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
-import static java.lang.Thread.sleep;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.kubevirtnetworking.api.Constants.ACL_CT_TABLE;
@@ -103,6 +102,7 @@
 import static org.onosproject.kubevirtnetworking.impl.OsgiPropertyConstants.USE_SECURITY_GROUP;
 import static org.onosproject.kubevirtnetworking.impl.OsgiPropertyConstants.USE_SECURITY_GROUP_DEFAULT;
 import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPropertyValueAsBoolean;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.waitFor;
 import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildPortRangeMatches;
 import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.computeCtMaskFlag;
 import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.computeCtStateFlag;
@@ -154,8 +154,6 @@
     private static final int ACTION_NONE = 0;
     private static final int ACTION_DROP = -1;
 
-    private static final long SLEEP_MS = 5000;
-
     /** Apply EdgeStack security group rule for VM traffic. */
     private boolean useSecurityGroup = USE_SECURITY_GROUP_DEFAULT;
 
@@ -388,11 +386,7 @@
             if (deviceService.getDevice(deviceId) != null) {
                 break;
             } else {
-                try {
-                    sleep(SLEEP_MS);
-                } catch (InterruptedException e) {
-                    log.error("Failed to install security group default rules.");
-                }
+                waitFor(5);
             }
         }
 
@@ -1020,11 +1014,7 @@
                 // now we wait 5s for all tenant bridges are created,
                 // FIXME: we need to fina a better way to wait all tenant bridges
                 // are created before installing default security group rules
-                try {
-                    sleep(SLEEP_MS);
-                } catch (InterruptedException e) {
-                    log.error("Failed to install security group default rules.");
-                }
+                waitFor(5);
 
                 for (KubevirtNode node : nodes) {
                     initializeTenantPipeline(network, node, true);
@@ -1108,11 +1098,7 @@
             }
 
             // FIXME: we wait all port get its deviceId updated
-            try {
-                sleep(SLEEP_MS);
-            } catch (InterruptedException e) {
-                log.error("Failed to install security group default rules.");
-            }
+            waitFor(5);
 
             resetSecurityGroupRulesByNode(node);
         }
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java
index 30295d1..3967d6a 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java
@@ -55,7 +55,6 @@
 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 
-import static java.lang.Thread.sleep;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
@@ -65,6 +64,7 @@
 import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VLAN;
 import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
 import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelToTenantPort;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.waitFor;
 import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
 import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
 import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
@@ -76,7 +76,6 @@
 @Component(immediate = true)
 public class KubevirtSwitchingTenantHandler {
     private final Logger log = getLogger(getClass());
-    private static final long SLEEP_MS = 3000; // we wait 3s
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
@@ -156,11 +155,7 @@
                 if (tunnelToTenantPort(localNode, network) != null) {
                     break;
                 } else {
-                    try {
-                        sleep(SLEEP_MS);
-                    } catch (InterruptedException e) {
-                        log.error("Failed to install security group default rules.");
-                    }
+                    waitFor(3);
                 }
             }
 
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmWatcher.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmWatcher.java
index fc3baab..8f38c4a 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmWatcher.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmWatcher.java
@@ -18,7 +18,6 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
-import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.Watcher;
 import io.fabric8.kubernetes.client.WatcherException;
@@ -41,7 +40,6 @@
 import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
 import org.onosproject.kubevirtnode.api.KubevirtNodeService;
 import org.onosproject.mastership.MastershipService;
-import org.onosproject.net.DeviceId;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -56,12 +54,10 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
 
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
-import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPorts;
 import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -242,12 +238,6 @@
 
                 port = port.updateIpAddress(ip);
 
-                DeviceId deviceId = getDeviceId(podService.pods(), port);
-
-                if (deviceId != null) {
-                    port = port.updateDeviceId(deviceId);
-                }
-
                 if (portAdminService.port(port.macAddress()) == null) {
                     portAdminService.createPort(port);
                 }
@@ -298,22 +288,6 @@
             return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
-        // FIXME: to obtains the device ID, we have to search through
-        // existing POD inventory, need to find a better wat to obtain device ID
-        private DeviceId getDeviceId(Set<Pod> pods, KubevirtPort port) {
-            Set<Pod> defaultPods = pods.stream()
-                    .filter(pod -> pod.getMetadata().getNamespace().equals(DEFAULT))
-                    .collect(Collectors.toSet());
-
-            Set<KubevirtPort> allPorts = new HashSet<>();
-            for (Pod pod : defaultPods) {
-                allPorts.addAll(getPorts(nodeService, networkAdminService.networks(), pod));
-            }
-
-            return allPorts.stream().filter(p -> p.macAddress().equals(port.macAddress()))
-                    .map(KubevirtPort::deviceId).findFirst().orElse(null);
-        }
-
         private Map<String, IpAddress> parseIpAddresses(String resource) {
             try {
                 ObjectMapper mapper = new ObjectMapper();
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmiWatcher.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmiWatcher.java
new file mode 100644
index 0000000..9e0ce3e
--- /dev/null
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmiWatcher.java
@@ -0,0 +1,272 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnetworking.impl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
+import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
+import org.onosproject.kubevirtnetworking.api.KubevirtPort;
+import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeService;
+import org.onosproject.mastership.MastershipService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPorts;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.waitFor;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Kubernetes VMI watcher used for feeding VMI information.
+ */
+@Component(immediate = true)
+public class KubevirtVmiWatcher {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final String STATUS = "status";
+    private static final String NODE_NAME = "nodeName";
+    private static final String METADATA = "metadata";
+    private static final String NAME = "name";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtNodeService nodeService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtNetworkAdminService networkAdminService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtPortAdminService portAdminService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtApiConfigService configService;
+
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+
+    private final InternalKubevirtVmiWatcher watcher = new InternalKubevirtVmiWatcher();
+    private final InternalKubevirtApiConfigListener
+            configListener = new InternalKubevirtApiConfigListener();
+
+    CustomResourceDefinitionContext vmiCrdCxt = new CustomResourceDefinitionContext
+            .Builder()
+            .withGroup("kubevirt.io")
+            .withScope("Namespaced")
+            .withVersion("v1")
+            .withPlural("virtualmachineinstances")
+            .build();
+
+    private ApplicationId appId;
+    private NodeId localNodeId;
+
+    @Activate
+    protected void activate() {
+        appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
+        localNodeId = clusterService.getLocalNode().id();
+        leadershipService.runForLeadership(appId.name());
+        configService.addListener(configListener);
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        configService.removeListener(configListener);
+        leadershipService.withdraw(appId.name());
+        eventExecutor.shutdown();
+
+        log.info("Stopped");
+    }
+
+    private void instantiateWatcher() {
+        KubernetesClient client = k8sClient(configService);
+
+        if (client != null) {
+            try {
+                client.customResource(vmiCrdCxt).watch(watcher);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(KubevirtApiConfigEvent event) {
+
+            switch (event.type()) {
+                case KUBEVIRT_API_CONFIG_UPDATED:
+                    eventExecutor.execute(this::processConfigUpdate);
+                    break;
+                case KUBEVIRT_API_CONFIG_CREATED:
+                case KUBEVIRT_API_CONFIG_REMOVED:
+                default:
+                    // do nothing
+                    break;
+            }
+        }
+
+        private void processConfigUpdate() {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            instantiateWatcher();
+        }
+    }
+
+    private class InternalKubevirtVmiWatcher implements Watcher<String> {
+
+        @Override
+        public void eventReceived(Action action, String s) {
+            switch (action) {
+                case ADDED:
+                case MODIFIED:
+                    eventExecutor.execute(() -> processAddition(s));
+                    break;
+                case ERROR:
+                    log.warn("Failures processing VM manipulation.");
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        @Override
+        public void onClose(WatcherException e) {
+            log.warn("VM watcher OnClose, re-instantiate the VM watcher...");
+            instantiateWatcher();
+        }
+
+        private void processAddition(String resource) {
+            if (!isMaster()) {
+                return;
+            }
+
+            String nodeName = parseNodeName(resource);
+            String vmiName = parseVmiName(resource);
+
+            if (nodeName == null) {
+                return;
+            }
+
+            KubevirtNode node = nodeService.node(nodeName);
+
+            if (node == null) {
+                log.warn("VMI {} scheduled on node {} is not ready, " +
+                                "we wait for a while...", vmiName, nodeName);
+                waitFor(2);
+            }
+
+            Set<KubevirtPort> ports = getPorts(nodeService,
+                                        networkAdminService.networks(), resource);
+
+            if (ports.size() == 0) {
+                return;
+            }
+
+            ports.forEach(port -> {
+                KubevirtPort existing = portAdminService.port(port.macAddress());
+
+                if (existing != null) {
+                    if (port.deviceId() != null && existing.deviceId() == null) {
+                        KubevirtPort updated = existing.updateDeviceId(port.deviceId());
+                        // internal we update device ID of kubevirt port
+                        portAdminService.updatePort(updated);
+                    }
+                }
+            });
+        }
+
+        private boolean isMaster() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        private String parseVmiName(String resource) {
+            String vmiName = null;
+
+            try {
+                ObjectMapper mapper = new ObjectMapper();
+                JsonNode json = mapper.readTree(resource);
+                JsonNode metadataJson = json.get(METADATA);
+                JsonNode vmiNameJson = metadataJson.get(NAME);
+                vmiName = vmiNameJson != null ? vmiNameJson.asText() : null;
+            } catch (IOException e) {
+                log.error("Failed to parse kubevirt VMI name");
+            }
+
+            return vmiName;
+        }
+
+        private String parseNodeName(String resource) {
+            String nodeName = null;
+            try {
+                ObjectMapper mapper = new ObjectMapper();
+                JsonNode json = mapper.readTree(resource);
+                JsonNode statusJson = json.get(STATUS);
+                JsonNode nodeNameJson = statusJson.get(NODE_NAME);
+                nodeName = nodeNameJson != null ? nodeNameJson.asText() : null;
+            } catch (IOException e) {
+                log.error("Failed to parse kubevirt VMI nodename");
+            }
+
+            return nodeName;
+        }
+    }
+}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java
index 8771dda..a60bd0a 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java
@@ -15,16 +15,16 @@
  */
 package org.onosproject.kubevirtnetworking.util;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
-import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.client.ConfigBuilder;
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.net.util.SubnetUtils;
-import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 import org.onlab.osgi.DefaultServiceDirectory;
@@ -62,7 +62,6 @@
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -90,6 +89,10 @@
     private static final String MAC = "mac";
     private static final String IPS = "ips";
     private static final String BR_INT = "br-int";
+    private static final String STATUS = "status";
+    private static final String INTERFACES = "interfaces";
+    private static final String IP_ADDRESS = "ipAddress";
+    private static final String NODE_NAME = "nodeName";
 
     /**
      * Prevents object installation from external.
@@ -325,65 +328,68 @@
     }
 
     /**
-     * Obtains the kubevirt port from kubevirt POD.
+     * Obtains the kubevirt port from kubevirt VMI.
      *
      * @param nodeService kubevirt node service
      * @param networks set of existing kubevirt networks
-     * @param pod      kubevirt POD
-     * @return kubevirt ports attached to the POD
+     * @param resource  VMI definition
+     * @return kubevirt ports attached to the VMI
      */
     public static Set<KubevirtPort> getPorts(KubevirtNodeService nodeService,
-                                             Set<KubevirtNetwork> networks, Pod pod) {
+                                             Set<KubevirtNetwork> networks,
+                                             String resource) {
         try {
-            Map<String, String> annots = pod.getMetadata().getAnnotations();
-            if (annots == null) {
-                return ImmutableSet.of();
-            }
-
-            if (!annots.containsKey(NETWORK_STATUS_KEY)) {
-                return ImmutableSet.of();
-            }
-
-            String networkStatusStr = annots.get(NETWORK_STATUS_KEY);
-
-            if (networkStatusStr == null) {
-                return ImmutableSet.of();
-            }
+            ObjectMapper mapper = new ObjectMapper();
+            JsonNode json = mapper.readTree(resource);
+            JsonNode statusJson = json.get(STATUS);
+            ArrayNode interfacesJson = (ArrayNode) statusJson.get(INTERFACES);
 
             KubevirtPort.Builder builder = DefaultKubevirtPort.builder();
-
-            KubevirtNode node = nodeService.node(pod.getSpec().getNodeName());
-
-            if (node != null) {
-                builder.deviceId(node.intgBridge());
+            String nodeName = parseVmiNodeName(resource);
+            if (nodeName != null && nodeService.node(nodeName) != null) {
+                builder.deviceId(nodeService.node(nodeName).intgBridge());
             }
 
-            JSONArray networkStatus = new JSONArray(networkStatusStr);
-            Set<KubevirtPort> ports = new HashSet<>();
+            if (interfacesJson == null) {
+                return ImmutableSet.of();
+            }
 
-            for (int i = 0; i < networkStatus.length(); i++) {
-                JSONObject object = networkStatus.getJSONObject(i);
-                String name = object.getString(NAME);
+            Set<KubevirtPort> ports = new HashSet<>();
+            for (JsonNode interfaceJson : interfacesJson) {
+                String name = interfaceJson.get(NAME).asText();
                 KubevirtNetwork network = networks.stream()
-                        .filter(n -> (NETWORK_PREFIX + n.name()).equals(name) || (n.name()).equals(name))
+                        .filter(n -> (NETWORK_PREFIX + n.name()).equals(name) ||
+                                     (n.name() + "-net").equals(name))
                         .findAny().orElse(null);
                 if (network != null) {
-                    String mac = object.getString(MAC);
-
+                    // FIXME: we do not update IP address, as learning IP address
+                    // requires much more time due to the lag from VM agent
+                    String mac = interfaceJson.get(MAC).asText();
                     builder.macAddress(MacAddress.valueOf(mac))
                             .networkId(network.networkId());
-
                     ports.add(builder.build());
                 }
             }
-
             return ports;
+        } catch (IOException e) {
+            log.error("Failed to parse port info from VMI object", e);
+        }
+        return ImmutableSet.of();
+    }
 
-        } catch (JSONException e) {
-            log.error("Failed to parse network status object", e);
+    public static String parseVmiNodeName(String resource) {
+        String nodeName = null;
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+            JsonNode json = mapper.readTree(resource);
+            JsonNode statusJson = json.get(STATUS);
+            JsonNode nodeNameJson = statusJson.get(NODE_NAME);
+            nodeName = nodeNameJson != null ? nodeNameJson.asText() : null;
+        } catch (IOException e) {
+            log.error("Failed to parse kubevirt VMI nodename");
         }
 
-        return ImmutableSet.of();
+        return nodeName;
     }
 
     /**
@@ -663,4 +669,17 @@
                 .filter(lb -> router.internal().contains(lb.networkId()))
                 .collect(Collectors.toSet());
     }
+
+    /**
+     * Waits for the given length of time.
+     *
+     * @param timeSecond the amount of time for wait in second unit
+     */
+    public static void waitFor(int timeSecond) {
+        try {
+            Thread.sleep(timeSecond * 1000L);
+        } catch (Exception e) {
+            log.error(e.toString());
+        }
+    }
 }