Fix: register a set of Affinity classes into distributed store

1. Purge group rules when issue k8s purge rules command.
2. Remove k8s port when remove OVS port.
3. Re-install group rules during sync rules.
4. Install: 1) install group rule; 2) install flow rule
   Uninstall: 1) uninstall flow rule; 2) uninstall group rule
5. Add/remove group buckets when receiving POD update/remove
   events.
6. Lower down the endpoint update logging level

Change-Id: Ib50e359a9b2c0cd9cb1490c6172864ad118b2247
diff --git a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java
index fc935c0..d91b0be 100644
--- a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java
+++ b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java
@@ -39,4 +39,25 @@
      */
     void setRule(ApplicationId appId, DeviceId deviceId, int groupId,
                  Type type, List<GroupBucket> buckets, boolean install);
+
+    /**
+     * Checks whether has the group in store with given device ID and group ID.
+     *
+     * @param deviceId      device ID
+     * @param groupId       group ID
+     * @return true if the group exists, false otherwise
+     */
+    boolean hasGroup(DeviceId deviceId, int groupId);
+
+    /**
+     * Configures buckets to the existing group.
+     *
+     * @param appId         application ID
+     * @param deviceId      device ID
+     * @param groupId       group ID
+     * @param buckets       a list of group buckets
+     * @param install       true for buckets addition, false for buckets removal
+     */
+    void setBuckets(ApplicationId appId, DeviceId deviceId, int groupId,
+                    List<GroupBucket> buckets, boolean install);
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeRulesCommand.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeRulesCommand.java
index 1658a35..477ec77 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeRulesCommand.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeRulesCommand.java
@@ -20,7 +20,11 @@
 import org.onosproject.cli.AbstractShellCommand;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeService;
 import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.group.Group;
+import org.onosproject.net.group.GroupService;
 
 import static java.lang.Thread.sleep;
 import static java.util.stream.StreamSupport.stream;
@@ -40,7 +44,9 @@
     @Override
     protected void doExecute() {
         FlowRuleService flowRuleService = get(FlowRuleService.class);
+        GroupService groupService = get(GroupService.class);
         CoreService coreService = get(CoreService.class);
+        K8sNodeService k8sNodeService = get(K8sNodeService.class);
         ApplicationId appId = coreService.getAppId(K8S_NETWORKING_APP_ID);
 
         if (appId == null) {
@@ -80,6 +86,12 @@
             }
         }
 
+        for (K8sNode node : k8sNodeService.completeNodes()) {
+            for (Group group : groupService.getGroups(node.intgBridge(), appId)) {
+                groupService.removeGroup(node.intgBridge(), group.appCookie(), appId);
+            }
+        }
+
         if (result) {
             print("Successfully purged flow rules!");
         } else {
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/DistributedK8sPodStore.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/DistributedK8sPodStore.java
index fe16346..138555b 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/DistributedK8sPodStore.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/DistributedK8sPodStore.java
@@ -16,6 +16,7 @@
 package org.onosproject.k8snetworking.impl;
 
 import com.google.common.collect.ImmutableSet;
+import io.fabric8.kubernetes.api.model.Affinity;
 import io.fabric8.kubernetes.api.model.Capabilities;
 import io.fabric8.kubernetes.api.model.ConfigMapEnvSource;
 import io.fabric8.kubernetes.api.model.ConfigMapKeySelector;
@@ -37,16 +38,26 @@
 import io.fabric8.kubernetes.api.model.HostPathVolumeSource;
 import io.fabric8.kubernetes.api.model.IntOrString;
 import io.fabric8.kubernetes.api.model.KeyToPath;
+import io.fabric8.kubernetes.api.model.LabelSelector;
+import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
 import io.fabric8.kubernetes.api.model.Lifecycle;
+import io.fabric8.kubernetes.api.model.NodeAffinity;
+import io.fabric8.kubernetes.api.model.NodeSelector;
+import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
+import io.fabric8.kubernetes.api.model.NodeSelectorTerm;
 import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.OwnerReference;
 import io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSource;
 import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodAffinity;
+import io.fabric8.kubernetes.api.model.PodAffinityTerm;
+import io.fabric8.kubernetes.api.model.PodAntiAffinity;
 import io.fabric8.kubernetes.api.model.PodCondition;
 import io.fabric8.kubernetes.api.model.PodSecurityContext;
 import io.fabric8.kubernetes.api.model.PodSpec;
 import io.fabric8.kubernetes.api.model.PodStatus;
+import io.fabric8.kubernetes.api.model.PreferredSchedulingTerm;
 import io.fabric8.kubernetes.api.model.Probe;
 import io.fabric8.kubernetes.api.model.Quantity;
 import io.fabric8.kubernetes.api.model.ResourceFieldSelector;
@@ -61,6 +72,7 @@
 import io.fabric8.kubernetes.api.model.Volume;
 import io.fabric8.kubernetes.api.model.VolumeDevice;
 import io.fabric8.kubernetes.api.model.VolumeMount;
+import io.fabric8.kubernetes.api.model.WeightedPodAffinityTerm;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
@@ -157,6 +169,18 @@
             .register(ConfigMapVolumeSource.class)
             .register(KeyToPath.class)
             .register(HostPathVolumeSource.class)
+            .register(Affinity.class)
+            .register(NodeAffinity.class)
+            .register(NodeSelector.class)
+            .register(NodeSelectorTerm.class)
+            .register(NodeSelectorRequirement.class)
+            .register(PreferredSchedulingTerm.class)
+            .register(PodAffinity.class)
+            .register(WeightedPodAffinityTerm.class)
+            .register(PodAffinityTerm.class)
+            .register(LabelSelector.class)
+            .register(LabelSelectorRequirement.class)
+            .register(PodAntiAffinity.class)
             .register(LinkedHashMap.class)
             .register(Collection.class)
             .build();
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsManager.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsManager.java
index de594aa..ac9e2c0 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsManager.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsManager.java
@@ -108,7 +108,7 @@
 
         k8sEndpointsStore.updateEndpoints(endpoints);
 
-        log.info(String.format(MSG_ENDPOINTS, endpoints.getMetadata().getName(), MSG_UPDATED));
+        log.debug(String.format(MSG_ENDPOINTS, endpoints.getMetadata().getName(), MSG_UPDATED));
     }
 
     @Override
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
index 683b070..ecc35fe 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
@@ -143,9 +143,7 @@
                     eventExecutor.execute(() -> processAddition(endpoints));
                     break;
                 case MODIFIED:
-                    // FIXME: there are too frequent endpoints update events
-                    // issued from kubernetes API server, we disable update for now
-                    // eventExecutor.execute(() -> processModification(endpoints));
+                    eventExecutor.execute(() -> processModification(endpoints));
                     break;
                 case DELETED:
                     eventExecutor.execute(() -> processDeletion(endpoints));
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java
index aa2b90f..51dc9c8 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java
@@ -70,13 +70,31 @@
     @Override
     public void setRule(ApplicationId appId, DeviceId deviceId, int groupId,
                         Type type, List<GroupBucket> buckets, boolean install) {
-        GroupDescription groupDesc = new DefaultGroupDescription(deviceId,
-                type, new GroupBuckets(buckets), getGroupKey(groupId), groupId, appId);
 
         if (install) {
+            GroupDescription groupDesc = new DefaultGroupDescription(deviceId,
+                    type, new GroupBuckets(buckets), getGroupKey(groupId), groupId, appId);
             groupService.addGroup(groupDesc);
         } else {
             groupService.removeGroup(deviceId, getGroupKey(groupId), appId);
         }
     }
+
+    @Override
+    public boolean hasGroup(DeviceId deviceId, int groupId) {
+        return groupService.getGroup(deviceId, getGroupKey(groupId)) != null;
+    }
+
+    @Override
+    public void setBuckets(ApplicationId appId, DeviceId deviceId, int groupId,
+                           List<GroupBucket> buckets, boolean install) {
+
+        if (install) {
+            groupService.addBucketsToGroup(deviceId, getGroupKey(groupId),
+                    new GroupBuckets(buckets), getGroupKey(groupId), appId);
+        } else {
+            groupService.removeBucketsFromGroup(deviceId, getGroupKey(groupId),
+                    new GroupBuckets(buckets), getGroupKey(groupId), appId);
+        }
+    }
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
index f2e56e0..791cbc9 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
@@ -21,6 +21,7 @@
 import io.fabric8.kubernetes.api.model.EndpointPort;
 import io.fabric8.kubernetes.api.model.EndpointSubset;
 import io.fabric8.kubernetes.api.model.Endpoints;
+import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.api.model.ServicePort;
 import org.onlab.packet.Ethernet;
@@ -40,6 +41,9 @@
 import org.onosproject.k8snetworking.api.K8sFlowRuleService;
 import org.onosproject.k8snetworking.api.K8sGroupRuleService;
 import org.onosproject.k8snetworking.api.K8sNetworkService;
+import org.onosproject.k8snetworking.api.K8sPodEvent;
+import org.onosproject.k8snetworking.api.K8sPodListener;
+import org.onosproject.k8snetworking.api.K8sPodService;
 import org.onosproject.k8snetworking.api.K8sServiceEvent;
 import org.onosproject.k8snetworking.api.K8sServiceListener;
 import org.onosproject.k8snetworking.api.K8sServiceService;
@@ -59,10 +63,7 @@
 import org.onosproject.net.flow.criteria.ExtensionSelector;
 import org.onosproject.net.flow.instructions.ExtensionTreatment;
 import org.onosproject.net.group.GroupBucket;
-import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AtomicCounter;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
@@ -133,6 +134,8 @@
 
     private static final String GROUP_ID_COUNTER_NAME = "group-id-counter";
 
+    private static final String IP_ADDRESS = "ipAddress";
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
 
@@ -172,6 +175,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected K8sServiceService k8sServiceService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sPodService k8sPodService;
+
     /** Service IP address translation mode. */
     private String serviceIpNatMode = SERVICE_IP_NAT_MODE_DEFAULT;
 
@@ -181,12 +187,11 @@
             new InternalNodeEventListener();
     private final InternalK8sServiceListener internalK8sServiceListener =
             new InternalK8sServiceListener();
+    private final InternalK8sPodListener internalK8sPodListener =
+            new InternalK8sPodListener();
 
     private AtomicCounter groupIdCounter;
 
-    // service IP ports has following format IP_PORT_PROTO
-    private ConsistentMap<String, Integer> servicePortGroupIdMap;
-
     private ApplicationId appId;
     private NodeId localNodeId;
 
@@ -198,21 +203,17 @@
         leadershipService.runForLeadership(appId.name());
         k8sNodeService.addListener(internalNodeEventListener);
         k8sServiceService.addListener(internalK8sServiceListener);
+        k8sPodService.addListener(internalK8sPodListener);
 
         groupIdCounter = storageService.getAtomicCounter(GROUP_ID_COUNTER_NAME);
 
-        servicePortGroupIdMap = storageService.<String, Integer>consistentMapBuilder()
-                .withName("k8s-service-ip-port-set")
-                .withSerializer(Serializer.using(KryoNamespaces.API))
-                .withApplicationId(appId)
-                .build();
-
         log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
         leadershipService.withdraw(appId.name());
+        k8sPodService.removeListener(internalK8sPodListener);
         k8sNodeService.removeListener(internalNodeEventListener);
         k8sServiceService.removeListener(internalK8sServiceListener);
         configService.unregisterProperties(getClass(), false);
@@ -345,8 +346,65 @@
         return map;
     }
 
-    private void setStatelessGroupFlowRules(DeviceId deviceId, Service service,
-                                            boolean install) {
+    private void setGroupBuckets(DeviceId deviceId, Service service, Pod pod, boolean install) {
+
+        if (pod.getMetadata().getAnnotations() == null) {
+            return;
+        }
+
+        String podIpStr = pod.getMetadata().getAnnotations().get(IP_ADDRESS);
+
+        Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
+        Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
+
+        spEpasMap.forEach((sp, epas) -> {
+            List<GroupBucket> bkts = Lists.newArrayList();
+
+            if (install) {
+                if (epas.contains(podIpStr)) {
+                    bkts = buildBuckets(deviceId, podIpStr, sp);
+                }
+            } else {
+                bkts = buildBuckets(deviceId, podIpStr, sp);
+            }
+
+            spGrpBkts.put(sp, bkts);
+        });
+
+        String serviceIp = service.getSpec().getClusterIP();
+        spGrpBkts.forEach((sp, bkts) -> {
+            String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
+            int groupId = svcStr.hashCode();
+
+            k8sGroupRuleService.setBuckets(appId, deviceId, groupId, bkts, install);
+        });
+    }
+
+    private List<GroupBucket> buildBuckets(DeviceId deviceId,
+                                           String podIpStr,
+                                           ServicePort sp) {
+        List<GroupBucket> bkts = Lists.newArrayList();
+
+        ExtensionTreatment resubmitTreatment = buildResubmitExtension(
+                deviceService.getDevice(deviceId), ROUTING_TABLE);
+        TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+                .setIpDst(IpAddress.valueOf(podIpStr))
+                .extension(resubmitTreatment, deviceId);
+
+        if (TCP.equals(sp.getProtocol())) {
+            tBuilder.setTcpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
+        } else if (UDP.equals(sp.getProtocol())) {
+            tBuilder.setUdpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
+        }
+
+        bkts.add(buildGroupBucket(tBuilder.build(), SELECT, (short) -1));
+
+        return bkts;
+    }
+
+    private synchronized void setStatelessGroupFlowRules(DeviceId deviceId,
+                                                         Service service,
+                                                         boolean install) {
         Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
         Map<String, String> nodeIpGatewayIpMap =
                 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
@@ -376,23 +434,36 @@
         String serviceIp = service.getSpec().getClusterIP();
         spGrpBkts.forEach((sp, bkts) -> {
             String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
-            int groupId;
+            int groupId = svcStr.hashCode();
 
-            if (servicePortGroupIdMap.asJavaMap().containsKey(svcStr)) {
-                groupId = servicePortGroupIdMap.asJavaMap().get(svcStr);
+            if (install) {
+
+                // add group table rules
+                k8sGroupRuleService.setRule(appId, deviceId, groupId,
+                        SELECT, bkts, true);
+
+                log.info("Adding group rule {}", groupId);
+
+                // if we failed to add group rule, we will not install flow rules
+                // as this might cause rule inconsistency
+                if (k8sGroupRuleService.hasGroup(deviceId, groupId)) {
+                    // add flow rules for shifting IP domain
+                    setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
+                            PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
+                            sp.getProtocol(), true);
+                }
             } else {
-                groupId = (int) groupIdCounter.incrementAndGet();
-                servicePortGroupIdMap.put(svcStr, groupId);
+                // remove flow rules for shifting IP domain
+                setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
+                        PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
+                        sp.getProtocol(), false);
+
+                // remove group table rules
+                k8sGroupRuleService.setRule(appId, deviceId, groupId,
+                        SELECT, bkts, false);
+
+                log.info("Removing group rule {}", groupId);
             }
-
-            // add group table rules
-            k8sGroupRuleService.setRule(appId, deviceId, groupId,
-                    SELECT, bkts, install);
-
-            // add flow rules for shifting IP domain
-            setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
-                    PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
-                    sp.getProtocol(), install);
         });
 
         spEpasMap.forEach((sp, epas) ->
@@ -639,6 +710,16 @@
         log.info("Configured. Service IP NAT mode is {}", serviceIpNatMode);
     }
 
+    private void setServiceNatRules(DeviceId deviceId, boolean install) {
+        if (NAT_STATEFUL.equals(serviceIpNatMode)) {
+            setStatefulServiceNatRules(deviceId, install);
+        } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
+            setStatelessServiceNatRules(deviceId, install);
+        } else {
+            log.warn("Service IP NAT mode was not configured!");
+        }
+    }
+
     private class InternalK8sServiceListener implements K8sServiceListener {
 
         private boolean isRelevantHelper() {
@@ -649,6 +730,7 @@
         public void event(K8sServiceEvent event) {
             switch (event.type()) {
                 case K8S_SERVICE_CREATED:
+                case K8S_SERVICE_UPDATED:
                     eventExecutor.execute(() -> processServiceCreation(event.subject()));
                     break;
                 case K8S_SERVICE_REMOVED:
@@ -697,6 +779,57 @@
         }
     }
 
+    private class InternalK8sPodListener implements K8sPodListener {
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(K8sPodEvent event) {
+            switch (event.type()) {
+                case K8S_POD_UPDATED:
+                    eventExecutor.execute(() -> processPodUpdate(event.subject()));
+                    break;
+                case K8S_POD_REMOVED:
+                    eventExecutor.execute(() -> processPodRemoval(event.subject()));
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        private void processPodUpdate(Pod pod) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            setServiceRuleFromPod(pod, true);
+        }
+
+        private void processPodRemoval(Pod pod) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            setServiceRuleFromPod(pod, false);
+        }
+
+        private void setServiceRuleFromPod(Pod pod, boolean install) {
+            k8sServiceService.services().forEach(s -> {
+                pod.getMetadata().getLabels().forEach((pk, pv) -> {
+                    Map<String, String> selectors = s.getSpec().getSelector();
+                    if (selectors != null && selectors.containsKey(pk)) {
+                        if (pv.equals(selectors.get(pk))) {
+                            k8sNodeService.completeNodes().forEach(n ->
+                                setGroupBuckets(n.intgBridge(), s, pod, install));
+                        }
+                    }
+                });
+            });
+        }
+    }
+
     private class InternalNodeEventListener implements K8sNodeListener {
 
         private boolean isRelevantHelper() {
@@ -711,8 +844,7 @@
                     eventExecutor.execute(() -> processNodeCompletion(k8sNode));
                     break;
                 case K8S_NODE_INCOMPLETE:
-                    eventExecutor.execute(() -> processNodeIncompletion(k8sNode));
-                    break;
+                case K8S_NODE_REMOVED:
                 default:
                     break;
             }
@@ -723,28 +855,7 @@
                 return;
             }
 
-            if (NAT_STATEFUL.equals(serviceIpNatMode)) {
-                setStatefulServiceNatRules(node.intgBridge(), true);
-            } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
-                setStatelessServiceNatRules(node.intgBridge(), true);
-            } else {
-                log.warn("Service IP NAT mode was not configured!");
-            }
-
-        }
-
-        private void processNodeIncompletion(K8sNode node) {
-            if (!isRelevantHelper()) {
-                return;
-            }
-
-            if (NAT_STATEFUL.equals(serviceIpNatMode)) {
-                setStatefulServiceNatRules(node.intgBridge(), false);
-            } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
-                setStatelessServiceNatRules(node.intgBridge(), false);
-            } else {
-                log.warn("Service IP NAT mode was not configured!");
-            }
+            setServiceNatRules(node.intgBridge(), true);
         }
     }
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java
index 1dec9d1..7510aaf 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java
@@ -68,6 +68,7 @@
 import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
 import static org.onosproject.k8snetworking.api.Constants.PORT_NAME_PREFIX_CONTAINER;
 import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.isContainer;
+import static org.onosproject.k8snode.api.K8sNodeState.INIT;
 import static org.onosproject.net.AnnotationKeys.PORT_NAME;
 
 /**
@@ -230,6 +231,31 @@
         Set<Host> hosts = hostService.getConnectedHosts(connectPoint);
 
         hosts.forEach(h -> hostProviderService.hostVanished(h.id()));
+
+        K8sPort k8sPort = portToK8sPort(port);
+
+        if (k8sPort == null) {
+            log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
+            return;
+        }
+
+        k8sNetworkService.removePort(k8sPort.portId());
+    }
+
+    /**
+     * Process port inactivate event.
+     *
+     * @param port ONOS port
+     */
+    private void processPortInactivated(Port port) {
+        K8sPort k8sPort = portToK8sPort(port);
+
+        if (k8sPort == null) {
+            log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
+            return;
+        }
+
+        k8sNetworkService.updatePort(k8sPort.updateState(K8sPort.State.INACTIVE));
     }
 
     /**
@@ -349,14 +375,16 @@
 
             switch (event.type()) {
                 case K8S_NODE_COMPLETE:
-                    executor.execute(() -> processCompleteNode(event, event.subject()));
+                    executor.execute(() -> processCompleteNode(event, k8sNode));
                     break;
-                case K8S_NODE_INCOMPLETE:
-                    log.warn("{} is changed to INCOMPLETE state", k8sNode);
+                case K8S_NODE_UPDATED:
+                    if (k8sNode.state() == INIT) {
+                        executor.execute(() -> processIncompleteNode(event, k8sNode));
+                    }
                     break;
                 case K8S_NODE_CREATED:
-                case K8S_NODE_UPDATED:
                 case K8S_NODE_REMOVED:
+                case K8S_NODE_INCOMPLETE:
                 default:
                     break;
             }
@@ -388,5 +416,23 @@
                         hostProviderService.hostVanished(host.id());
                     });
         }
+
+        private void processIncompleteNode(K8sNodeEvent event, K8sNode k8sNode) {
+            if (!isRelevantHelper(event)) {
+                return;
+            }
+
+            log.info("INIT node {} is detected", k8sNode.hostname());
+
+            deviceService.getPorts(k8sNode.intgBridge()).stream()
+                    .filter(port -> isContainer(port.annotations().value(PORT_NAME)))
+                    .filter(Port::isEnabled)
+                    .forEach(port -> {
+                        log.debug("Container port {} is detected from {}",
+                                port.annotations().value(PORT_NAME),
+                                k8sNode.hostname());
+                        processPortInactivated(port);
+                    });
+        }
     }
 }