Add a kubernetes port and pod mapper with sync/purge rules CLIs

Change-Id: I89ed29e4357b84345f95fddf81ab7156715d7c82
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodWatcher.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodWatcher.java
index f00de40..a5c15f4 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodWatcher.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodWatcher.java
@@ -25,7 +25,8 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.k8snetworking.api.K8sPodAdminService;
-import org.onosproject.k8snode.api.K8sApiConfig;
+import org.onosproject.k8snode.api.K8sApiConfigEvent;
+import org.onosproject.k8snode.api.K8sApiConfigListener;
 import org.onosproject.k8snode.api.K8sApiConfigService;
 import org.onosproject.mastership.MastershipService;
 import org.osgi.service.component.annotations.Activate;
@@ -73,6 +74,8 @@
     private final ExecutorService eventExecutor = newSingleThreadExecutor(
             groupedThreads(this.getClass().getSimpleName(), "event-handler"));
     private final Watcher<Pod> internalK8sPodWatcher = new InternalK8sPodWatcher();
+    private final InternalK8sApiConfigListener
+            internalK8sApiConfigListener = new InternalK8sApiConfigListener();
 
     private ApplicationId appId;
     private NodeId localNodeId;
@@ -82,36 +85,52 @@
         appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
         localNodeId = clusterService.getLocalNode().id();
         leadershipService.runForLeadership(appId.name());
-
-        initWatcher();
+        k8sApiConfigService.addListener(internalK8sApiConfigListener);
 
         log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
+        k8sApiConfigService.removeListener(internalK8sApiConfigListener);
         leadershipService.withdraw(appId.name());
         eventExecutor.shutdown();
 
         log.info("Stopped");
     }
 
-    private void initWatcher() {
-        K8sApiConfig config =
-                k8sApiConfigService.apiConfigs().stream().findAny().orElse(null);
-        if (config == null) {
-            log.error("Failed to find valid kubernetes API configuration.");
-            return;
+    private class InternalK8sApiConfigListener implements K8sApiConfigListener {
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
-        KubernetesClient client = k8sClient(config);
+        @Override
+        public void event(K8sApiConfigEvent event) {
 
-        if (client == null) {
-            log.error("Failed to connect to kubernetes API server.");
-            return;
+            switch (event.type()) {
+                case K8S_API_CONFIG_UPDATED:
+                    eventExecutor.execute(this::processConfigUpdating);
+                    break;
+                case K8S_API_CONFIG_CREATED:
+                case K8S_API_CONFIG_REMOVED:
+                default:
+                    // do nothing
+                    break;
+            }
         }
 
-        client.pods().watch(internalK8sPodWatcher);
+        private void processConfigUpdating() {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            KubernetesClient client = k8sClient(k8sApiConfigService);
+
+            if (client != null) {
+                client.pods().watch(internalK8sPodWatcher);
+            }
+        }
     }
 
     private class InternalK8sPodWatcher implements Watcher<Pod> {
@@ -146,7 +165,7 @@
                 return;
             }
 
-            log.info("Process pod {} creating event from API server.",
+            log.trace("Process pod {} creating event from API server.",
                     pod.getMetadata().getName());
 
             k8sPodAdminService.createPod(pod);
@@ -157,10 +176,12 @@
                 return;
             }
 
-            log.info("Process pod {} updating event from API server.",
+            log.trace("Process pod {} updating event from API server.",
                     pod.getMetadata().getName());
 
-            k8sPodAdminService.updatePod(pod);
+            if (k8sPodAdminService.pod(pod.getMetadata().getUid()) != null) {
+                k8sPodAdminService.updatePod(pod);
+            }
         }
 
         private void processDeletion(Pod pod) {
@@ -168,7 +189,7 @@
                 return;
             }
 
-            log.info("Process pod {} removal event from API server.",
+            log.trace("Process pod {} removal event from API server.",
                     pod.getMetadata().getName());
 
             k8sPodAdminService.removePod(pod.getMetadata().getUid());