Add a kubernetes port and pod mapper with sync/purge rules CLIs

Change-Id: I89ed29e4357b84345f95fddf81ab7156715d7c82
diff --git a/apps/k8s-networking/app/BUILD b/apps/k8s-networking/app/BUILD
index 515cf0d..725e27d 100644
--- a/apps/k8s-networking/app/BUILD
+++ b/apps/k8s-networking/app/BUILD
@@ -7,6 +7,13 @@
     "@commons_net//jar",
     "@k8s_client//jar",
     "@k8s_model//jar",
+    "@okhttp//jar",
+    "@okio//jar",
+    "@logging_interceptor//jar",
+    "@jackson_dataformat_yaml//jar",
+    "@snakeyaml//jar",
+    "@zjsonpatch//jar",
+    "@validation_api//jar",
 ]
 
 TEST_DEPS = TEST_ADAPTERS + TEST_REST + [
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeRulesCommand.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeRulesCommand.java
new file mode 100644
index 0000000..1658a35
--- /dev/null
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeRulesCommand.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.flow.FlowRuleService;
+
+import static java.lang.Thread.sleep;
+import static java.util.stream.StreamSupport.stream;
+import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+
+/**
+ * Purges all existing flow rules installed for kubernetes networks.
+ */
+@Service
+@Command(scope = "onos", name = "k8s-purge-rules",
+        description = "Purges all flow rules installed by kubernetes networking app")
+public class K8sPurgeRulesCommand extends AbstractShellCommand {
+
+    private static final long TIMEOUT_MS = 10000; // we wait 10s
+    private static final long SLEEP_MS = 2000; // we wait 2s between retries
+
+    /**
+     * Requests removal of every flow rule owned by the kubernetes networking
+     * app, then polls the flow rule service until no entry is left, the
+     * timeout expires or the calling thread is interrupted.
+     */
+    @Override
+    protected void doExecute() {
+        FlowRuleService flowRuleService = get(FlowRuleService.class);
+        CoreService coreService = get(CoreService.class);
+        ApplicationId appId = coreService.getAppId(K8S_NETWORKING_APP_ID);
+
+        if (appId == null) {
+            error("Failed to purge kubernetes networking flow rules.");
+            return;
+        }
+
+        flowRuleService.removeFlowRulesById(appId);
+
+        // success is reported only once no entry of the app is left
+        long timeoutExpiredMs = System.currentTimeMillis() + TIMEOUT_MS;
+        while (hasFlowEntries(flowRuleService, appId) &&
+                System.currentTimeMillis() < timeoutExpiredMs) {
+            try {
+                sleep(SLEEP_MS);
+            } catch (InterruptedException e) {
+                log.error("Exception caused during rule purging...");
+                // keep the interrupt visible to the caller and stop waiting
+                Thread.currentThread().interrupt();
+                break;
+            }
+
+            if (hasFlowEntries(flowRuleService, appId)) {
+                flowRuleService.removeFlowRulesById(appId);
+                print("Failed to purge flow rules, retrying rule purging...");
+            }
+        }
+
+        if (hasFlowEntries(flowRuleService, appId)) {
+            error("Failed to purge flow rules.");
+        } else {
+            print("Successfully purged flow rules!");
+        }
+    }
+
+    // Tells whether at least one flow entry of the given app is still stored.
+    private boolean hasFlowEntries(FlowRuleService service, ApplicationId appId) {
+        return stream(service.getFlowEntriesById(appId).spliterator(), false)
+                .findAny().isPresent();
+    }
+}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeState.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeStateCommand.java
similarity index 88%
rename from apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeState.java
rename to apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeStateCommand.java
index ccff90a..7b87d7e 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeState.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeStateCommand.java
@@ -19,6 +19,7 @@
 import org.apache.karaf.shell.api.action.lifecycle.Service;
 import org.onosproject.cli.AbstractShellCommand;
 import org.onosproject.k8snetworking.api.K8sEndpointsAdminService;
+import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
 import org.onosproject.k8snetworking.api.K8sPodAdminService;
 import org.onosproject.k8snetworking.api.K8sServiceAdminService;
 
@@ -28,10 +29,11 @@
 @Service
 @Command(scope = "onos", name = "k8s-purge-states",
         description = "Purges all kubernetes states")
-public class K8sPurgeState extends AbstractShellCommand {
+public class K8sPurgeStateCommand extends AbstractShellCommand {
     @Override
     protected void doExecute() {
         get(K8sPodAdminService.class).clear();
+        get(K8sNetworkAdminService.class).clear();
         get(K8sEndpointsAdminService.class).clear();
         get(K8sServiceAdminService.class).clear();
     }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sSyncRulesCommand.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sSyncRulesCommand.java
new file mode 100644
index 0000000..097292c
--- /dev/null
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sSyncRulesCommand.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeAdminService;
+
+import static java.lang.Thread.sleep;
+import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
+import static org.onosproject.k8snode.api.K8sNodeState.INIT;
+
+/**
+ * Synchronizes flow rules to provide connectivity for kubernetes pods.
+ */
+@Service
+@Command(scope = "onos", name = "k8s-sync-rules",
+        description = "Synchronizes all kubernetes flow rules")
+public class K8sSyncRulesCommand extends AbstractShellCommand {
+
+    private static final long SLEEP_MS = 3000; // we wait 3s between retries
+    private static final long TIMEOUT_MS = 10000; // we wait 10s
+
+    private static final String SUCCESS_MSG = "Successfully synchronize flow rules for node %s!";
+    private static final String FAIL_MSG = "Failed to synchronize flow rules for node %s.";
+
+    @Override
+    protected void doExecute() {
+        K8sNodeAdminService adminService = get(K8sNodeAdminService.class);
+        if (adminService == null) {
+            error("Failed to re-install flow rules for kubernetes networking.");
+            return;
+        }
+
+        adminService.completeNodes().forEach(node ->
+                syncRulesBaseForNode(adminService, node));
+
+        print("Successfully requested re-installing flow rules.");
+    }
+
+    // Resets the node to INIT and polls until it is COMPLETE again, it is no
+    // longer known, the timeout expired or the calling thread got interrupted.
+    private void syncRulesBaseForNode(K8sNodeAdminService adminService, K8sNode k8sNode) {
+        K8sNode updated = k8sNode.updateState(INIT);
+        adminService.updateNode(updated);
+
+        long timeoutExpiredMs = System.currentTimeMillis() + TIMEOUT_MS;
+        while (!isComplete(adminService, k8sNode) &&
+                System.currentTimeMillis() < timeoutExpiredMs) {
+            try {
+                sleep(SLEEP_MS);
+            } catch (InterruptedException e) {
+                error("Exception caused during node synchronization...");
+                // keep the interrupt visible to the caller and stop waiting
+                Thread.currentThread().interrupt();
+                break;
+            }
+            K8sNode current = adminService.node(k8sNode.hostname());
+            if (current == null) {
+                break; // presumably removed meanwhile, nothing left to retry
+            } else if (current.state() != COMPLETE) {
+                adminService.updateNode(updated);
+                print("Failed to synchronize flow rules, retrying...");
+            }
+        }
+
+        if (isComplete(adminService, k8sNode)) {
+            print(SUCCESS_MSG, k8sNode.hostname());
+        } else {
+            error(FAIL_MSG, k8sNode.hostname());
+        }
+    }
+
+    // A node the service does not return (null) is treated as not complete.
+    private boolean isComplete(K8sNodeAdminService adminService, K8sNode k8sNode) {
+        K8sNode current = adminService.node(k8sNode.hostname());
+        return current != null && current.state() == COMPLETE;
+    }
+}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sSyncStateCommand.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sSyncStateCommand.java
index 1592755..170425c 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sSyncStateCommand.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sSyncStateCommand.java
@@ -21,15 +21,25 @@
 import io.fabric8.kubernetes.client.KubernetesClient;
 import org.apache.karaf.shell.api.action.Command;
 import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
 import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.k8snetworking.api.DefaultK8sPort;
 import org.onosproject.k8snetworking.api.K8sEndpointsAdminService;
+import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
 import org.onosproject.k8snetworking.api.K8sPodAdminService;
+import org.onosproject.k8snetworking.api.K8sPort;
 import org.onosproject.k8snetworking.api.K8sServiceAdminService;
 import org.onosproject.k8snetworking.util.K8sNetworkingUtil;
 import org.onosproject.k8snode.api.K8sApiConfig;
 import org.onosproject.k8snode.api.K8sApiConfigService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
 
 import java.util.List;
+import java.util.Map;
+
+import static org.onosproject.k8snetworking.api.K8sPort.State.INACTIVE;
 
 /**
  * Synchronizes kubernetes states.
@@ -43,6 +53,13 @@
     private static final String SERVICE_FORMAT = "%-50s%-30s%-30s";
     private static final String ENDPOINTS_FORMAT = "%-50s%-50s%-20s";
 
+    private static final String PORT_ID = "portId";
+    private static final String DEVICE_ID = "deviceId";
+    private static final String PORT_NUMBER = "portNumber";
+    private static final String IP_ADDRESS = "ipAddress";
+    private static final String MAC_ADDRESS = "macAddress";
+    private static final String NETWORK_ID = "networkId";
+
     @Override
     protected void doExecute() {
         K8sApiConfigService configService = get(K8sApiConfigService.class);
@@ -51,6 +68,8 @@
                 get(K8sServiceAdminService.class);
         K8sEndpointsAdminService endpointsAdminService =
                 get(K8sEndpointsAdminService.class);
+        K8sNetworkAdminService networkAdminService =
+                get(K8sNetworkAdminService.class);
 
         K8sApiConfig config =
                 configService.apiConfigs().stream().findAny().orElse(null);
@@ -96,6 +115,9 @@
             } else {
                 podAdminService.createPod(pod);
             }
+
+            syncPortFromPod(pod, networkAdminService);
+
             printPod(pod);
         });
     }
@@ -139,4 +161,36 @@
                 pod.getStatus().getPodIP(),
                 containers.isEmpty() ? "" : containers);
     }
+
+    // Restores the port described by the port annotations of the given pod
+    // as INACTIVE; pods without a complete set of annotations are skipped.
+    private void syncPortFromPod(Pod pod, K8sNetworkAdminService adminService) {
+        Map<String, String> annotations = pod.getMetadata().getAnnotations();
+        if (annotations == null || annotations.get(PORT_ID) == null) {
+            return;
+        }
+        // missing values cannot be parsed below, so verify every key up front
+        for (String key : new String[] {NETWORK_ID, DEVICE_ID, PORT_NUMBER, IP_ADDRESS, MAC_ADDRESS}) {
+            if (annotations.get(key) == null) {
+                error("Pod %s lacks port annotation %s.", pod.getMetadata().getName(), key);
+                return;
+            }
+        }
+
+        String portId = annotations.get(PORT_ID);
+        K8sPort newPort = DefaultK8sPort.builder()
+                .portId(portId)
+                .networkId(annotations.get(NETWORK_ID))
+                .deviceId(DeviceId.deviceId(annotations.get(DEVICE_ID)))
+                .ipAddress(IpAddress.valueOf(annotations.get(IP_ADDRESS)))
+                .macAddress(MacAddress.valueOf(annotations.get(MAC_ADDRESS)))
+                .portNumber(PortNumber.portNumber(annotations.get(PORT_NUMBER)))
+                .state(INACTIVE)
+                .build();
+        if (adminService.port(portId) == null) {
+            adminService.createPort(newPort);
+        } else {
+            adminService.updatePort(newPort);
+        }
+    }
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
index 371a34a..683b070 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
@@ -25,7 +25,8 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.k8snetworking.api.K8sEndpointsAdminService;
-import org.onosproject.k8snode.api.K8sApiConfig;
+import org.onosproject.k8snode.api.K8sApiConfigEvent;
+import org.onosproject.k8snode.api.K8sApiConfigListener;
 import org.onosproject.k8snode.api.K8sApiConfigService;
 import org.onosproject.mastership.MastershipService;
 import org.osgi.service.component.annotations.Activate;
@@ -74,7 +75,8 @@
             groupedThreads(this.getClass().getSimpleName(), "event-handler"));
     private final Watcher<Endpoints>
             internalEndpointsWatcher = new InternalK8sEndpointsWatcher();
-
+    private final InternalK8sApiConfigListener
+            internalK8sApiConfigListener = new InternalK8sApiConfigListener();
 
     private ApplicationId appId;
     private NodeId localNodeId;
@@ -84,36 +86,52 @@
         appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
         localNodeId = clusterService.getLocalNode().id();
         leadershipService.runForLeadership(appId.name());
-
-        initWatcher();
+        k8sApiConfigService.addListener(internalK8sApiConfigListener);
 
         log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
+        k8sApiConfigService.removeListener(internalK8sApiConfigListener);
         leadershipService.withdraw(appId.name());
         eventExecutor.shutdown();
 
         log.info("Stopped");
     }
 
-    private void initWatcher() {
-        K8sApiConfig config =
-                k8sApiConfigService.apiConfigs().stream().findAny().orElse(null);
-        if (config == null) {
-            log.error("Failed to find valid kubernetes API configuration.");
-            return;
+    private class InternalK8sApiConfigListener implements K8sApiConfigListener {
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
-        KubernetesClient client = k8sClient(config);
+        @Override
+        public void event(K8sApiConfigEvent event) {
 
-        if (client == null) {
-            log.error("Failed to connect to kubernetes API server.");
-            return;
+            switch (event.type()) {
+                case K8S_API_CONFIG_UPDATED:
+                    eventExecutor.execute(this::processConfigUpdating);
+                    break;
+                case K8S_API_CONFIG_CREATED:
+                case K8S_API_CONFIG_REMOVED:
+                default:
+                    // do nothing
+                    break;
+            }
         }
 
-        client.endpoints().watch(internalEndpointsWatcher);
+        private void processConfigUpdating() {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            KubernetesClient client = k8sClient(k8sApiConfigService);
+
+            if (client != null) {
+                client.endpoints().watch(internalEndpointsWatcher);
+            }
+        }
     }
 
     private class InternalK8sEndpointsWatcher implements Watcher<Endpoints> {
@@ -125,7 +143,9 @@
                     eventExecutor.execute(() -> processAddition(endpoints));
                     break;
                 case MODIFIED:
-                    eventExecutor.execute(() -> processModification(endpoints));
+                    // FIXME: the kubernetes API server issues endpoints update events
+                    // too frequently, so we disable update handling for now
+                    // eventExecutor.execute(() -> processModification(endpoints));
                     break;
                 case DELETED:
                     eventExecutor.execute(() -> processDeletion(endpoints));
@@ -149,7 +169,7 @@
                 return;
             }
 
-            log.info("Process endpoints {} creating event from API server.",
+            log.trace("Process endpoints {} creating event from API server.",
                     endpoints.getMetadata().getName());
 
             k8sEndpointsAdminService.createEndpoints(endpoints);
@@ -160,10 +180,13 @@
                 return;
             }
 
-            log.info("Process endpoints {} updating event from API server.",
+            log.trace("Process endpoints {} updating event from API server.",
                     endpoints.getMetadata().getName());
 
-            k8sEndpointsAdminService.updateEndpoints(endpoints);
+            if (k8sEndpointsAdminService.endpoints(
+                    endpoints.getMetadata().getUid()) != null) {
+                k8sEndpointsAdminService.updateEndpoints(endpoints);
+            }
         }
 
         private void processDeletion(Endpoints endpoints) {
@@ -171,7 +194,7 @@
                 return;
             }
 
-            log.info("Process endpoints {} removal event from API server.",
+            log.trace("Process endpoints {} removal event from API server.",
                     endpoints.getMetadata().getName());
 
             k8sEndpointsAdminService.removeEndpoints(endpoints.getMetadata().getUid());
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodPortMapper.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodPortMapper.java
new file mode 100644
index 0000000..5239ee4
--- /dev/null
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodPortMapper.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.impl;
+
+import com.google.common.collect.Maps;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
+import org.onosproject.k8snetworking.api.K8sNetworkEvent;
+import org.onosproject.k8snetworking.api.K8sNetworkListener;
+import org.onosproject.k8snetworking.api.K8sPodAdminService;
+import org.onosproject.k8snetworking.api.K8sPodEvent;
+import org.onosproject.k8snetworking.api.K8sPodListener;
+import org.onosproject.k8snetworking.api.K8sPort;
+import org.onosproject.k8snode.api.K8sApiConfigService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.k8sClient;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Associates the kubernetes container port and pod.
+ */
+@Component(immediate = true)
+public class K8sPodPortMapper {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final String PORT_ID = "portId";
+    private static final String DEVICE_ID = "deviceId";
+    private static final String PORT_NUMBER = "portNumber";
+    private static final String IP_ADDRESS = "ipAddress";
+    private static final String MAC_ADDRESS = "macAddress";
+    private static final String NETWORK_ID = "networkId";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DriverService driverService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sNetworkAdminService k8sNetworkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sPodAdminService k8sPodService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sApiConfigService k8sApiConfigService;
+
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+    private final InternalK8sNetworkListener k8sNetworkListener =
+            new InternalK8sNetworkListener();
+    private final InternalK8sPodListener k8sPodListener =
+            new InternalK8sPodListener();
+
+    private ApplicationId appId;
+    private NodeId localNodeId;
+
+    @Activate
+    protected void activate() {
+        appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
+        localNodeId = clusterService.getLocalNode().id();
+        leadershipService.runForLeadership(appId.name());
+        k8sNetworkService.addListener(k8sNetworkListener);
+        k8sPodService.addListener(k8sPodListener);
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        k8sNetworkService.removeListener(k8sNetworkListener);
+        k8sPodService.removeListener(k8sPodListener);
+        leadershipService.withdraw(appId.name());
+        eventExecutor.shutdown();
+
+        log.info("Stopped");
+    }
+
+    private class InternalK8sPodListener implements K8sPodListener {
+        // pod events are acted upon by the leader of the app topic only
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(K8sPodEvent event) {
+            switch (event.type()) {
+                case K8S_POD_CREATED:
+                case K8S_POD_UPDATED:
+                    eventExecutor.execute(() -> processPodCreation(event.subject()));
+                    break;
+                case K8S_POD_REMOVED:
+                default:
+                    break;
+            }
+        }
+
+        private void processPodCreation(Pod pod) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            // skip pods which already carry the port annotation or which have
+            // no IP yet; unrelated annotations must not prevent the mapping
+            Map<String, String> existing = pod.getMetadata().getAnnotations();
+            if ((existing != null && existing.containsKey(PORT_ID)) ||
+                    pod.getStatus() == null || pod.getStatus().getPodIP() == null) {
+                return;
+            }
+
+            KubernetesClient client = k8sClient(k8sApiConfigService);
+            if (client == null) {
+                return;
+            }
+
+            // TODO: we assume that POD IP is unique, there might be other
+            // variable which preserves better uniqueness
+            Set<K8sPort> ports = k8sNetworkService.ports();
+            ports.stream().filter(p -> p.ipAddress().toString()
+                            .equals(pod.getStatus().getPodIP()))
+                .forEach(p -> {
+                    Map<String, String> annotations = Maps.newConcurrentMap();
+                    annotations.put(PORT_ID, p.portId());
+                    annotations.put(NETWORK_ID, p.networkId());
+                    annotations.put(DEVICE_ID, p.deviceId().toString());
+                    annotations.put(PORT_NUMBER, p.portNumber().toString());
+                    annotations.put(IP_ADDRESS, p.ipAddress().toString());
+                    annotations.put(MAC_ADDRESS, p.macAddress().toString());
+                    client.pods().inNamespace(pod.getMetadata().getNamespace())
+                            .withName(pod.getMetadata().getName())
+                            .edit()
+                            .editMetadata()
+                            .addToAnnotations(annotations)
+                            .endMetadata().done();
+                });
+        }
+    }
+
+    private class InternalK8sNetworkListener implements K8sNetworkListener {
+        // NOTE: no network event triggers any action yet; helper is unused
+        private boolean isRelevantHelper(K8sNetworkEvent event) {
+            return mastershipService.isLocalMaster(event.port().deviceId());
+        }
+
+        @Override
+        public void event(K8sNetworkEvent event) {
+            switch (event.type()) {
+                case K8S_PORT_UPDATED:
+                case K8S_PORT_REMOVED:
+                    // no need to process port removal event...
+                default:
+                    break;
+            }
+        }
+    }
+}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodWatcher.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodWatcher.java
index f00de40..a5c15f4 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodWatcher.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodWatcher.java
@@ -25,7 +25,8 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.k8snetworking.api.K8sPodAdminService;
-import org.onosproject.k8snode.api.K8sApiConfig;
+import org.onosproject.k8snode.api.K8sApiConfigEvent;
+import org.onosproject.k8snode.api.K8sApiConfigListener;
 import org.onosproject.k8snode.api.K8sApiConfigService;
 import org.onosproject.mastership.MastershipService;
 import org.osgi.service.component.annotations.Activate;
@@ -73,6 +74,8 @@
     private final ExecutorService eventExecutor = newSingleThreadExecutor(
             groupedThreads(this.getClass().getSimpleName(), "event-handler"));
     private final Watcher<Pod> internalK8sPodWatcher = new InternalK8sPodWatcher();
+    private final InternalK8sApiConfigListener
+            internalK8sApiConfigListener = new InternalK8sApiConfigListener();
 
     private ApplicationId appId;
     private NodeId localNodeId;
@@ -82,36 +85,52 @@
         appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
         localNodeId = clusterService.getLocalNode().id();
         leadershipService.runForLeadership(appId.name());
-
-        initWatcher();
+        k8sApiConfigService.addListener(internalK8sApiConfigListener);
 
         log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
+        k8sApiConfigService.removeListener(internalK8sApiConfigListener);
         leadershipService.withdraw(appId.name());
         eventExecutor.shutdown();
 
         log.info("Stopped");
     }
 
-    private void initWatcher() {
-        K8sApiConfig config =
-                k8sApiConfigService.apiConfigs().stream().findAny().orElse(null);
-        if (config == null) {
-            log.error("Failed to find valid kubernetes API configuration.");
-            return;
+    private class InternalK8sApiConfigListener implements K8sApiConfigListener {
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
-        KubernetesClient client = k8sClient(config);
+        @Override
+        public void event(K8sApiConfigEvent event) {
 
-        if (client == null) {
-            log.error("Failed to connect to kubernetes API server.");
-            return;
+            switch (event.type()) {
+                case K8S_API_CONFIG_UPDATED:
+                    eventExecutor.execute(this::processConfigUpdating);
+                    break;
+                case K8S_API_CONFIG_CREATED:
+                case K8S_API_CONFIG_REMOVED:
+                default:
+                    // do nothing
+                    break;
+            }
         }
 
-        client.pods().watch(internalK8sPodWatcher);
+        private void processConfigUpdating() {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            KubernetesClient client = k8sClient(k8sApiConfigService);
+
+            if (client != null) {
+                client.pods().watch(internalK8sPodWatcher);
+            }
+        }
     }
 
     private class InternalK8sPodWatcher implements Watcher<Pod> {
@@ -146,7 +165,7 @@
                 return;
             }
 
-            log.info("Process pod {} creating event from API server.",
+            log.trace("Process pod {} creating event from API server.",
                     pod.getMetadata().getName());
 
             k8sPodAdminService.createPod(pod);
@@ -157,10 +176,12 @@
                 return;
             }
 
-            log.info("Process pod {} updating event from API server.",
+            log.trace("Process pod {} updating event from API server.",
                     pod.getMetadata().getName());
 
-            k8sPodAdminService.updatePod(pod);
+            if (k8sPodAdminService.pod(pod.getMetadata().getUid()) != null) {
+                k8sPodAdminService.updatePod(pod);
+            }
         }
 
         private void processDeletion(Pod pod) {
@@ -168,7 +189,7 @@
                 return;
             }
 
-            log.info("Process pod {} removal event from API server.",
+            log.trace("Process pod {} removal event from API server.",
                     pod.getMetadata().getName());
 
             k8sPodAdminService.removePod(pod.getMetadata().getUid());
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceWatcher.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceWatcher.java
index 41e12c2..17f9946 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceWatcher.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceWatcher.java
@@ -25,7 +25,8 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.k8snetworking.api.K8sServiceAdminService;
-import org.onosproject.k8snode.api.K8sApiConfig;
+import org.onosproject.k8snode.api.K8sApiConfigEvent;
+import org.onosproject.k8snode.api.K8sApiConfigListener;
 import org.onosproject.k8snode.api.K8sApiConfigService;
 import org.onosproject.mastership.MastershipService;
 import org.osgi.service.component.annotations.Activate;
@@ -75,6 +76,8 @@
 
     private final InternalK8sServiceWatcher
             internalK8sServiceWatcher = new InternalK8sServiceWatcher();
+    private final InternalK8sApiConfigListener
+            internalK8sApiConfigListener = new InternalK8sApiConfigListener();
 
     private ApplicationId appId;
     private NodeId localNodeId;
@@ -84,36 +87,52 @@
         appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
         localNodeId = clusterService.getLocalNode().id();
         leadershipService.runForLeadership(appId.name());
-
-        initWatcher();
+        k8sApiConfigService.addListener(internalK8sApiConfigListener);
 
         log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
+        k8sApiConfigService.removeListener(internalK8sApiConfigListener);
         leadershipService.withdraw(appId.name());
         eventExecutor.shutdown();
 
         log.info("Stopped");
     }
 
-    private void initWatcher() {
-        K8sApiConfig config =
-                k8sApiConfigService.apiConfigs().stream().findAny().orElse(null);
-        if (config == null) {
-            log.error("Failed to find valid kubernetes API configuration.");
-            return;
+    private class InternalK8sApiConfigListener implements K8sApiConfigListener {
+        // true only when the local node is the leader of the appId.name() topic
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
-        KubernetesClient client = k8sClient(config);
+        @Override
+        public void event(K8sApiConfigEvent event) {
 
-        if (client == null) {
-            log.error("Failed to connect to kubernetes API server.");
-            return;
+            switch (event.type()) {
+                case K8S_API_CONFIG_UPDATED:
+                    eventExecutor.execute(this::processConfigUpdating);
+                    break;
+                case K8S_API_CONFIG_CREATED:
+                case K8S_API_CONFIG_REMOVED:
+                default:
+                    // only UPDATED events register the service watcher; ignore the rest
+                    break;
+            }
         }
 
-        client.services().watch(internalK8sServiceWatcher);
+        private void processConfigUpdating() {
+            if (!isRelevantHelper()) {
+                return;
+            }
+            // NOTE(review): every UPDATED event registers a new watch; confirm earlier ones get closed
+            KubernetesClient client = k8sClient(k8sApiConfigService);
+
+            if (client != null) {
+                client.services().watch(internalK8sServiceWatcher);
+            }
+        }
     }
 
     private class InternalK8sServiceWatcher implements Watcher<Service> {
@@ -149,7 +168,7 @@
                 return;
             }
 
-            log.info("Process service {} creating event from API server.",
+            log.trace("Process service {} creating event from API server.",
                     service.getMetadata().getName());
 
             k8sServiceAdminService.createService(service);
@@ -160,10 +179,13 @@
                 return;
             }
 
-            log.info("Process service {} updating event from API server.",
+            log.trace("Process service {} updating event from API server.",
                     service.getMetadata().getName());
 
-            k8sServiceAdminService.updateService(service);
+            if (k8sServiceAdminService.service(
+                    service.getMetadata().getUid()) != null) {
+                k8sServiceAdminService.updateService(service);
+            }
         }
 
         private void processDeletion(Service service) {
@@ -171,7 +193,7 @@
                 return;
             }
 
-            log.info("Process service {} removal event from API server.",
+            log.trace("Process service {} removal event from API server.",
                     service.getMetadata().getName());
 
             k8sServiceAdminService.removeService(service.getMetadata().getUid());
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
index b6c2f69..2e838e8 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
@@ -29,6 +29,7 @@
 import org.onosproject.k8snetworking.api.K8sNetwork;
 import org.onosproject.k8snetworking.api.K8sNetworkService;
 import org.onosproject.k8snode.api.K8sApiConfig;
+import org.onosproject.k8snode.api.K8sApiConfigService;
 import org.onosproject.k8snode.api.K8sNode;
 import org.onosproject.net.PortNumber;
 import org.slf4j.Logger;
@@ -223,4 +224,28 @@
 
         return new DefaultKubernetesClient(configBuilder.build());
     }
+
+    /**
+     * Builds a kubernetes client from any registered API configuration.
+     *
+     * @param service kubernetes API config service to pick a config from
+     * @return kubernetes client, or null if no config or client is available
+     */
+    public static KubernetesClient k8sClient(K8sApiConfigService service) {
+        K8sApiConfig apiConfig = service.apiConfigs().stream()
+                .findAny()
+                .orElse(null);
+        if (apiConfig == null) {
+            log.error("Failed to find valid kubernetes API configuration.");
+            return null;
+        }
+
+        // hand the chosen config over to the config-based overload
+        KubernetesClient kubeClient = k8sClient(apiConfig);
+        if (kubeClient == null) {
+            log.error("Failed to connect to kubernetes API server.");
+        }
+
+        return kubeClient;
+    }
 }