Fix: associate tunnel ingress rules with network events
Change-Id: If5e5ca996d6c3c106450454c3a944a3c9871a6b9
(cherry picked from commit b689e57976810f5c80aed99f5c4d1c8c37f4016b)
diff --git a/apps/kubevirt-networking/api/src/main/java/org/onosproject/kubevirtnetworking/api/DefaultKubevirtPort.java b/apps/kubevirt-networking/api/src/main/java/org/onosproject/kubevirtnetworking/api/DefaultKubevirtPort.java
index 2a7385f..ccd35e9 100644
--- a/apps/kubevirt-networking/api/src/main/java/org/onosproject/kubevirtnetworking/api/DefaultKubevirtPort.java
+++ b/apps/kubevirt-networking/api/src/main/java/org/onosproject/kubevirtnetworking/api/DefaultKubevirtPort.java
@@ -92,7 +92,12 @@
DefaultServiceDirectory.getService(KubevirtNodeService.class);
KubevirtNetwork network = networkService.network(networkId);
KubevirtNode node = nodeService.node(deviceId);
- return network.tenantDeviceId(node.hostname());
+
+ if (network == null || node == null) {
+ return null;
+ } else {
+ return network.tenantDeviceId(node.hostname());
+ }
}
@Override
@@ -100,9 +105,13 @@
KubevirtNetworkService networkService =
DefaultServiceDirectory.getService(KubevirtNetworkService.class);
KubevirtNetwork network = networkService.network(networkId);
- return network.type() == KubevirtNetwork.Type.VXLAN ||
- network.type() == KubevirtNetwork.Type.GRE ||
- network.type() == KubevirtNetwork.Type.GENEVE;
+ if (network == null) {
+ return false;
+ } else {
+ return network.type() == KubevirtNetwork.Type.VXLAN ||
+ network.type() == KubevirtNetwork.Type.GRE ||
+ network.type() == KubevirtNetwork.Type.GENEVE;
+ }
}
@Override
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
index c95f8c4..47f778a 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
@@ -1230,7 +1230,7 @@
return;
}
- nodeService.completeNodes().forEach(n -> {
+ nodeService.completeNodes(WORKER).forEach(n -> {
removePatchInterface(n, network);
removeBridge(n, network);
});
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java
index f49b03f..669f22ff 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java
@@ -134,9 +134,8 @@
public void event(KubevirtPodEvent event) {
switch (event.type()) {
case KUBEVIRT_POD_UPDATED:
- eventExecutor.execute(() -> processPodUpdate(event.subject()));
- break;
case KUBEVIRT_POD_CREATED:
+ eventExecutor.execute(() -> processPodAddition(event.subject()));
case KUBEVIRT_POD_REMOVED:
default:
// do nothing
@@ -144,7 +143,7 @@
}
}
- private void processPodUpdate(Pod pod) {
+ private void processPodAddition(Pod pod) {
if (!isRelevantHelper()) {
return;
}
@@ -173,6 +172,7 @@
Set<KubevirtPort> ports = getPorts(kubevirtNodeService,
kubevirtNetworkAdminService.networks(), pod);
+
if (ports.size() == 0) {
return;
}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java
index c4802a6..09cc288 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java
@@ -379,6 +379,23 @@
if (deviceId == null) {
return;
}
+
+ // we check whether the given device is available from the store
+ // if not we will wait until the device is eventually created
+ // FIXME: it would be better to listen to device event to perform
+ // pipeline initialization rather on network events.
+ while (true) {
+ if (deviceService.getDevice(deviceId) != null) {
+ break;
+ } else {
+ try {
+ sleep(SLEEP_MS);
+ } catch (InterruptedException e) {
+ log.error("Failed to install security group default rules.");
+ }
+ }
+ }
+
initializeTenantIngressTable(deviceId, install);
initializeTenantEgressTable(deviceId, install);
initializeTenantConnTrackTable(deviceId, install);
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java
index 066ce49..30295d1 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java
@@ -25,6 +25,8 @@
import org.onosproject.core.CoreService;
import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetworkEvent;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetworkListener;
import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
import org.onosproject.kubevirtnetworking.api.KubevirtPort;
@@ -53,6 +55,7 @@
import java.util.Objects;
import java.util.concurrent.ExecutorService;
+import static java.lang.Thread.sleep;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
@@ -73,6 +76,7 @@
@Component(immediate = true)
public class KubevirtSwitchingTenantHandler {
private final Logger log = getLogger(getClass());
+ private static final long SLEEP_MS = 3000; // we wait 3s
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@@ -100,6 +104,8 @@
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+ private final InternalKubevirtNetworkListener kubevirtNetworkListener =
+ new InternalKubevirtNetworkListener();
private final InternalKubevirtPortListener kubevirtPortListener =
new InternalKubevirtPortListener();
private final InternalKubevirtNodeListener kubevirtNodeListener =
@@ -114,6 +120,7 @@
localNodeId = clusterService.getLocalNode().id();
leadershipService.runForLeadership(appId.name());
kubevirtPortService.addListener(kubevirtPortListener);
+ kubevirtNetworkService.addListener(kubevirtNetworkListener);
kubevirtNodeService.addListener(kubevirtNodeListener);
log.info("Started");
@@ -121,6 +128,7 @@
@Deactivate
protected void deactivate() {
+ kubevirtNetworkService.removeListener(kubevirtNetworkListener);
kubevirtPortService.removeListener(kubevirtPortListener);
kubevirtNodeService.removeListener(kubevirtNodeListener);
leadershipService.withdraw(appId.name());
@@ -129,13 +137,7 @@
log.info("Stopped");
}
- private void setIngressRules(KubevirtPort port, boolean install) {
- if (port.ipAddress() == null) {
- return;
- }
-
- KubevirtNetwork network = kubevirtNetworkService.network(port.networkId());
-
+ private void setIngressRules(KubevirtNetwork network, boolean install) {
if (network == null) {
return;
}
@@ -148,33 +150,65 @@
return;
}
- KubevirtNode localNode = kubevirtNodeService.node(port.deviceId());
- if (localNode == null || localNode.type() == MASTER) {
- return;
+ for (KubevirtNode localNode : kubevirtNodeService.completeNodes(WORKER)) {
+
+ while (true) {
+ if (tunnelToTenantPort(localNode, network) != null) {
+ break;
+ } else {
+ try {
+ sleep(SLEEP_MS);
+ } catch (InterruptedException e) {
+ log.error("Failed to install security group default rules.");
+ }
+ }
+ }
+
+ PortNumber patchPortNumber = tunnelToTenantPort(localNode, network);
+
+ TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
+ .matchTunnelId(Long.parseLong(network.segmentId()));
+
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+ .setOutput(patchPortNumber);
+
+ flowRuleService.setRule(
+ appId,
+ localNode.tunBridge(),
+ sBuilder.build(),
+ tBuilder.build(),
+ PRIORITY_TUNNEL_RULE,
+ TUNNEL_DEFAULT_TABLE,
+ install);
+
+ log.debug("Install ingress rules for segment ID {}", network.segmentId());
}
+ }
- PortNumber patchPortNumber = tunnelToTenantPort(localNode, network);
- if (patchPortNumber == null) {
- return;
+ private void setIngressRules(KubevirtNode node, boolean install) {
+ for (KubevirtNetwork network : kubevirtNetworkService.tenantNetworks()) {
+ PortNumber patchPortNumber = tunnelToTenantPort(node, network);
+ if (patchPortNumber == null) {
+ return;
+ }
+
+ TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
+ .matchTunnelId(Long.parseLong(network.segmentId()));
+
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+ .setOutput(patchPortNumber);
+
+ flowRuleService.setRule(
+ appId,
+ node.tunBridge(),
+ sBuilder.build(),
+ tBuilder.build(),
+ PRIORITY_TUNNEL_RULE,
+ TUNNEL_DEFAULT_TABLE,
+ install);
+
+ log.debug("Install ingress rules for segment ID {}", network.segmentId());
}
-
- TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
- .matchTunnelId(Long.parseLong(network.segmentId()));
-
- TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
- .setOutput(patchPortNumber);
-
- flowRuleService.setRule(
- appId,
- localNode.tunBridge(),
- sBuilder.build(),
- tBuilder.build(),
- PRIORITY_TUNNEL_RULE,
- TUNNEL_DEFAULT_TABLE,
- install);
-
- log.debug("Install ingress rules for instance {}, segment ID {}",
- port.ipAddress(), network.segmentId());
}
private void setEgressRules(KubevirtPort port, boolean install) {
@@ -284,15 +318,52 @@
return;
}
+ setIngressRules(node, true);
kubevirtPortService.ports().stream()
.filter(port -> node.equals(kubevirtNodeService.node(port.deviceId())))
.forEach(port -> {
- setIngressRules(port, true);
setEgressRules(port, true);
});
}
}
+ private class InternalKubevirtNetworkListener implements KubevirtNetworkListener {
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(KubevirtNetworkEvent event) {
+ switch (event.type()) {
+ case KUBEVIRT_NETWORK_CREATED:
+ eventExecutor.execute(() -> processNetworkAddition(event.subject()));
+ break;
+ case KUBEVIRT_NETWORK_REMOVED:
+ eventExecutor.execute(() -> processNetworkRemoval(event.subject()));
+ break;
+ default:
+ // do nothing
+ break;
+ }
+ }
+
+ private void processNetworkAddition(KubevirtNetwork network) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ setIngressRules(network, true);
+ }
+
+ private void processNetworkRemoval(KubevirtNetwork network) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ setIngressRules(network, false);
+ }
+ }
+
private class InternalKubevirtPortListener implements KubevirtPortListener {
@Override
@@ -325,7 +396,6 @@
return;
}
- setIngressRules(port, true);
setEgressRules(port, true);
}
@@ -334,7 +404,6 @@
return;
}
- setIngressRules(port, false);
setEgressRules(port, false);
}
}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmWatcher.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmWatcher.java
index 78a7f51..43d3e64 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmWatcher.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtVmWatcher.java
@@ -73,8 +73,6 @@
private final Logger log = getLogger(getClass());
- private static final long SLEEP_MS = 3000; // we wait 3s
-
private static final String SPEC = "spec";
private static final String TEMPLATE = "template";
private static final String METADATA = "metadata";