Fix: associate tunnel ingress rules with network events

Change-Id: If5e5ca996d6c3c106450454c3a944a3c9871a6b9
(cherry picked from commit b689e57976810f5c80aed99f5c4d1c8c37f4016b)
diff --git a/apps/kubevirt-networking/api/src/main/java/org/onosproject/kubevirtnetworking/api/DefaultKubevirtPort.java b/apps/kubevirt-networking/api/src/main/java/org/onosproject/kubevirtnetworking/api/DefaultKubevirtPort.java
index 2a7385f..ccd35e9 100644
--- a/apps/kubevirt-networking/api/src/main/java/org/onosproject/kubevirtnetworking/api/DefaultKubevirtPort.java
+++ b/apps/kubevirt-networking/api/src/main/java/org/onosproject/kubevirtnetworking/api/DefaultKubevirtPort.java
@@ -92,7 +92,12 @@
                 DefaultServiceDirectory.getService(KubevirtNodeService.class);
         KubevirtNetwork network = networkService.network(networkId);
         KubevirtNode node = nodeService.node(deviceId);
-        return network.tenantDeviceId(node.hostname());
+
+        if (network == null || node == null) {
+            return null;
+        } else {
+            return network.tenantDeviceId(node.hostname());
+        }
     }
 
     @Override
@@ -100,9 +105,13 @@
         KubevirtNetworkService networkService =
                 DefaultServiceDirectory.getService(KubevirtNetworkService.class);
         KubevirtNetwork network = networkService.network(networkId);
-        return network.type() == KubevirtNetwork.Type.VXLAN ||
-                network.type() == KubevirtNetwork.Type.GRE ||
-                network.type() == KubevirtNetwork.Type.GENEVE;
+        if (network == null) {
+            return false;
+        } else {
+            return network.type() == KubevirtNetwork.Type.VXLAN ||
+                    network.type() == KubevirtNetwork.Type.GRE ||
+                    network.type() == KubevirtNetwork.Type.GENEVE;
+        }
     }
 
     @Override
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
index c95f8c4..47f778a 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
@@ -1230,7 +1230,7 @@
                 return;
             }
 
-            nodeService.completeNodes().forEach(n -> {
+            nodeService.completeNodes(WORKER).forEach(n -> {
                 removePatchInterface(n, network);
                 removeBridge(n, network);
             });
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java
index f49b03f..669f22ff 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java
@@ -134,9 +134,8 @@
         public void event(KubevirtPodEvent event) {
             switch (event.type()) {
                 case KUBEVIRT_POD_UPDATED:
-                    eventExecutor.execute(() -> processPodUpdate(event.subject()));
-                    break;
                 case KUBEVIRT_POD_CREATED:
+                    eventExecutor.execute(() -> processPodAddition(event.subject()));
                 case KUBEVIRT_POD_REMOVED:
                 default:
                     // do nothing
@@ -144,7 +143,7 @@
             }
         }
 
-        private void processPodUpdate(Pod pod) {
+        private void processPodAddition(Pod pod) {
             if (!isRelevantHelper()) {
                 return;
             }
@@ -173,6 +172,7 @@
 
             Set<KubevirtPort> ports = getPorts(kubevirtNodeService,
                                       kubevirtNetworkAdminService.networks(), pod);
+
             if (ports.size() == 0) {
                 return;
             }
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java
index c4802a6..09cc288 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java
@@ -379,6 +379,23 @@
         if (deviceId == null) {
             return;
         }
+
+        // we check whether the given device is available from the store
+        // if not we will wait until the device is eventually created
+        // FIXME: it would be better to listen to device event to perform
+        // pipeline initialization rather on network events.
+        while (true) {
+            if (deviceService.getDevice(deviceId) != null) {
+                break;
+            } else {
+                try {
+                    sleep(SLEEP_MS);
+                } catch (InterruptedException e) {
+                    log.error("Failed to install security group default rules.");
+                }
+            }
+        }
+
         initializeTenantIngressTable(deviceId, install);
         initializeTenantEgressTable(deviceId, install);
         initializeTenantConnTrackTable(deviceId, install);
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java
index 066ce49..30295d1 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java
@@ -25,6 +25,8 @@
 import org.onosproject.core.CoreService;
 import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
 import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetworkEvent;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetworkListener;
 import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
 import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
 import org.onosproject.kubevirtnetworking.api.KubevirtPort;
@@ -53,6 +55,7 @@
 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 
+import static java.lang.Thread.sleep;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
@@ -73,6 +76,7 @@
 @Component(immediate = true)
 public class KubevirtSwitchingTenantHandler {
     private final Logger log = getLogger(getClass());
+    private static final long SLEEP_MS = 3000; // we wait 3s
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
@@ -100,6 +104,8 @@
     private final ExecutorService eventExecutor = newSingleThreadExecutor(
             groupedThreads(this.getClass().getSimpleName(), "event-handler"));
 
+    private final InternalKubevirtNetworkListener kubevirtNetworkListener =
+            new InternalKubevirtNetworkListener();
     private final InternalKubevirtPortListener kubevirtPortListener =
             new InternalKubevirtPortListener();
     private final InternalKubevirtNodeListener kubevirtNodeListener =
@@ -114,6 +120,7 @@
         localNodeId = clusterService.getLocalNode().id();
         leadershipService.runForLeadership(appId.name());
         kubevirtPortService.addListener(kubevirtPortListener);
+        kubevirtNetworkService.addListener(kubevirtNetworkListener);
         kubevirtNodeService.addListener(kubevirtNodeListener);
 
         log.info("Started");
@@ -121,6 +128,7 @@
 
     @Deactivate
     protected void deactivate() {
+        kubevirtNetworkService.removeListener(kubevirtNetworkListener);
         kubevirtPortService.removeListener(kubevirtPortListener);
         kubevirtNodeService.removeListener(kubevirtNodeListener);
         leadershipService.withdraw(appId.name());
@@ -129,13 +137,7 @@
         log.info("Stopped");
     }
 
-    private void setIngressRules(KubevirtPort port, boolean install) {
-        if (port.ipAddress() == null) {
-            return;
-        }
-
-        KubevirtNetwork network = kubevirtNetworkService.network(port.networkId());
-
+    private void setIngressRules(KubevirtNetwork network, boolean install) {
         if (network == null) {
             return;
         }
@@ -148,33 +150,65 @@
             return;
         }
 
-        KubevirtNode localNode = kubevirtNodeService.node(port.deviceId());
-        if (localNode == null || localNode.type() == MASTER) {
-            return;
+        for (KubevirtNode localNode : kubevirtNodeService.completeNodes(WORKER)) {
+
+            while (true) {
+                if (tunnelToTenantPort(localNode, network) != null) {
+                    break;
+                } else {
+                    try {
+                        sleep(SLEEP_MS);
+                    } catch (InterruptedException e) {
+                        log.error("Failed to install security group default rules.");
+                    }
+                }
+            }
+
+            PortNumber patchPortNumber = tunnelToTenantPort(localNode, network);
+
+            TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
+                    .matchTunnelId(Long.parseLong(network.segmentId()));
+
+            TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+                    .setOutput(patchPortNumber);
+
+            flowRuleService.setRule(
+                    appId,
+                    localNode.tunBridge(),
+                    sBuilder.build(),
+                    tBuilder.build(),
+                    PRIORITY_TUNNEL_RULE,
+                    TUNNEL_DEFAULT_TABLE,
+                    install);
+
+            log.debug("Install ingress rules for segment ID {}", network.segmentId());
         }
+    }
 
-        PortNumber patchPortNumber = tunnelToTenantPort(localNode, network);
-        if (patchPortNumber == null) {
-            return;
+    private void setIngressRules(KubevirtNode node, boolean install) {
+        for (KubevirtNetwork network : kubevirtNetworkService.tenantNetworks()) {
+            PortNumber patchPortNumber = tunnelToTenantPort(node, network);
+            if (patchPortNumber == null) {
+                return;
+            }
+
+            TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
+                    .matchTunnelId(Long.parseLong(network.segmentId()));
+
+            TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+                    .setOutput(patchPortNumber);
+
+            flowRuleService.setRule(
+                    appId,
+                    node.tunBridge(),
+                    sBuilder.build(),
+                    tBuilder.build(),
+                    PRIORITY_TUNNEL_RULE,
+                    TUNNEL_DEFAULT_TABLE,
+                    install);
+
+            log.debug("Install ingress rules for segment ID {}", network.segmentId());
         }
-
-        TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
-                .matchTunnelId(Long.parseLong(network.segmentId()));
-
-        TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
-                .setOutput(patchPortNumber);
-
-        flowRuleService.setRule(
-                appId,
-                localNode.tunBridge(),
-                sBuilder.build(),
-                tBuilder.build(),
-                PRIORITY_TUNNEL_RULE,
-                TUNNEL_DEFAULT_TABLE,
-                install);
-
-        log.debug("Install ingress rules for instance {}, segment ID {}",
-                port.ipAddress(), network.segmentId());
     }
 
     private void setEgressRules(KubevirtPort port, boolean install) {
@@ -284,15 +318,52 @@
                 return;
             }
 
+            setIngressRules(node, true);
             kubevirtPortService.ports().stream()
                     .filter(port -> node.equals(kubevirtNodeService.node(port.deviceId())))
                     .forEach(port -> {
-                        setIngressRules(port, true);
                         setEgressRules(port, true);
                     });
         }
     }
 
+    private class InternalKubevirtNetworkListener implements KubevirtNetworkListener {
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(KubevirtNetworkEvent event) {
+            switch (event.type()) {
+                case KUBEVIRT_NETWORK_CREATED:
+                    eventExecutor.execute(() -> processNetworkAddition(event.subject()));
+                    break;
+                case KUBEVIRT_NETWORK_REMOVED:
+                    eventExecutor.execute(() -> processNetworkRemoval(event.subject()));
+                    break;
+                default:
+                    // do nothing
+                    break;
+            }
+        }
+
+        private void processNetworkAddition(KubevirtNetwork network) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            setIngressRules(network, true);
+        }
+
+        private void processNetworkRemoval(KubevirtNetwork network) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            setIngressRules(network, false);
+        }
+    }
+
     private class InternalKubevirtPortListener implements KubevirtPortListener {
 
         @Override
@@ -325,7 +396,6 @@
                 return;
             }
 
-            setIngressRules(port, true);
             setEgressRules(port, true);
         }
 
@@ -334,7 +404,6 @@
                 return;
             }
 
-            setIngressRules(port, false);
             setEgressRules(port, false);
         }
     }
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmWatcher.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmWatcher.java
index 78a7f51..43d3e64 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmWatcher.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmWatcher.java
@@ -73,8 +73,6 @@
 
     private final Logger log = getLogger(getClass());
 
-    private static final long SLEEP_MS = 3000; // we wait 3s
-
     private static final String SPEC = "spec";
     private static final String TEMPLATE = "template";
     private static final String METADATA = "metadata";