Implement kubevirt floating IP's class, codec, watcher and APIs
Change-Id: I2111902e86083add8a00af62557fac1e98b7e7fc
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/DistributedKubevirtRouterStore.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/DistributedKubevirtRouterStore.java
index 537d91c..2d566fc 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/DistributedKubevirtRouterStore.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/DistributedKubevirtRouterStore.java
@@ -15,11 +15,14 @@
*/
package org.onosproject.kubevirtnetworking.impl;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
+import org.onosproject.kubevirtnetworking.api.DefaultKubevirtFloatingIp;
import org.onosproject.kubevirtnetworking.api.DefaultKubevirtRouter;
+import org.onosproject.kubevirtnetworking.api.KubevirtFloatingIp;
import org.onosproject.kubevirtnetworking.api.KubevirtPeerRouter;
import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
import org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent;
@@ -47,6 +50,11 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_ASSOCIATED;
+import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_CREATED;
+import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_DISASSOCIATED;
+import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_REMOVED;
+import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_FLOATING_IP_UPDATED;
import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_CREATED;
import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_REMOVED;
import static org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent.Type.KUBEVIRT_ROUTER_UPDATED;
@@ -72,6 +80,8 @@
.register(KubevirtRouter.class)
.register(DefaultKubevirtRouter.class)
.register(KubevirtPeerRouter.class)
+ .register(KubevirtFloatingIp.class)
+ .register(DefaultKubevirtFloatingIp.class)
.register(Collection.class)
.build();
@@ -86,8 +96,11 @@
private final MapEventListener<String, KubevirtRouter> routerMapListener =
new KubevirtRouterMapListener();
+ private final MapEventListener<String, KubevirtFloatingIp> fipMapListener =
+ new KubevirtFloatingIpMapListener();
private ConsistentMap<String, KubevirtRouter> routerStore;
+ private ConsistentMap<String, KubevirtFloatingIp> fipStore;
@Activate
protected void activate() {
@@ -97,13 +110,20 @@
.withName("kubevirt-routerstore")
.withApplicationId(appId)
.build();
+ fipStore = storageService.<String, KubevirtFloatingIp>consistentMapBuilder()
+ .withSerializer(Serializer.using(SERIALIZER_KUBEVIRT_ROUTER))
+ .withName("kubevirt-fipstore")
+ .withApplicationId(appId)
+ .build();
routerStore.addListener(routerMapListener);
+ fipStore.addListener(fipMapListener);
log.info("Started");
}
+    /**
+     * Detaches the router and floating IP map listeners from their maps,
+     * then shuts down the event executor.
+     */
@Deactivate
protected void deactivate() {
routerStore.removeListener(routerMapListener);
+ fipStore.removeListener(fipMapListener);
eventExecutor.shutdown();
log.info("Stopped");
}
@@ -147,8 +167,48 @@
}
+    /**
+     * Inserts the given floating IP into the floating IP map, keyed by its ID.
+     * The duplicate check runs inside the map's compute callback and fails
+     * through {@code checkArgument} when an entry with the same ID exists.
+     *
+     * @param floatingIp floating IP to store
+     */
@Override
+    public void createFloatingIp(KubevirtFloatingIp floatingIp) {
+        fipStore.compute(floatingIp.id(), (key, current) -> {
+            checkArgument(current == null, floatingIp.id() + ERR_DUPLICATE);
+            return floatingIp;
+        });
+    }
+
+    /**
+     * Replaces the stored floating IP that has the same ID as the given one.
+     * The existence check runs inside the map's compute callback and fails
+     * through {@code checkArgument} when no entry with that ID exists.
+     *
+     * @param floatingIp floating IP carrying the new state
+     */
+    @Override
+    public void updateFloatingIp(KubevirtFloatingIp floatingIp) {
+        fipStore.compute(floatingIp.id(), (key, current) -> {
+            checkArgument(current != null, floatingIp.id() + ERR_NOT_FOUND);
+            return floatingIp;
+        });
+    }
+
+    /**
+     * Removes the floating IP stored under the given ID.
+     *
+     * @param id floating IP identifier
+     * @return the floating IP that was removed
+     * @throws IllegalArgumentException if the map held no entry for the ID
+     */
+    @Override
+    public KubevirtFloatingIp removeFloatingIp(String id) {
+        Versioned<KubevirtFloatingIp> removed = fipStore.remove(id);
+        if (removed != null) {
+            return removed.value();
+        }
+        throw new IllegalArgumentException(id + ERR_NOT_FOUND);
+    }
+
+    /**
+     * Looks up a floating IP by its ID in the Java map view of the store.
+     *
+     * @param id floating IP identifier
+     * @return the value the map view holds for the ID
+     */
+    @Override
+    public KubevirtFloatingIp floatingIp(String id) {
+        final KubevirtFloatingIp found = fipStore.asJavaMap().get(id);
+        return found;
+    }
+
+    /**
+     * Returns an immutable copy of all floating IPs currently in the store.
+     *
+     * @return immutable set of floating IPs
+     */
+    @Override
+    public Set<KubevirtFloatingIp> floatingIps() {
+        final Collection<KubevirtFloatingIp> stored = fipStore.asJavaMap().values();
+        return ImmutableSet.copyOf(stored);
+    }
+
+
+    /**
+     * Clears both the router map and the floating IP map.
+     */
+ @Override
public void clear() {
routerStore.clear();
+ fipStore.clear();
}
private class KubevirtRouterMapListener implements MapEventListener<String, KubevirtRouter> {
@@ -180,4 +240,81 @@
}
}
}
+
+    /**
+     * Listener on the floating IP map. Each insert, update and removal is
+     * handed to the event executor, where it is turned into one or more
+     * {@link KubevirtRouterEvent}s that are passed to the store delegate.
+     */
+    private class KubevirtFloatingIpMapListener implements MapEventListener<String, KubevirtFloatingIp> {
+
+        @Override
+        public void event(MapEvent<String, KubevirtFloatingIp> event) {
+            switch (event.type()) {
+                case INSERT:
+                    eventExecutor.execute(() -> processFloatingIpMapInsertion(event));
+                    break;
+                case UPDATE:
+                    eventExecutor.execute(() -> processFloatingIpMapUpdate(event));
+                    break;
+                case REMOVE:
+                    eventExecutor.execute(() -> processFloatingIpMapRemoval(event));
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        /**
+         * Resolves the router a floating IP refers to.
+         *
+         * @param fip floating IP whose router name is inspected
+         * @return null when the router name is null or empty, otherwise the
+         *         result of looking that name up in the router store
+         */
+        private KubevirtRouter routerOf(KubevirtFloatingIp fip) {
+            if (Strings.isNullOrEmpty(fip.routerName())) {
+                return null;
+            }
+            return router(fip.routerName());
+        }
+
+        /**
+         * Emits a floating-IP-created event carrying the inserted value.
+         */
+        private void processFloatingIpMapInsertion(MapEvent<String, KubevirtFloatingIp> event) {
+            log.debug("Kubevirt floating IP created");
+            final KubevirtFloatingIp created = event.newValue().value();
+            final KubevirtRouter router = routerOf(created);
+            notifyDelegate(new KubevirtRouterEvent(KUBEVIRT_FLOATING_IP_CREATED, router, created));
+        }
+
+        /**
+         * Emits a floating-IP-updated event for the new value, then checks
+         * whether the update was a pod association or disassociation.
+         */
+        private void processFloatingIpMapUpdate(MapEvent<String, KubevirtFloatingIp> event) {
+            log.debug("Kubevirt floating IP updated");
+            final KubevirtFloatingIp updated = event.newValue().value();
+            final KubevirtRouter router = routerOf(updated);
+            notifyDelegate(new KubevirtRouterEvent(KUBEVIRT_FLOATING_IP_UPDATED, router, updated));
+            processFloatingIpUpdate(event, router);
+        }
+
+        /**
+         * Emits a floating-IP-removed event carrying the removed (old) value.
+         */
+        private void processFloatingIpMapRemoval(MapEvent<String, KubevirtFloatingIp> event) {
+            log.debug("Kubevirt floating IP removed");
+            final KubevirtFloatingIp removed = event.oldValue().value();
+            final KubevirtRouter router = routerOf(removed);
+            notifyDelegate(new KubevirtRouterEvent(KUBEVIRT_FLOATING_IP_REMOVED, router, removed));
+        }
+
+        /**
+         * Compares the pod name before and after an update. Going from no
+         * pod name to a pod name emits an associated event with the new pod
+         * name; the opposite transition emits a disassociated event with the
+         * old pod name. Both events carry the new floating IP value.
+         */
+        private void processFloatingIpUpdate(MapEvent<String, KubevirtFloatingIp> event,
+                                             KubevirtRouter router) {
+            final String oldPodName = event.oldValue().value().podName();
+            final String newPodName = event.newValue().value().podName();
+            final boolean hadPod = !Strings.isNullOrEmpty(oldPodName);
+            final boolean hasPod = !Strings.isNullOrEmpty(newPodName);
+
+            if (!hadPod && hasPod) {
+                notifyDelegate(new KubevirtRouterEvent(
+                        KUBEVIRT_FLOATING_IP_ASSOCIATED,
+                        router,
+                        event.newValue().value(), newPodName));
+            } else if (hadPod && !hasPod) {
+                notifyDelegate(new KubevirtRouterEvent(
+                        KUBEVIRT_FLOATING_IP_DISASSOCIATED,
+                        router,
+                        event.newValue().value(), oldPodName));
+            }
+        }
+    }
}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtFloatingIpWatcher.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtFloatingIpWatcher.java
new file mode 100644
index 0000000..d15a515
--- /dev/null
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtFloatingIpWatcher.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnetworking.impl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
+import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kubevirtnetworking.api.AbstractWatcher;
+import org.onosproject.kubevirtnetworking.api.KubevirtFloatingIp;
+import org.onosproject.kubevirtnetworking.api.KubevirtRouterAdminService;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
+import org.onosproject.mastership.MastershipService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Kubevirt floating IP watcher used for feeding kubevirt floating IP information.
+ */
+@Component(immediate = true)
+public class KubevirtFloatingIpWatcher extends AbstractWatcher {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtRouterAdminService adminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtApiConfigService configService;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+
+ private final InternalFloatingIpWatcher watcher = new InternalFloatingIpWatcher();
+ private final InternalKubevirtApiConfigListener configListener =
+ new InternalKubevirtApiConfigListener();
+
+ CustomResourceDefinitionContext fipCrdCxt = new CustomResourceDefinitionContext
+ .Builder()
+ .withGroup("kubevirt.io")
+ .withScope("Cluster")
+ .withVersion("v1")
+ .withPlural("floatingips")
+ .build();
+
+ private ApplicationId appId;
+ private NodeId localNodeId;
+
+    /**
+     * Registers the application, records the local node ID, enters the
+     * leadership election for the application name and subscribes to
+     * kubevirt API config events.
+     */
+ @Activate
+ protected void activate() {
+ appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
+ localNodeId = clusterService.getLocalNode().id();
+        // appId must be assigned first: the election topic is the application name
+ leadershipService.runForLeadership(appId.name());
+ configService.addListener(configListener);
+
+ log.info("Started");
+ }
+
+    /**
+     * Unsubscribes from kubevirt API config events, withdraws from the
+     * leadership election and shuts down the event executor.
+     */
+ @Deactivate
+ protected void deactivate() {
+ configService.removeListener(configListener);
+ leadershipService.withdraw(appId.name());
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+    /**
+     * Starts watching the floating IP custom resource through a kubernetes
+     * client built from the config service. Nothing happens when no client
+     * is available; a failure to start the watch is reported through the
+     * component logger together with its cause.
+     */
+    private void instantiateWatcher() {
+        KubernetesClient client = k8sClient(configService);
+
+        if (client != null) {
+            try {
+                client.customResource(fipCrdCxt).watch(watcher);
+            } catch (IOException e) {
+                // use the logger rather than printStackTrace so the failure
+                // ends up in the regular log output
+                log.error("Failed to watch kubevirt floating IP resources", e);
+            }
+        }
+    }
+
+    /**
+     * Parses the JSON text of a floating IP resource and decodes its
+     * {@code spec} section with the floating IP codec.
+     *
+     * @param resource JSON text received from the watch
+     * @return the decoded floating IP, or null when the text cannot be
+     *         parsed or carries no object-valued {@code spec} section
+     */
+    private KubevirtFloatingIp parseKubevirtFloatingIp(String resource) {
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+            JsonNode json = mapper.readTree(resource);
+            JsonNode spec = json == null ? null : json.get("spec");
+
+            // a missing or non-object spec would otherwise reach the codec as
+            // null or blow up in the cast below
+            if (spec == null || !spec.isObject()) {
+                log.error("Kubevirt floating IP object has no valid spec section");
+                return null;
+            }
+
+            return codec(KubevirtFloatingIp.class).decode((ObjectNode) spec, this);
+        } catch (IOException e) {
+            log.error("Failed to parse kubevirt floating IP object", e);
+        }
+
+        return null;
+    }
+
+    /**
+     * Reacts to kubevirt API config events. Only config updates trigger any
+     * work: the watcher is instantiated on the event executor, provided the
+     * local node is the current leader for the application name.
+     */
+    private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
+
+        /**
+         * Tells whether the local node is the leader for the application name.
+         */
+        private boolean isRelevantHelper() {
+            final NodeId leader = leadershipService.getLeader(appId.name());
+            return Objects.equals(localNodeId, leader);
+        }
+
+        @Override
+        public void event(KubevirtApiConfigEvent event) {
+            switch (event.type()) {
+                case KUBEVIRT_API_CONFIG_UPDATED:
+                    eventExecutor.execute(this::processConfigUpdate);
+                    break;
+                default:
+                    // created, removed and any other event types need no action
+                    break;
+            }
+        }
+
+        /**
+         * Instantiates the watcher when the local node is the leader.
+         */
+        private void processConfigUpdate() {
+            if (isRelevantHelper()) {
+                instantiateWatcher();
+            }
+        }
+    }
+
+    /**
+     * Receives floating IP resource notifications from the watch and applies
+     * them to the router admin service. Every notification is processed on
+     * the event executor and is ignored unless the local node is the leader
+     * for the application name.
+     */
+    private class InternalFloatingIpWatcher implements Watcher<String> {
+
+        @Override
+        public void eventReceived(Action action, String resource) {
+            switch (action) {
+                case ADDED:
+                    eventExecutor.execute(() -> processAddition(resource));
+                    break;
+                case MODIFIED:
+                    eventExecutor.execute(() -> processModification(resource));
+                    break;
+                case DELETED:
+                    eventExecutor.execute(() -> processDeletion(resource));
+                    break;
+                case ERROR:
+                    log.warn("Failures processing floating IP manipulation.");
+                    break;
+                default:
+                    // do nothing
+                    break;
+            }
+        }
+
+        /**
+         * Records that the watch was closed so the loss of floating IP
+         * notifications is visible in the log.
+         *
+         * @param e cause of the closure; handled defensively if null
+         */
+        @Override
+        public void onClose(WatcherException e) {
+            if (e == null) {
+                log.warn("Kubevirt floating IP watcher closed");
+            } else {
+                log.warn("Kubevirt floating IP watcher closed", e);
+            }
+        }
+
+        /**
+         * Creates the floating IP described by the resource unless one with
+         * the same ID is already known.
+         */
+        private void processAddition(String resource) {
+            if (!isMaster()) {
+                return;
+            }
+
+            KubevirtFloatingIp fip = parseKubevirtFloatingIp(resource);
+
+            if (fip != null) {
+                log.trace("Process Floating IP {} creating event from API server.", fip.floatingIp());
+
+                if (adminService.floatingIp(fip.id()) == null) {
+                    adminService.createFloatingIp(fip);
+                }
+            }
+        }
+
+        /**
+         * Updates the floating IP described by the resource when one with
+         * the same ID is already known.
+         */
+        private void processModification(String resource) {
+            if (!isMaster()) {
+                return;
+            }
+
+            KubevirtFloatingIp fip = parseKubevirtFloatingIp(resource);
+
+            if (fip != null) {
+                log.trace("Process Floating IP {} updating event from API server.", fip.floatingIp());
+
+                if (adminService.floatingIp(fip.id()) != null) {
+                    adminService.updateFloatingIp(fip);
+                }
+            }
+        }
+
+        /**
+         * Removes the floating IP described by the resource when one with
+         * the same ID is known.
+         */
+        private void processDeletion(String resource) {
+            if (!isMaster()) {
+                return;
+            }
+
+            KubevirtFloatingIp fip = parseKubevirtFloatingIp(resource);
+
+            if (fip != null) {
+                log.trace("Process floating IP {} removal event from API server.", fip.floatingIp());
+
+                // the distributed store rejects unknown IDs with an
+                // IllegalArgumentException, so check first like the
+                // addition and modification handlers do
+                if (adminService.floatingIp(fip.id()) != null) {
+                    adminService.removeFloatingIp(fip.id());
+                }
+            }
+        }
+
+        /**
+         * Tells whether the local node is the leader for the application name.
+         */
+        private boolean isMaster() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+    }
+}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtRouterManager.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtRouterManager.java
index bae3ff9..e9c06da 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtRouterManager.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtRouterManager.java
@@ -21,6 +21,7 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.ListenerRegistry;
+import org.onosproject.kubevirtnetworking.api.KubevirtFloatingIp;
import org.onosproject.kubevirtnetworking.api.KubevirtPeerRouter;
import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
import org.onosproject.kubevirtnetworking.api.KubevirtRouterAdminService;
@@ -37,6 +38,7 @@
import org.slf4j.Logger;
import java.util.Set;
+import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -57,12 +59,16 @@
protected final Logger log = getLogger(getClass());
private static final String MSG_ROUTER = "Kubevirt router %s %s";
+ private static final String MSG_FLOATING_IP = "Kubevirt floating IP %s %s";
private static final String MSG_CREATED = "created";
private static final String MSG_UPDATED = "updated";
private static final String MSG_REMOVED = "removed";
private static final String ERR_NULL_ROUTER = "Kubevirt router cannot be null";
private static final String ERR_NULL_ROUTER_NAME = "Kubevirt router name cannot be null";
+ private static final String ERR_NULL_FLOATING_IP = "Kubevirt floating IP cannot be null";
+ private static final String ERR_NULL_FLOATING_IP_ID = "Kubevirt floating IP ID cannot be null";
+ private static final String ERR_NULL_POD_NAME = "Kubevirt POD name cannot be null";
private static final String ERR_IN_USE = " still in use";
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -142,6 +148,39 @@
}
+    /**
+     * Validates the floating IP, hands it to the router store for creation
+     * and logs the result.
+     *
+     * @param floatingIp floating IP to create; must be non-null with a
+     *                   non-empty ID
+     */
@Override
+    public void createFloatingIp(KubevirtFloatingIp floatingIp) {
+        checkNotNull(floatingIp, ERR_NULL_FLOATING_IP);
+        final boolean hasId = !Strings.isNullOrEmpty(floatingIp.id());
+        checkArgument(hasId, ERR_NULL_FLOATING_IP_ID);
+
+        kubevirtRouterStore.createFloatingIp(floatingIp);
+
+        final String message = String.format(MSG_FLOATING_IP, floatingIp.floatingIp(), MSG_CREATED);
+        log.info(message);
+    }
+
+    /**
+     * Validates the floating IP, hands it to the router store for update
+     * and logs the result.
+     *
+     * @param floatingIp floating IP carrying the new state; must be non-null
+     *                   with a non-empty ID
+     */
+    @Override
+    public void updateFloatingIp(KubevirtFloatingIp floatingIp) {
+        checkNotNull(floatingIp, ERR_NULL_FLOATING_IP);
+        final boolean hasId = !Strings.isNullOrEmpty(floatingIp.id());
+        checkArgument(hasId, ERR_NULL_FLOATING_IP_ID);
+
+        kubevirtRouterStore.updateFloatingIp(floatingIp);
+
+        final String message = String.format(MSG_FLOATING_IP, floatingIp.floatingIp(), MSG_UPDATED);
+        log.info(message);
+    }
+
+    /**
+     * Removes the floating IP with the given ID from the router store while
+     * holding this manager's monitor, and logs the removal when the store
+     * hands back the removed value.
+     *
+     * @param id floating IP identifier; must be non-empty
+     */
+    @Override
+    public void removeFloatingIp(String id) {
+        checkArgument(!Strings.isNullOrEmpty(id), ERR_NULL_FLOATING_IP_ID);
+
+        synchronized (this) {
+            final KubevirtFloatingIp removed = kubevirtRouterStore.removeFloatingIp(id);
+
+            if (removed == null) {
+                return;
+            }
+
+            log.info(String.format(MSG_FLOATING_IP, removed.floatingIp(), MSG_REMOVED));
+        }
+    }
+
+    /**
+     * Delegates to the router store's clear operation.
+     */
+ @Override
public void clear() {
kubevirtRouterStore.clear();
}
@@ -157,8 +196,35 @@
return ImmutableSet.copyOf(kubevirtRouterStore.routers());
}
+    /**
+     * Looks up a floating IP by its ID in the router store.
+     *
+     * @param id floating IP identifier; must be non-empty
+     * @return whatever the store returns for the ID
+     */
+    @Override
+    public KubevirtFloatingIp floatingIp(String id) {
+        final boolean hasId = !Strings.isNullOrEmpty(id);
+        checkArgument(hasId, ERR_NULL_FLOATING_IP_ID);
+        return kubevirtRouterStore.floatingIp(id);
+    }
+
+    /**
+     * Finds a floating IP whose pod name equals the given one.
+     *
+     * @param podName pod name to match; must be non-empty
+     * @return a matching floating IP, or null when none matches
+     */
+    @Override
+    public KubevirtFloatingIp floatingIpByPodName(String podName) {
+        checkArgument(!Strings.isNullOrEmpty(podName), ERR_NULL_POD_NAME);
+        for (KubevirtFloatingIp candidate : kubevirtRouterStore.floatingIps()) {
+            if (podName.equals(candidate.podName())) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Collects the floating IPs whose router name equals the given one.
+     * The result is an immutable copy, matching the other set-returning
+     * getters of this manager.
+     *
+     * @param routerName router name to match; must be non-empty
+     * @return immutable set of matching floating IPs, empty when none match
+     */
+    @Override
+    public Set<KubevirtFloatingIp> floatingIpsByRouter(String routerName) {
+        checkArgument(!Strings.isNullOrEmpty(routerName), ERR_NULL_ROUTER_NAME);
+        return ImmutableSet.copyOf(kubevirtRouterStore.floatingIps().stream()
+                .filter(fip -> routerName.equals(fip.routerName()))
+                .collect(Collectors.toSet()));
+    }
+
+    /**
+     * Returns an immutable copy of all floating IPs held by the router store.
+     *
+     * @return immutable set of floating IPs
+     */
+    @Override
+    public Set<KubevirtFloatingIp> floatingIps() {
+        final Set<KubevirtFloatingIp> stored = kubevirtRouterStore.floatingIps();
+        return ImmutableSet.copyOf(stored);
+    }
+
+    /**
+     * Tells whether any floating IP still references the named router.
+     *
+     * @param name router name
+     * @return true when at least one floating IP has that router name
+     */
private boolean isRouterInUse(String name) {
- return false;
+        return !floatingIpsByRouter(name).isEmpty();
}
private class InternalRouterStorageDelegate implements KubevirtRouterStoreDelegate {