Fix: register a set of Affinity classes into distributed store

1. Purge group rules when the k8s purge rules command is issued.
2. Remove the k8s port when the corresponding OVS port is removed.
3. Re-install group rules when syncing rules.
4. Install: 1) install group rule; 2) install flow rule
   Uninstall: 1) uninstall flow rule; 2) uninstall group rule
5. Add/remove group buckets when receiving POD update/remove
   events.
6. Lower the endpoint update logging level.

Change-Id: Ib50e359a9b2c0cd9cb1490c6172864ad118b2247
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java
index 1dec9d1..7510aaf 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java
@@ -68,6 +68,7 @@
 import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
 import static org.onosproject.k8snetworking.api.Constants.PORT_NAME_PREFIX_CONTAINER;
 import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.isContainer;
+import static org.onosproject.k8snode.api.K8sNodeState.INIT;
 import static org.onosproject.net.AnnotationKeys.PORT_NAME;
 
 /**
@@ -230,6 +231,31 @@
         Set<Host> hosts = hostService.getConnectedHosts(connectPoint);
 
         hosts.forEach(h -> hostProviderService.hostVanished(h.id()));
+
+        K8sPort k8sPort = portToK8sPort(port);
+
+        if (k8sPort == null) {
+            log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
+            return;
+        }
+
+        k8sNetworkService.removePort(k8sPort.portId());
+    }
+
+    /**
+     * Marks the kubernetes port mapped to the given ONOS port as INACTIVE.
+     *
+     * @param port ONOS port
+     */
+    private void processPortInactivated(Port port) {
+        K8sPort k8sPort = portToK8sPort(port);
+        // without a mapped kubernetes port there is no state to update
+        if (k8sPort == null) {
+            log.warn("Failed to inactivate port: Kubernetes port for {} not found", port);
+            return;
+        }
+
+        k8sNetworkService.updatePort(k8sPort.updateState(K8sPort.State.INACTIVE));
     }
 
     /**
@@ -349,14 +375,16 @@
 
             switch (event.type()) {
                 case K8S_NODE_COMPLETE:
-                    executor.execute(() -> processCompleteNode(event, event.subject()));
+                    executor.execute(() -> processCompleteNode(event, k8sNode));
                     break;
-                case K8S_NODE_INCOMPLETE:
-                    log.warn("{} is changed to INCOMPLETE state", k8sNode);
+                case K8S_NODE_UPDATED:
+                    if (k8sNode.state() == INIT) {
+                        executor.execute(() -> processIncompleteNode(event, k8sNode));
+                    }
                     break;
                 case K8S_NODE_CREATED:
-                case K8S_NODE_UPDATED:
                 case K8S_NODE_REMOVED:
+                case K8S_NODE_INCOMPLETE:
                 default:
                     break;
             }
@@ -388,5 +416,23 @@
                         hostProviderService.hostVanished(host.id());
                     });
         }
+
+        private void processIncompleteNode(K8sNodeEvent event, K8sNode k8sNode) {
+            if (!isRelevantHelper(event)) {
+                return;
+            }
+
+            log.info("INIT node {} is detected", k8sNode.hostname());
+
+            // inactivate every enabled container port found on the integration bridge
+            for (Port port : deviceService.getPorts(k8sNode.intgBridge())) {
+                String portName = port.annotations().value(PORT_NAME);
+                if (!isContainer(portName) || !port.isEnabled()) {
+                    continue;
+                }
+                log.debug("Container port {} is detected from {}", portName, k8sNode.hostname());
+                processPortInactivated(port);
+            }
+        }
     }
 }