Add a kubernetes port and pod mapper with sync/purge rules CLIs
Change-Id: I89ed29e4357b84345f95fddf81ab7156715d7c82
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
index 371a34a..683b070 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
@@ -25,7 +25,8 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.k8snetworking.api.K8sEndpointsAdminService;
-import org.onosproject.k8snode.api.K8sApiConfig;
+import org.onosproject.k8snode.api.K8sApiConfigEvent;
+import org.onosproject.k8snode.api.K8sApiConfigListener;
import org.onosproject.k8snode.api.K8sApiConfigService;
import org.onosproject.mastership.MastershipService;
import org.osgi.service.component.annotations.Activate;
@@ -74,7 +75,8 @@
groupedThreads(this.getClass().getSimpleName(), "event-handler"));
private final Watcher<Endpoints>
internalEndpointsWatcher = new InternalK8sEndpointsWatcher();
-
+ private final InternalK8sApiConfigListener
+ internalK8sApiConfigListener = new InternalK8sApiConfigListener();
private ApplicationId appId;
private NodeId localNodeId;
@@ -84,36 +86,52 @@
appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
localNodeId = clusterService.getLocalNode().id();
leadershipService.runForLeadership(appId.name());
-
- initWatcher();
+ k8sApiConfigService.addListener(internalK8sApiConfigListener);
log.info("Started");
}
@Deactivate
protected void deactivate() {
+ k8sApiConfigService.removeListener(internalK8sApiConfigListener);
leadershipService.withdraw(appId.name());
eventExecutor.shutdown();
log.info("Stopped");
}
- private void initWatcher() {
- K8sApiConfig config =
- k8sApiConfigService.apiConfigs().stream().findAny().orElse(null);
- if (config == null) {
- log.error("Failed to find valid kubernetes API configuration.");
- return;
+ private class InternalK8sApiConfigListener implements K8sApiConfigListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
}
- KubernetesClient client = k8sClient(config);
+ @Override
+ public void event(K8sApiConfigEvent event) {
- if (client == null) {
- log.error("Failed to connect to kubernetes API server.");
- return;
+ switch (event.type()) {
+ case K8S_API_CONFIG_UPDATED:
+ eventExecutor.execute(this::processConfigUpdating);
+ break;
+ case K8S_API_CONFIG_CREATED:
+ case K8S_API_CONFIG_REMOVED:
+ default:
+ // do nothing
+ break;
+ }
}
- client.endpoints().watch(internalEndpointsWatcher);
+ private void processConfigUpdating() {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ KubernetesClient client = k8sClient(k8sApiConfigService);
+
+ if (client != null) {
+ client.endpoints().watch(internalEndpointsWatcher);
+ }
+ }
}
private class InternalK8sEndpointsWatcher implements Watcher<Endpoints> {
@@ -125,7 +143,9 @@
eventExecutor.execute(() -> processAddition(endpoints));
break;
case MODIFIED:
- eventExecutor.execute(() -> processModification(endpoints));
+ // FIXME: there are too frequent endpoints update events
+ // issued from kubernetes API server, we disable update for now
+ // eventExecutor.execute(() -> processModification(endpoints));
break;
case DELETED:
eventExecutor.execute(() -> processDeletion(endpoints));
@@ -149,7 +169,7 @@
return;
}
- log.info("Process endpoints {} creating event from API server.",
+ log.trace("Process endpoints {} creating event from API server.",
endpoints.getMetadata().getName());
k8sEndpointsAdminService.createEndpoints(endpoints);
@@ -160,10 +180,13 @@
return;
}
- log.info("Process endpoints {} updating event from API server.",
+ log.trace("Process endpoints {} updating event from API server.",
endpoints.getMetadata().getName());
- k8sEndpointsAdminService.updateEndpoints(endpoints);
+ if (k8sEndpointsAdminService.endpoints(
+ endpoints.getMetadata().getUid()) != null) {
+ k8sEndpointsAdminService.updateEndpoints(endpoints);
+ }
}
private void processDeletion(Endpoints endpoints) {
@@ -171,7 +194,7 @@
return;
}
- log.info("Process endpoints {} removal event from API server.",
+ log.trace("Process endpoints {} removal event from API server.",
endpoints.getMetadata().getName());
k8sEndpointsAdminService.removeEndpoints(endpoints.getMetadata().getUid());