Learn the devId where VM attached by listening VMI event
Refactor: extract wait logic into a common utility class
Change-Id: I96d60832770ee919a7632cd2665c82eb6f96bfba
(cherry picked from commit 0b90d45879020aab3a5fa2bac6a5b19b6bbe75a6)
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
index 466ae9a..6617128 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
@@ -16,6 +16,7 @@
package org.onosproject.kubevirtnetworking.impl;
import com.google.common.collect.Lists;
+import org.onlab.osgi.DefaultServiceDirectory;
import org.onlab.packet.ARP;
import org.onlab.packet.EthType;
import org.onlab.packet.Ethernet;
@@ -52,6 +53,7 @@
import org.onosproject.kubevirtnode.api.KubevirtNodeService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.behaviour.BridgeConfig;
import org.onosproject.net.behaviour.BridgeDescription;
@@ -62,6 +64,7 @@
import org.onosproject.net.behaviour.InterfaceConfig;
import org.onosproject.net.behaviour.PatchDescription;
import org.onosproject.net.device.DeviceAdminService;
+import org.onosproject.net.device.DeviceService;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
@@ -79,7 +82,6 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
-import static java.lang.Thread.sleep;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.packet.ICMP.CODE_ECHO_REQEUST;
import static org.onlab.packet.ICMP.TYPE_ECHO_REPLY;
@@ -118,6 +120,7 @@
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.segmentIdHex;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelToTenantPort;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.waitFor;
import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.NXM_NX_IP_TTL;
import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.NXM_OF_ICMP_TYPE;
import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
@@ -130,6 +133,7 @@
import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_TO_INTEGRATION;
import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.GATEWAY;
import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
+import static org.onosproject.net.AnnotationKeys.PORT_NAME;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -141,8 +145,6 @@
private static final String DEFAULT_OF_PROTO = "tcp";
private static final int DEFAULT_OFPORT = 6653;
private static final int DPID_BEGIN = 3;
- private static final long SLEEP_MS = 3000;
- private static final long SLEEP_LARGE_MS = 5000;
private static final int DEFAULT_TTL = 0xff;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -256,15 +258,20 @@
String dpid = network.tenantDeviceId(
node.hostname()).toString().substring(DPID_BEGIN);
- BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
- .name(network.tenantBridgeName())
- .failMode(BridgeDescription.FailMode.SECURE)
- .datapathId(dpid)
- .disableInBand()
- .controllers(controllers);
+ // if the bridge is already available, we skip creating a new bridge
+ if (!deviceService.isAvailable(DeviceId.deviceId(dpid))) {
+ BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
+ .name(network.tenantBridgeName())
+ .failMode(BridgeDescription.FailMode.SECURE)
+ .datapathId(dpid)
+ .disableInBand()
+ .controllers(controllers);
- BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
- bridgeConfig.addBridge(builder.build());
+ BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
+ bridgeConfig.addBridge(builder.build());
+
+ waitFor(3);
+ }
}
private void removeBridge(KubevirtNode node, KubevirtNetwork network) {
@@ -292,24 +299,32 @@
String tunToTenantIntf =
TUNNEL_TO_TENANT_PREFIX + segmentIdHex(network.segmentId());
- // tenant bridge -> tunnel bridge
- PatchDescription brTenantTunPatchDesc =
- DefaultPatchDescription.builder()
- .deviceId(network.tenantBridgeName())
- .ifaceName(tenantToTunIntf)
- .peer(tunToTenantIntf)
- .build();
+ if (!hasPort(DeviceId.deviceId(network.tenantBridgeName()), tenantToTunIntf)) {
+ // tenant bridge -> tunnel bridge
+ PatchDescription brTenantTunPatchDesc =
+ DefaultPatchDescription.builder()
+ .deviceId(network.tenantBridgeName())
+ .ifaceName(tenantToTunIntf)
+ .peer(tunToTenantIntf)
+ .build();
- ifaceConfig.addPatchMode(tenantToTunIntf, brTenantTunPatchDesc);
+ ifaceConfig.addPatchMode(tenantToTunIntf, brTenantTunPatchDesc);
- // tunnel bridge -> tenant bridge
- PatchDescription brTunTenantPatchDesc =
- DefaultPatchDescription.builder()
- .deviceId(TUNNEL_BRIDGE)
- .ifaceName(tunToTenantIntf)
- .peer(tenantToTunIntf)
- .build();
- ifaceConfig.addPatchMode(tunToTenantIntf, brTunTenantPatchDesc);
+ waitFor(1);
+ }
+
+ if (!hasPort(DeviceId.deviceId(TUNNEL_BRIDGE), tunToTenantIntf)) {
+ // tunnel bridge -> tenant bridge
+ PatchDescription brTunTenantPatchDesc =
+ DefaultPatchDescription.builder()
+ .deviceId(TUNNEL_BRIDGE)
+ .ifaceName(tunToTenantIntf)
+ .peer(tenantToTunIntf)
+ .build();
+ ifaceConfig.addPatchMode(tunToTenantIntf, brTunTenantPatchDesc);
+
+ waitFor(1);
+ }
}
private void removeAllFlows(KubevirtNode node, KubevirtNetwork network) {
@@ -387,12 +402,7 @@
while (!deviceService.isAvailable(deviceId)) {
log.warn("Device {} is not ready for installing rules", deviceId);
-
- try {
- sleep(SLEEP_MS);
- } catch (InterruptedException e) {
- log.error("Failed to check device availability", e);
- }
+ waitFor(3);
}
flowService.connectTables(deviceId, TENANT_INBOUND_TABLE, TENANT_DHCP_TABLE);
@@ -927,6 +937,15 @@
install);
}
+ private boolean hasPort(DeviceId deviceId, String portName) {
+ DeviceService deviceService = DefaultServiceDirectory.getService(DeviceService.class);
+ Port port = deviceService.getPorts(deviceId).stream()
+ .filter(p -> p.isEnabled() &&
+ Objects.equals(p.annotations().value(PORT_NAME), portName))
+ .findAny().orElse(null);
+ return port != null;
+ }
+
private class InternalRouterEventListener implements KubevirtRouterListener {
private boolean isRelevantHelper() {
return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
@@ -1240,11 +1259,7 @@
removeAllFlows(n, network);
removePatchInterface(n, network);
- try {
- sleep(SLEEP_LARGE_MS);
- } catch (InterruptedException e) {
- log.error("Sleep exception", e);
- }
+ waitFor(5);
removeBridge(n, network);
});
@@ -1440,7 +1455,6 @@
kubevirtPort, gwNode, false);
});
}
-
}
}
}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java
deleted file mode 100644
index 669f22ff..0000000
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Copyright 2021-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kubevirtnetworking.impl;
-
-import io.fabric8.kubernetes.api.model.Pod;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.CoreService;
-import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
-import org.onosproject.kubevirtnetworking.api.KubevirtPodAdminService;
-import org.onosproject.kubevirtnetworking.api.KubevirtPodEvent;
-import org.onosproject.kubevirtnetworking.api.KubevirtPodListener;
-import org.onosproject.kubevirtnetworking.api.KubevirtPort;
-import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
-import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
-import org.onosproject.kubevirtnode.api.KubevirtNode;
-import org.onosproject.kubevirtnode.api.KubevirtNodeService;
-import org.onosproject.mastership.MastershipService;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.driver.DriverService;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-
-import static java.lang.Thread.sleep;
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
-import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPorts;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Associates the kubevirt container port and pod.
- */
-@Component(immediate = true)
-public class KubevirtPodPortMapper {
-
- private final Logger log = getLogger(getClass());
-
- private static final String NETWORK_STATUS_KEY = "k8s.v1.cni.cncf.io/network-status";
- private static final long SLEEP_MS = 2000; // we wait 2s
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected CoreService coreService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected MastershipService mastershipService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected LeadershipService leadershipService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected DeviceService deviceService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected DriverService driverService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected KubevirtPortAdminService kubevirtPortAdminService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected KubevirtNetworkAdminService kubevirtNetworkAdminService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected KubevirtPodAdminService kubevirtPodAdminService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected KubevirtApiConfigService kubevirtApiConfigService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected KubevirtNodeService kubevirtNodeService;
-
- private final ExecutorService eventExecutor = newSingleThreadExecutor(
- groupedThreads(this.getClass().getSimpleName(), "event-handler"));
-
- private final InternalKubevirtPodListener kubevirtPodListener =
- new InternalKubevirtPodListener();
-
- private ApplicationId appId;
- private NodeId localNodeId;
-
- @Activate
- protected void activate() {
- appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
- localNodeId = clusterService.getLocalNode().id();
- leadershipService.runForLeadership(appId.name());
- kubevirtPodAdminService.addListener(kubevirtPodListener);
-
- log.info("Started");
- }
-
- @Deactivate
- protected void deactivate() {
- kubevirtPodAdminService.removeListener(kubevirtPodListener);
- leadershipService.withdraw(appId.name());
- eventExecutor.shutdown();
-
- log.info("Stopped");
- }
-
- private class InternalKubevirtPodListener implements KubevirtPodListener {
-
- private boolean isRelevantHelper() {
- return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
- }
-
- @Override
- public void event(KubevirtPodEvent event) {
- switch (event.type()) {
- case KUBEVIRT_POD_UPDATED:
- case KUBEVIRT_POD_CREATED:
- eventExecutor.execute(() -> processPodAddition(event.subject()));
- case KUBEVIRT_POD_REMOVED:
- default:
- // do nothing
- break;
- }
- }
-
- private void processPodAddition(Pod pod) {
- if (!isRelevantHelper()) {
- return;
- }
-
- Map<String, String> annots = pod.getMetadata().getAnnotations();
- if (annots == null) {
- return;
- }
-
- if (!annots.containsKey(NETWORK_STATUS_KEY)) {
- return;
- }
-
- KubevirtNode node = kubevirtNodeService.node(pod.getSpec().getNodeName());
-
- if (node == null) {
- log.warn("POD scheduled node name {} is not ready, " +
- "we wait for a while...", pod.getSpec().getNodeName());
- try {
- // we wait until all k8s nodes are available
- sleep(SLEEP_MS);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- Set<KubevirtPort> ports = getPorts(kubevirtNodeService,
- kubevirtNetworkAdminService.networks(), pod);
-
- if (ports.size() == 0) {
- return;
- }
-
- ports.forEach(port -> {
- KubevirtPort existing = kubevirtPortAdminService.port(port.macAddress());
-
- if (existing != null) {
- if (port.deviceId() != null && existing.deviceId() == null) {
- KubevirtPort updated = existing.updateDeviceId(port.deviceId());
- // internal we update device ID of kubevirt port
- kubevirtPortAdminService.updatePort(updated);
- }
- }
- });
- }
- }
-}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java
index 09cc288..d97ac47 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java
@@ -78,7 +78,6 @@
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
-import static java.lang.Thread.sleep;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.kubevirtnetworking.api.Constants.ACL_CT_TABLE;
@@ -103,6 +102,7 @@
import static org.onosproject.kubevirtnetworking.impl.OsgiPropertyConstants.USE_SECURITY_GROUP;
import static org.onosproject.kubevirtnetworking.impl.OsgiPropertyConstants.USE_SECURITY_GROUP_DEFAULT;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPropertyValueAsBoolean;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.waitFor;
import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildPortRangeMatches;
import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.computeCtMaskFlag;
import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.computeCtStateFlag;
@@ -154,8 +154,6 @@
private static final int ACTION_NONE = 0;
private static final int ACTION_DROP = -1;
- private static final long SLEEP_MS = 5000;
-
/** Apply EdgeStack security group rule for VM traffic. */
private boolean useSecurityGroup = USE_SECURITY_GROUP_DEFAULT;
@@ -388,11 +386,7 @@
if (deviceService.getDevice(deviceId) != null) {
break;
} else {
- try {
- sleep(SLEEP_MS);
- } catch (InterruptedException e) {
- log.error("Failed to install security group default rules.");
- }
+ waitFor(5);
}
}
@@ -1020,11 +1014,7 @@
// now we wait 5s for all tenant bridges are created,
// FIXME: we need to fina a better way to wait all tenant bridges
// are created before installing default security group rules
- try {
- sleep(SLEEP_MS);
- } catch (InterruptedException e) {
- log.error("Failed to install security group default rules.");
- }
+ waitFor(5);
for (KubevirtNode node : nodes) {
initializeTenantPipeline(network, node, true);
@@ -1108,11 +1098,7 @@
}
// FIXME: we wait all port get its deviceId updated
- try {
- sleep(SLEEP_MS);
- } catch (InterruptedException e) {
- log.error("Failed to install security group default rules.");
- }
+ waitFor(5);
resetSecurityGroupRulesByNode(node);
}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java
index 30295d1..3967d6a 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java
@@ -55,7 +55,6 @@
import java.util.Objects;
import java.util.concurrent.ExecutorService;
-import static java.lang.Thread.sleep;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
@@ -65,6 +64,7 @@
import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VLAN;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelToTenantPort;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.waitFor;
import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
@@ -76,7 +76,6 @@
@Component(immediate = true)
public class KubevirtSwitchingTenantHandler {
private final Logger log = getLogger(getClass());
- private static final long SLEEP_MS = 3000; // we wait 3s
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@@ -156,11 +155,7 @@
if (tunnelToTenantPort(localNode, network) != null) {
break;
} else {
- try {
- sleep(SLEEP_MS);
- } catch (InterruptedException e) {
- log.error("Failed to install security group default rules.");
- }
+ waitFor(3);
}
}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmWatcher.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmWatcher.java
index fc3baab..8f38c4a 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmWatcher.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmWatcher.java
@@ -18,7 +18,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
-import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
@@ -41,7 +40,6 @@
import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
import org.onosproject.kubevirtnode.api.KubevirtNodeService;
import org.onosproject.mastership.MastershipService;
-import org.onosproject.net.DeviceId;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -56,12 +54,10 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
-import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPorts;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
import static org.slf4j.LoggerFactory.getLogger;
@@ -242,12 +238,6 @@
port = port.updateIpAddress(ip);
- DeviceId deviceId = getDeviceId(podService.pods(), port);
-
- if (deviceId != null) {
- port = port.updateDeviceId(deviceId);
- }
-
if (portAdminService.port(port.macAddress()) == null) {
portAdminService.createPort(port);
}
@@ -298,22 +288,6 @@
return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
}
- // FIXME: to obtains the device ID, we have to search through
- // existing POD inventory, need to find a better wat to obtain device ID
- private DeviceId getDeviceId(Set<Pod> pods, KubevirtPort port) {
- Set<Pod> defaultPods = pods.stream()
- .filter(pod -> pod.getMetadata().getNamespace().equals(DEFAULT))
- .collect(Collectors.toSet());
-
- Set<KubevirtPort> allPorts = new HashSet<>();
- for (Pod pod : defaultPods) {
- allPorts.addAll(getPorts(nodeService, networkAdminService.networks(), pod));
- }
-
- return allPorts.stream().filter(p -> p.macAddress().equals(port.macAddress()))
- .map(KubevirtPort::deviceId).findFirst().orElse(null);
- }
-
private Map<String, IpAddress> parseIpAddresses(String resource) {
try {
ObjectMapper mapper = new ObjectMapper();
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmiWatcher.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmiWatcher.java
new file mode 100644
index 0000000..9e0ce3e
--- /dev/null
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmiWatcher.java
@@ -0,0 +1,272 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnetworking.impl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
+import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
+import org.onosproject.kubevirtnetworking.api.KubevirtPort;
+import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeService;
+import org.onosproject.mastership.MastershipService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPorts;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.waitFor;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Kubernetes VMI watcher used for feeding VMI information.
+ */
+@Component(immediate = true)
+public class KubevirtVmiWatcher {
+
+ private final Logger log = getLogger(getClass());
+
+ private static final String STATUS = "status";
+ private static final String NODE_NAME = "nodeName";
+ private static final String METADATA = "metadata";
+ private static final String NAME = "name";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtNodeService nodeService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtNetworkAdminService networkAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtPortAdminService portAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtApiConfigService configService;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+
+ private final InternalKubevirtVmiWatcher watcher = new InternalKubevirtVmiWatcher();
+ private final InternalKubevirtApiConfigListener
+ configListener = new InternalKubevirtApiConfigListener();
+
+ CustomResourceDefinitionContext vmiCrdCxt = new CustomResourceDefinitionContext
+ .Builder()
+ .withGroup("kubevirt.io")
+ .withScope("Namespaced")
+ .withVersion("v1")
+ .withPlural("virtualmachineinstances")
+ .build();
+
+ private ApplicationId appId;
+ private NodeId localNodeId;
+
+ @Activate
+ protected void activate() {
+ appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
+ localNodeId = clusterService.getLocalNode().id();
+ leadershipService.runForLeadership(appId.name());
+ configService.addListener(configListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ configService.removeListener(configListener);
+ leadershipService.withdraw(appId.name());
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+ private void instantiateWatcher() {
+ KubernetesClient client = k8sClient(configService);
+
+ if (client != null) {
+ try {
+ client.customResource(vmiCrdCxt).watch(watcher);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(KubevirtApiConfigEvent event) {
+
+ switch (event.type()) {
+ case KUBEVIRT_API_CONFIG_UPDATED:
+ eventExecutor.execute(this::processConfigUpdate);
+ break;
+ case KUBEVIRT_API_CONFIG_CREATED:
+ case KUBEVIRT_API_CONFIG_REMOVED:
+ default:
+ // do nothing
+ break;
+ }
+ }
+
+ private void processConfigUpdate() {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ instantiateWatcher();
+ }
+ }
+
+ private class InternalKubevirtVmiWatcher implements Watcher<String> {
+
+ @Override
+ public void eventReceived(Action action, String s) {
+ switch (action) {
+ case ADDED:
+ case MODIFIED:
+ eventExecutor.execute(() -> processAddition(s));
+ break;
+ case ERROR:
+ log.warn("Failures processing VM manipulation.");
+ break;
+ default:
+ break;
+ }
+ }
+
+ @Override
+ public void onClose(WatcherException e) {
+ log.warn("VM watcher OnClose, re-instantiate the VM watcher...");
+ instantiateWatcher();
+ }
+
+ private void processAddition(String resource) {
+ if (!isMaster()) {
+ return;
+ }
+
+ String nodeName = parseNodeName(resource);
+ String vmiName = parseVmiName(resource);
+
+ if (nodeName == null) {
+ return;
+ }
+
+ KubevirtNode node = nodeService.node(nodeName);
+
+ if (node == null) {
+ log.warn("VMI {} scheduled on node {} is not ready, " +
+ "we wait for a while...", vmiName, nodeName);
+ waitFor(2);
+ }
+
+ Set<KubevirtPort> ports = getPorts(nodeService,
+ networkAdminService.networks(), resource);
+
+ if (ports.size() == 0) {
+ return;
+ }
+
+ ports.forEach(port -> {
+ KubevirtPort existing = portAdminService.port(port.macAddress());
+
+ if (existing != null) {
+ if (port.deviceId() != null && existing.deviceId() == null) {
+ KubevirtPort updated = existing.updateDeviceId(port.deviceId());
+ // internal we update device ID of kubevirt port
+ portAdminService.updatePort(updated);
+ }
+ }
+ });
+ }
+
+ private boolean isMaster() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ private String parseVmiName(String resource) {
+ String vmiName = null;
+
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode json = mapper.readTree(resource);
+ JsonNode metadataJson = json.get(METADATA);
+ JsonNode vmiNameJson = metadataJson.get(NAME);
+ vmiName = vmiNameJson != null ? vmiNameJson.asText() : null;
+ } catch (IOException e) {
+ log.error("Failed to parse kubevirt VMI name");
+ }
+
+ return vmiName;
+ }
+
+ private String parseNodeName(String resource) {
+ String nodeName = null;
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode json = mapper.readTree(resource);
+ JsonNode statusJson = json.get(STATUS);
+ JsonNode nodeNameJson = statusJson.get(NODE_NAME);
+ nodeName = nodeNameJson != null ? nodeNameJson.asText() : null;
+ } catch (IOException e) {
+ log.error("Failed to parse kubevirt VMI nodename");
+ }
+
+ return nodeName;
+ }
+ }
+}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java
index 8771dda..a60bd0a 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java
@@ -15,16 +15,16 @@
*/
package org.onosproject.kubevirtnetworking.util;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
-import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.net.util.SubnetUtils;
-import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.onlab.osgi.DefaultServiceDirectory;
@@ -62,7 +62,6 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -90,6 +89,10 @@
private static final String MAC = "mac";
private static final String IPS = "ips";
private static final String BR_INT = "br-int";
+ private static final String STATUS = "status";
+ private static final String INTERFACES = "interfaces";
+ private static final String IP_ADDRESS = "ipAddress";
+ private static final String NODE_NAME = "nodeName";
/**
* Prevents object installation from external.
@@ -325,65 +328,68 @@
}
/**
- * Obtains the kubevirt port from kubevirt POD.
+ * Obtains the kubevirt port from kubevirt VMI.
*
* @param nodeService kubevirt node service
* @param networks set of existing kubevirt networks
- * @param pod kubevirt POD
- * @return kubevirt ports attached to the POD
+ * @param resource VMI definition
+ * @return kubevirt ports attached to the VMI
*/
public static Set<KubevirtPort> getPorts(KubevirtNodeService nodeService,
- Set<KubevirtNetwork> networks, Pod pod) {
+ Set<KubevirtNetwork> networks,
+ String resource) {
try {
- Map<String, String> annots = pod.getMetadata().getAnnotations();
- if (annots == null) {
- return ImmutableSet.of();
- }
-
- if (!annots.containsKey(NETWORK_STATUS_KEY)) {
- return ImmutableSet.of();
- }
-
- String networkStatusStr = annots.get(NETWORK_STATUS_KEY);
-
- if (networkStatusStr == null) {
- return ImmutableSet.of();
- }
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode json = mapper.readTree(resource);
+ JsonNode statusJson = json.get(STATUS);
+ ArrayNode interfacesJson = (ArrayNode) statusJson.get(INTERFACES);
KubevirtPort.Builder builder = DefaultKubevirtPort.builder();
-
- KubevirtNode node = nodeService.node(pod.getSpec().getNodeName());
-
- if (node != null) {
- builder.deviceId(node.intgBridge());
+ String nodeName = parseVmiNodeName(resource);
+ if (nodeName != null && nodeService.node(nodeName) != null) {
+ builder.deviceId(nodeService.node(nodeName).intgBridge());
}
- JSONArray networkStatus = new JSONArray(networkStatusStr);
- Set<KubevirtPort> ports = new HashSet<>();
+ if (interfacesJson == null) {
+ return ImmutableSet.of();
+ }
- for (int i = 0; i < networkStatus.length(); i++) {
- JSONObject object = networkStatus.getJSONObject(i);
- String name = object.getString(NAME);
+ Set<KubevirtPort> ports = new HashSet<>();
+ for (JsonNode interfaceJson : interfacesJson) {
+ String name = interfaceJson.get(NAME).asText();
KubevirtNetwork network = networks.stream()
- .filter(n -> (NETWORK_PREFIX + n.name()).equals(name) || (n.name()).equals(name))
+ .filter(n -> (NETWORK_PREFIX + n.name()).equals(name) ||
+ (n.name() + "-net").equals(name))
.findAny().orElse(null);
if (network != null) {
- String mac = object.getString(MAC);
-
+ // FIXME: we do not update IP address, as learning IP address
+ // requires much more time due to the lag from VM agent
+ String mac = interfaceJson.get(MAC).asText();
builder.macAddress(MacAddress.valueOf(mac))
.networkId(network.networkId());
-
ports.add(builder.build());
}
}
-
return ports;
+ } catch (IOException e) {
+ log.error("Failed to parse port info from VMI object", e);
+ }
+ return ImmutableSet.of();
+ }
- } catch (JSONException e) {
- log.error("Failed to parse network status object", e);
+ public static String parseVmiNodeName(String resource) {
+ String nodeName = null;
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode json = mapper.readTree(resource);
+ JsonNode statusJson = json.get(STATUS);
+ JsonNode nodeNameJson = statusJson.get(NODE_NAME);
+ nodeName = nodeNameJson != null ? nodeNameJson.asText() : null;
+ } catch (IOException e) {
+ log.error("Failed to parse kubevirt VMI nodename");
}
- return ImmutableSet.of();
+ return nodeName;
}
/**
@@ -663,4 +669,17 @@
.filter(lb -> router.internal().contains(lb.networkId()))
.collect(Collectors.toSet());
}
+
+ /**
+ * Waits for the given length of time.
+ *
+ * @param timeSecond the amount of time for wait in second unit
+ */
+ public static void waitFor(int timeSecond) {
+ try {
+ Thread.sleep(timeSecond * 1000L);
+ } catch (Exception e) {
+ log.error(e.toString());
+ }
+ }
}