Fix: register a set of Affinity classes into distributed store
1. Purge group rules when issue k8s purge rules command.
2. Remove k8s port when remove OVS port.
3. Re-install group rules during sync rules.
4. Install: 1) install group rule; 2) install flow rule
Uninstall: 1) uninstall flow rule; 2) uninstall group rule
5. Add/remove group buckets when receiving POD update/remove
events.
6. Lower down the endpoint update logging level
Change-Id: Ib50e359a9b2c0cd9cb1490c6172864ad118b2247
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java
index 1dec9d1..7510aaf 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java
@@ -68,6 +68,7 @@
import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
import static org.onosproject.k8snetworking.api.Constants.PORT_NAME_PREFIX_CONTAINER;
import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.isContainer;
+import static org.onosproject.k8snode.api.K8sNodeState.INIT;
import static org.onosproject.net.AnnotationKeys.PORT_NAME;
/**
@@ -230,6 +231,31 @@
Set<Host> hosts = hostService.getConnectedHosts(connectPoint);
hosts.forEach(h -> hostProviderService.hostVanished(h.id()));
+
+ K8sPort k8sPort = portToK8sPort(port);
+
+ if (k8sPort == null) {
+ log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
+ return;
+ }
+
+ k8sNetworkService.removePort(k8sPort.portId());
+ }
+
+ /**
+ * Process port inactivate event.
+ *
+ * @param port ONOS port
+ */
+ private void processPortInactivated(Port port) {
+ K8sPort k8sPort = portToK8sPort(port);
+
+ if (k8sPort == null) {
+ log.warn(ERR_ADD_HOST + "Kubernetes port for {} not found", port);
+ return;
+ }
+
+ k8sNetworkService.updatePort(k8sPort.updateState(K8sPort.State.INACTIVE));
}
/**
@@ -349,14 +375,16 @@
switch (event.type()) {
case K8S_NODE_COMPLETE:
- executor.execute(() -> processCompleteNode(event, event.subject()));
+ executor.execute(() -> processCompleteNode(event, k8sNode));
break;
- case K8S_NODE_INCOMPLETE:
- log.warn("{} is changed to INCOMPLETE state", k8sNode);
+ case K8S_NODE_UPDATED:
+ if (k8sNode.state() == INIT) {
+ executor.execute(() -> processIncompleteNode(event, k8sNode));
+ }
break;
case K8S_NODE_CREATED:
- case K8S_NODE_UPDATED:
case K8S_NODE_REMOVED:
+ case K8S_NODE_INCOMPLETE:
default:
break;
}
@@ -388,5 +416,23 @@
hostProviderService.hostVanished(host.id());
});
}
+
+ private void processIncompleteNode(K8sNodeEvent event, K8sNode k8sNode) {
+ if (!isRelevantHelper(event)) {
+ return;
+ }
+
+ log.info("INIT node {} is detected", k8sNode.hostname());
+
+ deviceService.getPorts(k8sNode.intgBridge()).stream()
+ .filter(port -> isContainer(port.annotations().value(PORT_NAME)))
+ .filter(Port::isEnabled)
+ .forEach(port -> {
+ log.debug("Container port {} is detected from {}",
+ port.annotations().value(PORT_NAME),
+ k8sNode.hostname());
+ processPortInactivated(port);
+ });
+ }
}
}