Add a kubernetes port and pod mapper with sync/purge rules CLIs

Change-Id: I89ed29e4357b84345f95fddf81ab7156715d7c82
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
index 371a34a..683b070 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
@@ -25,7 +25,8 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.k8snetworking.api.K8sEndpointsAdminService;
-import org.onosproject.k8snode.api.K8sApiConfig;
+import org.onosproject.k8snode.api.K8sApiConfigEvent;
+import org.onosproject.k8snode.api.K8sApiConfigListener;
 import org.onosproject.k8snode.api.K8sApiConfigService;
 import org.onosproject.mastership.MastershipService;
 import org.osgi.service.component.annotations.Activate;
@@ -74,7 +75,8 @@
             groupedThreads(this.getClass().getSimpleName(), "event-handler"));
     private final Watcher<Endpoints>
             internalEndpointsWatcher = new InternalK8sEndpointsWatcher();
-
+    private final InternalK8sApiConfigListener
+            internalK8sApiConfigListener = new InternalK8sApiConfigListener();
 
     private ApplicationId appId;
     private NodeId localNodeId;
@@ -84,36 +86,52 @@
         appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
         localNodeId = clusterService.getLocalNode().id();
         leadershipService.runForLeadership(appId.name());
-
-        initWatcher();
+        k8sApiConfigService.addListener(internalK8sApiConfigListener);
 
         log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
+        k8sApiConfigService.removeListener(internalK8sApiConfigListener);
         leadershipService.withdraw(appId.name());
         eventExecutor.shutdown();
 
         log.info("Stopped");
     }
 
-    private void initWatcher() {
-        K8sApiConfig config =
-                k8sApiConfigService.apiConfigs().stream().findAny().orElse(null);
-        if (config == null) {
-            log.error("Failed to find valid kubernetes API configuration.");
-            return;
+    private class InternalK8sApiConfigListener implements K8sApiConfigListener {
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
-        KubernetesClient client = k8sClient(config);
+        @Override
+        public void event(K8sApiConfigEvent event) {
 
-        if (client == null) {
-            log.error("Failed to connect to kubernetes API server.");
-            return;
+            switch (event.type()) {
+                case K8S_API_CONFIG_UPDATED:
+                    eventExecutor.execute(this::processConfigUpdating);
+                    break;
+                case K8S_API_CONFIG_CREATED:
+                case K8S_API_CONFIG_REMOVED:
+                default:
+                    // do nothing
+                    break;
+            }
         }
 
-        client.endpoints().watch(internalEndpointsWatcher);
+        private void processConfigUpdating() {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            KubernetesClient client = k8sClient(k8sApiConfigService);
+
+            if (client != null) {
+                client.endpoints().watch(internalEndpointsWatcher);
+            }
+        }
     }
 
     private class InternalK8sEndpointsWatcher implements Watcher<Endpoints> {
@@ -125,7 +143,9 @@
                     eventExecutor.execute(() -> processAddition(endpoints));
                     break;
                 case MODIFIED:
-                    eventExecutor.execute(() -> processModification(endpoints));
+                    // FIXME: there are too frequent endpoints update events
+                    // issued from kubernetes API server, we disable update for now
+                    // eventExecutor.execute(() -> processModification(endpoints));
                     break;
                 case DELETED:
                     eventExecutor.execute(() -> processDeletion(endpoints));
@@ -149,7 +169,7 @@
                 return;
             }
 
-            log.info("Process endpoints {} creating event from API server.",
+            log.trace("Process endpoints {} creating event from API server.",
                     endpoints.getMetadata().getName());
 
             k8sEndpointsAdminService.createEndpoints(endpoints);
@@ -160,10 +180,13 @@
                 return;
             }
 
-            log.info("Process endpoints {} updating event from API server.",
+            log.trace("Process endpoints {} updating event from API server.",
                     endpoints.getMetadata().getName());
 
-            k8sEndpointsAdminService.updateEndpoints(endpoints);
+            if (k8sEndpointsAdminService.endpoints(
+                    endpoints.getMetadata().getUid()) != null) {
+                k8sEndpointsAdminService.updateEndpoints(endpoints);
+            }
         }
 
         private void processDeletion(Endpoints endpoints) {
@@ -171,7 +194,7 @@
                 return;
             }
 
-            log.info("Process endpoints {} removal event from API server.",
+            log.trace("Process endpoints {} removal event from API server.",
                     endpoints.getMetadata().getName());
 
             k8sEndpointsAdminService.removeEndpoints(endpoints.getMetadata().getUid());