Fix: add physnet patch-ports related rules on kubevirt node completion

Change-Id: I51caa6f923583d48ce7580e9f6b2e811ace6e055
(cherry picked from commit 6d2dd5aa6c8f1b98dcbc549d7bf0d2858cf981e0)
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingPhysicalHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingPhysicalHandler.java
index 9d9757d..b94c7c4 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingPhysicalHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingPhysicalHandler.java
@@ -17,10 +17,13 @@
 
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
 import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
+import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
 import org.onosproject.kubevirtnode.api.KubevirtNodeService;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
@@ -41,6 +44,7 @@
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
@@ -78,17 +82,21 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected KubevirtFlowRuleService flowRuleService;
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected KubevirtNodeService nodeService;
+    protected KubevirtNodeService kubevirtNodeService;
 
     private final ExecutorService eventExecutor = newSingleThreadExecutor(
             groupedThreads(this.getClass().getSimpleName(), "event-handler"));
     private final InternalDeviceListener internalDeviceListener = new InternalDeviceListener();
+    private final InternalKubevirtNodeListener kubevirtNodeListener = new InternalKubevirtNodeListener();
     private ApplicationId appId;
+    private NodeId localNodeId;
 
     @Activate
     protected void activate() {
         appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
+        localNodeId = clusterService.getLocalNode().id();
         deviceService.addListener(internalDeviceListener);
+        kubevirtNodeService.addListener(kubevirtNodeListener);
         log.info("Started");
     }
 
@@ -96,6 +104,7 @@
     protected void deactivate() {
         eventExecutor.shutdown();
         deviceService.removeListener(internalDeviceListener);
+        kubevirtNodeService.removeListener(kubevirtNodeListener);
         log.info("Stopped");
     }
 
@@ -135,7 +144,7 @@
                 return false;
             }
 
-            KubevirtNode node = nodeService.node(event.subject().id());
+            KubevirtNode node = kubevirtNodeService.node(event.subject().id());
             if (node == null) {
                 return false;
             }
@@ -179,4 +188,35 @@
                     event.port().number(), false);
         }
     }
+
+    private class InternalKubevirtNodeListener implements KubevirtNodeListener {
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(KubevirtNodeEvent event) {
+            switch (event.type()) {
+                case KUBEVIRT_NODE_COMPLETE:
+                    eventExecutor.execute(() -> processNodeCompletion(event.subject()));
+                    break;
+                case KUBEVIRT_NODE_INCOMPLETE:
+                default:
+                    // do nothing
+                    break;
+            }
+        }
+
+        private void processNodeCompletion(KubevirtNode node) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            deviceService.getPorts(node.intgBridge()).forEach(p -> {
+                if (containsPhyPatchPort(node, p)) {
+                    setIngressRuleForPatchPort(node.intgBridge(), p.number(), true);
+                }
+            });
+        }
+    }
 }