Add a Kubernetes port and pod mapper with sync/purge rules CLIs
Change-Id: I89ed29e4357b84345f95fddf81ab7156715d7c82
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodWatcher.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodWatcher.java
index f00de40..a5c15f4 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodWatcher.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodWatcher.java
@@ -25,7 +25,8 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.k8snetworking.api.K8sPodAdminService;
-import org.onosproject.k8snode.api.K8sApiConfig;
+import org.onosproject.k8snode.api.K8sApiConfigEvent;
+import org.onosproject.k8snode.api.K8sApiConfigListener;
import org.onosproject.k8snode.api.K8sApiConfigService;
import org.onosproject.mastership.MastershipService;
import org.osgi.service.component.annotations.Activate;
@@ -73,6 +74,8 @@
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler"));
private final Watcher<Pod> internalK8sPodWatcher = new InternalK8sPodWatcher();
+ private final InternalK8sApiConfigListener
+ internalK8sApiConfigListener = new InternalK8sApiConfigListener();
private ApplicationId appId;
private NodeId localNodeId;
@@ -82,36 +85,52 @@
appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
localNodeId = clusterService.getLocalNode().id();
leadershipService.runForLeadership(appId.name());
-
- initWatcher();
+ k8sApiConfigService.addListener(internalK8sApiConfigListener);
log.info("Started");
}
@Deactivate
protected void deactivate() {
+ k8sApiConfigService.removeListener(internalK8sApiConfigListener);
leadershipService.withdraw(appId.name());
eventExecutor.shutdown();
log.info("Stopped");
}
- private void initWatcher() {
- K8sApiConfig config =
- k8sApiConfigService.apiConfigs().stream().findAny().orElse(null);
- if (config == null) {
- log.error("Failed to find valid kubernetes API configuration.");
- return;
+ private class InternalK8sApiConfigListener implements K8sApiConfigListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
}
- KubernetesClient client = k8sClient(config);
+ @Override
+ public void event(K8sApiConfigEvent event) {
- if (client == null) {
- log.error("Failed to connect to kubernetes API server.");
- return;
+ switch (event.type()) {
+ case K8S_API_CONFIG_UPDATED:
+ eventExecutor.execute(this::processConfigUpdating);
+ break;
+ case K8S_API_CONFIG_CREATED:
+ case K8S_API_CONFIG_REMOVED:
+ default:
+ // do nothing
+ break;
+ }
}
- client.pods().watch(internalK8sPodWatcher);
+ private void processConfigUpdating() {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ KubernetesClient client = k8sClient(k8sApiConfigService);
+
+ if (client != null) {
+ client.pods().watch(internalK8sPodWatcher);
+ }
+ }
}
private class InternalK8sPodWatcher implements Watcher<Pod> {
@@ -146,7 +165,7 @@
return;
}
- log.info("Process pod {} creating event from API server.",
+ log.trace("Process pod {} creating event from API server.",
pod.getMetadata().getName());
k8sPodAdminService.createPod(pod);
@@ -157,10 +176,12 @@
return;
}
- log.info("Process pod {} updating event from API server.",
+ log.trace("Process pod {} updating event from API server.",
pod.getMetadata().getName());
- k8sPodAdminService.updatePod(pod);
+ if (k8sPodAdminService.pod(pod.getMetadata().getUid()) != null) {
+ k8sPodAdminService.updatePod(pod);
+ }
}
private void processDeletion(Pod pod) {
@@ -168,7 +189,7 @@
return;
}
- log.info("Process pod {} removal event from API server.",
+ log.trace("Process pod {} removal event from API server.",
pod.getMetadata().getName());
k8sPodAdminService.removePod(pod.getMetadata().getUid());