Fix: install gateway upstream rules on node completion event

Change-Id: I0abd3a991b6539637085e65ca122e3ea5c8ae62a
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/cli/KubevirtSyncRulesCommand.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/cli/KubevirtSyncRulesCommand.java
index 2c0f57d..80db82a 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/cli/KubevirtSyncRulesCommand.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/cli/KubevirtSyncRulesCommand.java
@@ -22,7 +22,6 @@
 import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
 
 import static java.lang.Thread.sleep;
-import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
 import static org.onosproject.kubevirtnode.api.KubevirtNodeState.COMPLETE;
 import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
 
@@ -50,7 +49,7 @@
             return;
         }
 
-        nodeAdminService.completeNodes(WORKER).forEach(node ->
+        nodeAdminService.completeNodes().forEach(node ->
                 syncRulesBaseForNode(nodeAdminService, node));
 
         print("Successfully requested re-installing flow rules.");
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtFloatingIpHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtFloatingIpHandler.java
index b7d0842..d59a260 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtFloatingIpHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtFloatingIpHandler.java
@@ -37,6 +37,8 @@
 import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
 import org.onosproject.kubevirtnetworking.util.RulePopulatorUtil;
 import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
+import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
 import org.onosproject.kubevirtnode.api.KubevirtNodeService;
 import org.onosproject.net.Device;
 import org.onosproject.net.PortNumber;
@@ -127,6 +129,8 @@
 
     private final InternalRouterEventListener kubevirtRouterListener =
             new InternalRouterEventListener();
+    private final InternalNodeListener kubevirtNodeListener =
+            new InternalNodeListener();
 
     @Activate
     protected void activate() {
@@ -134,6 +138,7 @@
         localNodeId = clusterService.getLocalNode().id();
         leadershipService.runForLeadership(appId.name());
         kubevirtRouterService.addListener(kubevirtRouterListener);
+        kubevirtNodeService.addListener(kubevirtNodeListener);
 
         log.info("Started");
     }
@@ -142,6 +147,7 @@
     protected void deactivate() {
         leadershipService.withdraw(appId.name());
         kubevirtRouterService.removeListener(kubevirtRouterListener);
+        kubevirtNodeService.removeListener(kubevirtNodeListener);
 
         eventExecutor.shutdown();
 
@@ -464,4 +470,41 @@
             setFloatingIpRulesForFip(router, floatingIp, electedGw, false);
         }
     }
+
+    private class InternalNodeListener implements KubevirtNodeListener {
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(KubevirtNodeEvent event) {
+            switch (event.type()) {
+                case KUBEVIRT_NODE_COMPLETE:
+                    eventExecutor.execute(() -> processNodeCompletion(event.subject()));
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        private void processNodeCompletion(KubevirtNode node) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            for (KubevirtFloatingIp fip : kubevirtRouterService.floatingIps()) {
+                KubevirtRouter router = kubevirtRouterService.router(fip.routerName());
+                if (router == null) {
+                    log.warn("The router {} is not found", fip.routerName());
+                    continue;
+                }
+
+                if (node.hostname().equals(router.electedGateway())) {
+                    setFloatingIpRulesForFip(router, fip, node, true);
+                    log.info("Configure floating IP {} on gateway {}",
+                                fip.floatingIp().toString(), node.hostname());
+                }
+            }
+        }
+    }
 }
diff --git a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/web/KubevirtNodeWebResource.java b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/web/KubevirtNodeWebResource.java
index 06b06e8..0eb371b 100644
--- a/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/web/KubevirtNodeWebResource.java
+++ b/apps/kubevirt-node/app/src/main/java/org/onosproject/kubevirtnode/web/KubevirtNodeWebResource.java
@@ -49,7 +49,6 @@
 import static javax.ws.rs.core.Response.created;
 import static org.onlab.util.Tools.nullIsIllegal;
 import static org.onlab.util.Tools.readTreeFromStream;
-import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
 import static org.onosproject.kubevirtnode.api.KubevirtNodeState.COMPLETE;
 import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
 
@@ -269,7 +268,7 @@
 
         KubevirtNodeAdminService service = get(KubevirtNodeAdminService.class);
 
-        service.completeNodes(WORKER).forEach(this::syncRulesBase);
+        service.completeNodes().forEach(this::syncRulesBase);
         return ok(mapper().createObjectNode()).build();
     }