Add a kubernetes port and pod mapper with sync/purge rules CLIs
Change-Id: I89ed29e4357b84345f95fddf81ab7156715d7c82
diff --git a/apps/k8s-networking/app/BUILD b/apps/k8s-networking/app/BUILD
index 515cf0d..725e27d 100644
--- a/apps/k8s-networking/app/BUILD
+++ b/apps/k8s-networking/app/BUILD
@@ -7,6 +7,13 @@
"@commons_net//jar",
"@k8s_client//jar",
"@k8s_model//jar",
+ "@okhttp//jar",
+ "@okio//jar",
+ "@logging_interceptor//jar",
+ "@jackson_dataformat_yaml//jar",
+ "@snakeyaml//jar",
+ "@zjsonpatch//jar",
+ "@validation_api//jar",
]
TEST_DEPS = TEST_ADAPTERS + TEST_REST + [
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeRulesCommand.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeRulesCommand.java
new file mode 100644
index 0000000..1658a35
--- /dev/null
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeRulesCommand.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.flow.FlowRuleService;
+
+import static java.lang.Thread.sleep;
+import static java.util.stream.StreamSupport.stream;
+import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+
+/**
+ * Purges all existing flow rules installed for kubernetes networks.
+ */
+@Service
+@Command(scope = "onos", name = "k8s-purge-rules",
+ description = "Purges all flow rules installed by kubernetes networking app")
+public class K8sPurgeRulesCommand extends AbstractShellCommand {
+
+ private static final long TIMEOUT_MS = 10000; // we wait 10s
+ private static final long SLEEP_MS = 2000; // we wait 2s for init each node
+
+ @Override
+ protected void doExecute() {
+ FlowRuleService flowRuleService = get(FlowRuleService.class);
+ CoreService coreService = get(CoreService.class);
+ ApplicationId appId = coreService.getAppId(K8S_NETWORKING_APP_ID);
+
+ if (appId == null) {
+ error("Failed to purge kubernetes networking flow rules.");
+ return;
+ }
+
+ flowRuleService.removeFlowRulesById(appId);
+ print("Successfully purged flow rules installed by kubernetes networking app.");
+
+ boolean result = true;
+ long timeoutExpiredMs = System.currentTimeMillis() + TIMEOUT_MS;
+
+ // we make sure all flow rules are removed from the store
+ while (stream(flowRuleService.getFlowEntriesById(appId)
+ .spliterator(), false).count() > 0) {
+
+ long waitMs = timeoutExpiredMs - System.currentTimeMillis();
+
+ try {
+ sleep(SLEEP_MS);
+ } catch (InterruptedException e) {
+ log.error("Exception caused during rule purging...");
+ }
+
+ if (stream(flowRuleService.getFlowEntriesById(appId)
+ .spliterator(), false).count() == 0) {
+ break;
+ } else {
+ flowRuleService.removeFlowRulesById(appId);
+ print("Failed to purging flow rules, retrying rule purging...");
+ }
+
+ if (waitMs <= 0) {
+ result = false;
+ break;
+ }
+ }
+
+ if (result) {
+ print("Successfully purged flow rules!");
+ } else {
+ error("Failed to purge flow rules.");
+ }
+ }
+}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeState.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeStateCommand.java
similarity index 88%
rename from apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeState.java
rename to apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeStateCommand.java
index ccff90a..7b87d7e 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeState.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPurgeStateCommand.java
@@ -19,6 +19,7 @@
import org.apache.karaf.shell.api.action.lifecycle.Service;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.k8snetworking.api.K8sEndpointsAdminService;
+import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
import org.onosproject.k8snetworking.api.K8sPodAdminService;
import org.onosproject.k8snetworking.api.K8sServiceAdminService;
@@ -28,10 +29,11 @@
@Service
@Command(scope = "onos", name = "k8s-purge-states",
description = "Purges all kubernetes states")
-public class K8sPurgeState extends AbstractShellCommand {
+public class K8sPurgeStateCommand extends AbstractShellCommand {
@Override
protected void doExecute() {
get(K8sPodAdminService.class).clear();
+ get(K8sNetworkAdminService.class).clear();
get(K8sEndpointsAdminService.class).clear();
get(K8sServiceAdminService.class).clear();
}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sSyncRulesCommand.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sSyncRulesCommand.java
new file mode 100644
index 0000000..097292c
--- /dev/null
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sSyncRulesCommand.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeAdminService;
+
+import static java.lang.Thread.sleep;
+import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
+import static org.onosproject.k8snode.api.K8sNodeState.INIT;
+
+/**
+ * Synchronizes flow rules to provide connectivity for kubernetes pods.
+ */
+@Service
+@Command(scope = "onos", name = "k8s-sync-rules",
+ description = "Synchronizes all kubernetes flow rules")
+public class K8sSyncRulesCommand extends AbstractShellCommand {
+
+ private static final long SLEEP_MS = 3000; // we wait 3s for init each node
+ private static final long TIMEOUT_MS = 10000; // we wait 10s
+
+ private static final String SUCCESS_MSG = "Successfully synchronize flow rules for node %s!";
+ private static final String FAIL_MSG = "Failed to synchronize flow rules for node %s.";
+
+ @Override
+ protected void doExecute() {
+
+ K8sNodeAdminService adminService = get(K8sNodeAdminService.class);
+ if (adminService == null) {
+ error("Failed to re-install flow rules for kubernetes networking.");
+ return;
+ }
+
+ adminService.completeNodes().forEach(node ->
+ syncRulesBaseForNode(adminService, node));
+
+ print("Successfully requested re-installing flow rules.");
+ }
+
+ private void syncRulesBaseForNode(K8sNodeAdminService adminService,
+ K8sNode k8sNode) {
+ K8sNode updated = k8sNode.updateState(INIT);
+ adminService.updateNode(updated);
+
+ boolean result = true;
+ long timeoutExpiredMs = System.currentTimeMillis() + TIMEOUT_MS;
+
+ while (adminService.node(k8sNode.hostname()).state() != COMPLETE) {
+
+ long waitMs = timeoutExpiredMs - System.currentTimeMillis();
+
+ try {
+ sleep(SLEEP_MS);
+ } catch (InterruptedException e) {
+ error("Exception caused during node synchronization...");
+ }
+
+ if (adminService.node(k8sNode.hostname()).state() == COMPLETE) {
+ break;
+ } else {
+ adminService.updateNode(updated);
+ print("Failed to synchronize flow rules, retrying...");
+ }
+
+ if (waitMs <= 0) {
+ result = false;
+ break;
+ }
+ }
+
+ if (result) {
+ print(SUCCESS_MSG, k8sNode.hostname());
+ } else {
+ error(FAIL_MSG, k8sNode.hostname());
+ }
+ }
+}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sSyncStateCommand.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sSyncStateCommand.java
index 1592755..170425c 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sSyncStateCommand.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sSyncStateCommand.java
@@ -21,15 +21,25 @@
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.karaf.shell.api.action.Command;
import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.k8snetworking.api.DefaultK8sPort;
import org.onosproject.k8snetworking.api.K8sEndpointsAdminService;
+import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
import org.onosproject.k8snetworking.api.K8sPodAdminService;
+import org.onosproject.k8snetworking.api.K8sPort;
import org.onosproject.k8snetworking.api.K8sServiceAdminService;
import org.onosproject.k8snetworking.util.K8sNetworkingUtil;
import org.onosproject.k8snode.api.K8sApiConfig;
import org.onosproject.k8snode.api.K8sApiConfigService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
import java.util.List;
+import java.util.Map;
+
+import static org.onosproject.k8snetworking.api.K8sPort.State.INACTIVE;
/**
* Synchronizes kubernetes states.
@@ -43,6 +53,13 @@
private static final String SERVICE_FORMAT = "%-50s%-30s%-30s";
private static final String ENDPOINTS_FORMAT = "%-50s%-50s%-20s";
+ private static final String PORT_ID = "portId";
+ private static final String DEVICE_ID = "deviceId";
+ private static final String PORT_NUMBER = "portNumber";
+ private static final String IP_ADDRESS = "ipAddress";
+ private static final String MAC_ADDRESS = "macAddress";
+ private static final String NETWORK_ID = "networkId";
+
@Override
protected void doExecute() {
K8sApiConfigService configService = get(K8sApiConfigService.class);
@@ -51,6 +68,8 @@
get(K8sServiceAdminService.class);
K8sEndpointsAdminService endpointsAdminService =
get(K8sEndpointsAdminService.class);
+ K8sNetworkAdminService networkAdminService =
+ get(K8sNetworkAdminService.class);
K8sApiConfig config =
configService.apiConfigs().stream().findAny().orElse(null);
@@ -96,6 +115,9 @@
} else {
podAdminService.createPod(pod);
}
+
+ syncPortFromPod(pod, networkAdminService);
+
printPod(pod);
});
}
@@ -139,4 +161,36 @@
pod.getStatus().getPodIP(),
containers.isEmpty() ? "" : containers);
}
+
+ private void syncPortFromPod(Pod pod, K8sNetworkAdminService adminService) {
+ Map<String, String> annotations = pod.getMetadata().getAnnotations();
+ if (annotations != null && !annotations.isEmpty() &&
+ annotations.get(PORT_ID) != null) {
+ String portId = annotations.get(PORT_ID);
+
+ K8sPort oldPort = adminService.port(portId);
+
+ String networkId = annotations.get(NETWORK_ID);
+ DeviceId deviceId = DeviceId.deviceId(annotations.get(DEVICE_ID));
+ PortNumber portNumber = PortNumber.portNumber(annotations.get(PORT_NUMBER));
+ IpAddress ipAddress = IpAddress.valueOf(annotations.get(IP_ADDRESS));
+ MacAddress macAddress = MacAddress.valueOf(annotations.get(MAC_ADDRESS));
+
+ K8sPort newPort = DefaultK8sPort.builder()
+ .portId(portId)
+ .networkId(networkId)
+ .deviceId(deviceId)
+ .ipAddress(ipAddress)
+ .macAddress(macAddress)
+ .portNumber(portNumber)
+ .state(INACTIVE)
+ .build();
+
+ if (oldPort == null) {
+ adminService.createPort(newPort);
+ } else {
+ adminService.updatePort(newPort);
+ }
+ }
+ }
}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
index 371a34a..683b070 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sEndpointsWatcher.java
@@ -25,7 +25,8 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.k8snetworking.api.K8sEndpointsAdminService;
-import org.onosproject.k8snode.api.K8sApiConfig;
+import org.onosproject.k8snode.api.K8sApiConfigEvent;
+import org.onosproject.k8snode.api.K8sApiConfigListener;
import org.onosproject.k8snode.api.K8sApiConfigService;
import org.onosproject.mastership.MastershipService;
import org.osgi.service.component.annotations.Activate;
@@ -74,7 +75,8 @@
groupedThreads(this.getClass().getSimpleName(), "event-handler"));
private final Watcher<Endpoints>
internalEndpointsWatcher = new InternalK8sEndpointsWatcher();
-
+ private final InternalK8sApiConfigListener
+ internalK8sApiConfigListener = new InternalK8sApiConfigListener();
private ApplicationId appId;
private NodeId localNodeId;
@@ -84,36 +86,52 @@
appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
localNodeId = clusterService.getLocalNode().id();
leadershipService.runForLeadership(appId.name());
-
- initWatcher();
+ k8sApiConfigService.addListener(internalK8sApiConfigListener);
log.info("Started");
}
@Deactivate
protected void deactivate() {
+ k8sApiConfigService.removeListener(internalK8sApiConfigListener);
leadershipService.withdraw(appId.name());
eventExecutor.shutdown();
log.info("Stopped");
}
- private void initWatcher() {
- K8sApiConfig config =
- k8sApiConfigService.apiConfigs().stream().findAny().orElse(null);
- if (config == null) {
- log.error("Failed to find valid kubernetes API configuration.");
- return;
+ private class InternalK8sApiConfigListener implements K8sApiConfigListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
}
- KubernetesClient client = k8sClient(config);
+ @Override
+ public void event(K8sApiConfigEvent event) {
- if (client == null) {
- log.error("Failed to connect to kubernetes API server.");
- return;
+ switch (event.type()) {
+ case K8S_API_CONFIG_UPDATED:
+ eventExecutor.execute(this::processConfigUpdating);
+ break;
+ case K8S_API_CONFIG_CREATED:
+ case K8S_API_CONFIG_REMOVED:
+ default:
+ // do nothing
+ break;
+ }
}
- client.endpoints().watch(internalEndpointsWatcher);
+ private void processConfigUpdating() {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ KubernetesClient client = k8sClient(k8sApiConfigService);
+
+ if (client != null) {
+ client.endpoints().watch(internalEndpointsWatcher);
+ }
+ }
}
private class InternalK8sEndpointsWatcher implements Watcher<Endpoints> {
@@ -125,7 +143,9 @@
eventExecutor.execute(() -> processAddition(endpoints));
break;
case MODIFIED:
- eventExecutor.execute(() -> processModification(endpoints));
+ // FIXME: there are too frequent endpoints update events
+ // issued from kubernetes API server, we disable update for now
+ // eventExecutor.execute(() -> processModification(endpoints));
break;
case DELETED:
eventExecutor.execute(() -> processDeletion(endpoints));
@@ -149,7 +169,7 @@
return;
}
- log.info("Process endpoints {} creating event from API server.",
+ log.trace("Process endpoints {} creating event from API server.",
endpoints.getMetadata().getName());
k8sEndpointsAdminService.createEndpoints(endpoints);
@@ -160,10 +180,13 @@
return;
}
- log.info("Process endpoints {} updating event from API server.",
+ log.trace("Process endpoints {} updating event from API server.",
endpoints.getMetadata().getName());
- k8sEndpointsAdminService.updateEndpoints(endpoints);
+ if (k8sEndpointsAdminService.endpoints(
+ endpoints.getMetadata().getUid()) != null) {
+ k8sEndpointsAdminService.updateEndpoints(endpoints);
+ }
}
private void processDeletion(Endpoints endpoints) {
@@ -171,7 +194,7 @@
return;
}
- log.info("Process endpoints {} removal event from API server.",
+ log.trace("Process endpoints {} removal event from API server.",
endpoints.getMetadata().getName());
k8sEndpointsAdminService.removeEndpoints(endpoints.getMetadata().getUid());
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodPortMapper.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodPortMapper.java
new file mode 100644
index 0000000..5239ee4
--- /dev/null
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodPortMapper.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.impl;
+
+import com.google.common.collect.Maps;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
+import org.onosproject.k8snetworking.api.K8sNetworkEvent;
+import org.onosproject.k8snetworking.api.K8sNetworkListener;
+import org.onosproject.k8snetworking.api.K8sPodAdminService;
+import org.onosproject.k8snetworking.api.K8sPodEvent;
+import org.onosproject.k8snetworking.api.K8sPodListener;
+import org.onosproject.k8snetworking.api.K8sPort;
+import org.onosproject.k8snode.api.K8sApiConfigService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.k8sClient;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Associates the kubernetes container port and pod.
+ */
+@Component(immediate = true)
+public class K8sPodPortMapper {
+
+ private final Logger log = getLogger(getClass());
+
+ private static final String PORT_ID = "portId";
+ private static final String DEVICE_ID = "deviceId";
+ private static final String PORT_NUMBER = "portNumber";
+ private static final String IP_ADDRESS = "ipAddress";
+ private static final String MAC_ADDRESS = "macAddress";
+ private static final String NETWORK_ID = "networkId";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DriverService driverService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sNetworkAdminService k8sNetworkService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sPodAdminService k8sPodService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sApiConfigService k8sApiConfigService;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+ private final InternalK8sNetworkListener k8sNetworkListener =
+ new InternalK8sNetworkListener();
+ private final InternalK8sPodListener k8sPodListener =
+ new InternalK8sPodListener();
+
+ private ApplicationId appId;
+ private NodeId localNodeId;
+
+ @Activate
+ protected void activate() {
+ appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
+ localNodeId = clusterService.getLocalNode().id();
+ leadershipService.runForLeadership(appId.name());
+ k8sNetworkService.addListener(k8sNetworkListener);
+ k8sPodService.addListener(k8sPodListener);
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ k8sNetworkService.removeListener(k8sNetworkListener);
+ k8sPodService.removeListener(k8sPodListener);
+ leadershipService.withdraw(appId.name());
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+ private class InternalK8sPodListener implements K8sPodListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(K8sPodEvent event) {
+ switch (event.type()) {
+ case K8S_POD_CREATED:
+ case K8S_POD_UPDATED:
+ eventExecutor.execute(() -> processPodCreation(event.subject()));
+ break;
+ case K8S_POD_REMOVED:
+ default:
+ break;
+ }
+ }
+
+ private void processPodCreation(Pod pod) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ KubernetesClient client = k8sClient(k8sApiConfigService);
+
+ if (client == null) {
+ return;
+ }
+
+ // if the annotations were configured, we will not update it
+ if (pod.getMetadata().getAnnotations() != null) {
+ return;
+ }
+
+ Set<K8sPort> ports = k8sNetworkService.ports();
+
+ // TODO: we assume that POD IP is unique, there might be other
+ // variable which preserves better uniqueness
+ ports.stream().filter(p -> p.ipAddress().toString()
+ .equals(pod.getStatus().getPodIP()))
+ .forEach(p -> {
+ Map<String, String> annotations = Maps.newConcurrentMap();
+ annotations.put(PORT_ID, p.portId());
+ annotations.put(NETWORK_ID, p.networkId());
+ annotations.put(DEVICE_ID, p.deviceId().toString());
+ annotations.put(PORT_NUMBER, p.portNumber().toString());
+ annotations.put(IP_ADDRESS, p.ipAddress().toString());
+ annotations.put(MAC_ADDRESS, p.macAddress().toString());
+
+ client.pods().inNamespace(pod.getMetadata().getNamespace())
+ .withName(pod.getMetadata().getName())
+ .edit()
+ .editMetadata()
+ .addToAnnotations(annotations)
+ .endMetadata().done();
+ });
+ }
+ }
+
+ private class InternalK8sNetworkListener implements K8sNetworkListener {
+
+ private boolean isRelevantHelper(K8sNetworkEvent event) {
+ return mastershipService.isLocalMaster(event.port().deviceId());
+ }
+
+ @Override
+ public void event(K8sNetworkEvent event) {
+ switch (event.type()) {
+ case K8S_PORT_UPDATED:
+ case K8S_PORT_REMOVED:
+ // no need to process port removal event...
+ default:
+ break;
+ }
+ }
+ }
+}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodWatcher.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodWatcher.java
index f00de40..a5c15f4 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodWatcher.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sPodWatcher.java
@@ -25,7 +25,8 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.k8snetworking.api.K8sPodAdminService;
-import org.onosproject.k8snode.api.K8sApiConfig;
+import org.onosproject.k8snode.api.K8sApiConfigEvent;
+import org.onosproject.k8snode.api.K8sApiConfigListener;
import org.onosproject.k8snode.api.K8sApiConfigService;
import org.onosproject.mastership.MastershipService;
import org.osgi.service.component.annotations.Activate;
@@ -73,6 +74,8 @@
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler"));
private final Watcher<Pod> internalK8sPodWatcher = new InternalK8sPodWatcher();
+ private final InternalK8sApiConfigListener
+ internalK8sApiConfigListener = new InternalK8sApiConfigListener();
private ApplicationId appId;
private NodeId localNodeId;
@@ -82,36 +85,52 @@
appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
localNodeId = clusterService.getLocalNode().id();
leadershipService.runForLeadership(appId.name());
-
- initWatcher();
+ k8sApiConfigService.addListener(internalK8sApiConfigListener);
log.info("Started");
}
@Deactivate
protected void deactivate() {
+ k8sApiConfigService.removeListener(internalK8sApiConfigListener);
leadershipService.withdraw(appId.name());
eventExecutor.shutdown();
log.info("Stopped");
}
- private void initWatcher() {
- K8sApiConfig config =
- k8sApiConfigService.apiConfigs().stream().findAny().orElse(null);
- if (config == null) {
- log.error("Failed to find valid kubernetes API configuration.");
- return;
+ private class InternalK8sApiConfigListener implements K8sApiConfigListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
}
- KubernetesClient client = k8sClient(config);
+ @Override
+ public void event(K8sApiConfigEvent event) {
- if (client == null) {
- log.error("Failed to connect to kubernetes API server.");
- return;
+ switch (event.type()) {
+ case K8S_API_CONFIG_UPDATED:
+ eventExecutor.execute(this::processConfigUpdating);
+ break;
+ case K8S_API_CONFIG_CREATED:
+ case K8S_API_CONFIG_REMOVED:
+ default:
+ // do nothing
+ break;
+ }
}
- client.pods().watch(internalK8sPodWatcher);
+ private void processConfigUpdating() {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ KubernetesClient client = k8sClient(k8sApiConfigService);
+
+ if (client != null) {
+ client.pods().watch(internalK8sPodWatcher);
+ }
+ }
}
private class InternalK8sPodWatcher implements Watcher<Pod> {
@@ -146,7 +165,7 @@
return;
}
- log.info("Process pod {} creating event from API server.",
+ log.trace("Process pod {} creating event from API server.",
pod.getMetadata().getName());
k8sPodAdminService.createPod(pod);
@@ -157,10 +176,12 @@
return;
}
- log.info("Process pod {} updating event from API server.",
+ log.trace("Process pod {} updating event from API server.",
pod.getMetadata().getName());
- k8sPodAdminService.updatePod(pod);
+ if (k8sPodAdminService.pod(pod.getMetadata().getUid()) != null) {
+ k8sPodAdminService.updatePod(pod);
+ }
}
private void processDeletion(Pod pod) {
@@ -168,7 +189,7 @@
return;
}
- log.info("Process pod {} removal event from API server.",
+ log.trace("Process pod {} removal event from API server.",
pod.getMetadata().getName());
k8sPodAdminService.removePod(pod.getMetadata().getUid());
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceWatcher.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceWatcher.java
index 41e12c2..17f9946 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceWatcher.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceWatcher.java
@@ -25,7 +25,8 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.k8snetworking.api.K8sServiceAdminService;
-import org.onosproject.k8snode.api.K8sApiConfig;
+import org.onosproject.k8snode.api.K8sApiConfigEvent;
+import org.onosproject.k8snode.api.K8sApiConfigListener;
import org.onosproject.k8snode.api.K8sApiConfigService;
import org.onosproject.mastership.MastershipService;
import org.osgi.service.component.annotations.Activate;
@@ -75,6 +76,8 @@
private final InternalK8sServiceWatcher
internalK8sServiceWatcher = new InternalK8sServiceWatcher();
+ private final InternalK8sApiConfigListener
+ internalK8sApiConfigListener = new InternalK8sApiConfigListener();
private ApplicationId appId;
private NodeId localNodeId;
@@ -84,36 +87,52 @@
appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
localNodeId = clusterService.getLocalNode().id();
leadershipService.runForLeadership(appId.name());
-
- initWatcher();
+ k8sApiConfigService.addListener(internalK8sApiConfigListener);
log.info("Started");
}
@Deactivate
protected void deactivate() {
+ k8sApiConfigService.removeListener(internalK8sApiConfigListener);
leadershipService.withdraw(appId.name());
eventExecutor.shutdown();
log.info("Stopped");
}
- private void initWatcher() {
- K8sApiConfig config =
- k8sApiConfigService.apiConfigs().stream().findAny().orElse(null);
- if (config == null) {
- log.error("Failed to find valid kubernetes API configuration.");
- return;
+ private class InternalK8sApiConfigListener implements K8sApiConfigListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
}
- KubernetesClient client = k8sClient(config);
+ @Override
+ public void event(K8sApiConfigEvent event) {
- if (client == null) {
- log.error("Failed to connect to kubernetes API server.");
- return;
+ switch (event.type()) {
+ case K8S_API_CONFIG_UPDATED:
+ eventExecutor.execute(this::processConfigUpdating);
+ break;
+ case K8S_API_CONFIG_CREATED:
+ case K8S_API_CONFIG_REMOVED:
+ default:
+ // do nothing
+ break;
+ }
}
- client.services().watch(internalK8sServiceWatcher);
+ private void processConfigUpdating() {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ KubernetesClient client = k8sClient(k8sApiConfigService);
+
+ if (client != null) {
+ client.services().watch(internalK8sServiceWatcher);
+ }
+ }
}
private class InternalK8sServiceWatcher implements Watcher<Service> {
@@ -149,7 +168,7 @@
return;
}
- log.info("Process service {} creating event from API server.",
+ log.trace("Process service {} creating event from API server.",
service.getMetadata().getName());
k8sServiceAdminService.createService(service);
@@ -160,10 +179,13 @@
return;
}
- log.info("Process service {} updating event from API server.",
+ log.trace("Process service {} updating event from API server.",
service.getMetadata().getName());
- k8sServiceAdminService.updateService(service);
+ if (k8sServiceAdminService.service(
+ service.getMetadata().getUid()) != null) {
+ k8sServiceAdminService.updateService(service);
+ }
}
private void processDeletion(Service service) {
@@ -171,7 +193,7 @@
return;
}
- log.info("Process service {} removal event from API server.",
+ log.trace("Process service {} removal event from API server.",
service.getMetadata().getName());
k8sServiceAdminService.removeService(service.getMetadata().getUid());
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
index b6c2f69..2e838e8 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
@@ -29,6 +29,7 @@
import org.onosproject.k8snetworking.api.K8sNetwork;
import org.onosproject.k8snetworking.api.K8sNetworkService;
import org.onosproject.k8snode.api.K8sApiConfig;
+import org.onosproject.k8snode.api.K8sApiConfigService;
import org.onosproject.k8snode.api.K8sNode;
import org.onosproject.net.PortNumber;
import org.slf4j.Logger;
@@ -223,4 +224,28 @@
return new DefaultKubernetesClient(configBuilder.build());
}
+
+ /**
+ * Obtains workable kubernetes client.
+ *
+ * @param service kubernetes API service
+ * @return kubernetes client
+ */
+ public static KubernetesClient k8sClient(K8sApiConfigService service) {
+ K8sApiConfig config =
+ service.apiConfigs().stream().findAny().orElse(null);
+ if (config == null) {
+ log.error("Failed to find valid kubernetes API configuration.");
+ return null;
+ }
+
+ KubernetesClient client = k8sClient(config);
+
+ if (client == null) {
+ log.error("Failed to connect to kubernetes API server.");
+ return null;
+ }
+
+ return client;
+ }
}