Support tenant overlay network mode at kubevirt networking
Change-Id: Ife40e40e3ee5e342ac8b90ddea6eb81744ace18a
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
index e538ef7..6572033 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
@@ -16,9 +16,13 @@
package org.onosproject.kubevirtnetworking.impl;
import com.google.common.collect.Lists;
+import org.onlab.packet.ARP;
+import org.onlab.packet.EthType;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IPv4;
+import org.onlab.packet.Ip4Address;
import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
import org.onlab.packet.TpPort;
import org.onlab.packet.UDP;
import org.onosproject.cluster.ClusterService;
@@ -65,14 +69,31 @@
import static java.lang.Thread.sleep;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.packet.ICMP.CODE_ECHO_REQEUST;
+import static org.onlab.packet.ICMP.TYPE_ECHO_REPLY;
+import static org.onlab.packet.ICMP.TYPE_ECHO_REQUEST;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kubevirtnetworking.api.Constants.DEFAULT_GATEWAY_MAC;
import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ARP_GATEWAY_RULE;
import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_DHCP_RULE;
-import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_SWITCHING_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FORWARDING_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ICMP_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_ARP_TABLE;
import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_DHCP_TABLE;
import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_FORWARDING_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_ICMP_TABLE;
import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_INBOUND_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_TO_TUNNEL_PREFIX;
+import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_TO_TENANT_PREFIX;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.segmentIdHex;
+import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.NXM_NX_IP_TTL;
+import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.NXM_OF_ICMP_TYPE;
+import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildLoadExtension;
+import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildMoveArpShaToThaExtension;
+import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildMoveArpSpaToTpaExtension;
+import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildMoveEthSrcToDstExtension;
+import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildMoveIpSrcToDstExtension;
import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_BRIDGE;
import static org.slf4j.LoggerFactory.getLogger;
@@ -86,9 +107,7 @@
private static final int DEFAULT_OFPORT = 6653;
private static final int DPID_BEGIN = 3;
private static final long SLEEP_MS = 3000; // we wait 3s for init each node
-
- public static final String INTEGRATION_TO_TUNNEL_PREFIX = "i-to-t-";
- public static final String TUNNEL_TO_INTEGRATION_PREFIX = "t-to-i-";
+ private static final int DEFAULT_TTL = 0xff;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@@ -195,29 +214,29 @@
InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
- String intToTunIntf =
- INTEGRATION_TO_TUNNEL_PREFIX + segmentIdHex(network.segmentId());
- String tunToIntIntf =
- TUNNEL_TO_INTEGRATION_PREFIX + segmentIdHex(network.segmentId());
+ String tenantToTunIntf =
+ TENANT_TO_TUNNEL_PREFIX + segmentIdHex(network.segmentId());
+ String tunToTenantIntf =
+ TUNNEL_TO_TENANT_PREFIX + segmentIdHex(network.segmentId());
- // integration bridge -> tunnel bridge
- PatchDescription brIntTunPatchDesc =
+ // tenant bridge -> tunnel bridge
+ PatchDescription brTenantTunPatchDesc =
DefaultPatchDescription.builder()
.deviceId(network.tenantBridgeName())
- .ifaceName(intToTunIntf)
- .peer(tunToIntIntf)
+ .ifaceName(tenantToTunIntf)
+ .peer(tunToTenantIntf)
.build();
- ifaceConfig.addPatchMode(intToTunIntf, brIntTunPatchDesc);
+ ifaceConfig.addPatchMode(tenantToTunIntf, brTenantTunPatchDesc);
- // tunnel bridge -> integration bridge
- PatchDescription brTunIntPatchDesc =
+ // tunnel bridge -> tenant bridge
+ PatchDescription brTunTenantPatchDesc =
DefaultPatchDescription.builder()
.deviceId(TUNNEL_BRIDGE)
- .ifaceName(tunToIntIntf)
- .peer(intToTunIntf)
+ .ifaceName(tunToTenantIntf)
+ .peer(tenantToTunIntf)
.build();
- ifaceConfig.addPatchMode(tunToIntIntf, brTunIntPatchDesc);
+ ifaceConfig.addPatchMode(tunToTenantIntf, brTunTenantPatchDesc);
}
private void removePatchInterface(KubevirtNode node, KubevirtNetwork network) {
@@ -230,7 +249,7 @@
InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
- String tunToIntIntf = TUNNEL_TO_INTEGRATION_PREFIX + segmentIdHex(network.segmentId());
+ String tunToIntIntf = TUNNEL_TO_TENANT_PREFIX + segmentIdHex(network.segmentId());
ifaceConfig.removePatchMode(tunToIntIntf);
}
@@ -249,10 +268,14 @@
}
flowService.connectTables(deviceId, TENANT_INBOUND_TABLE, TENANT_DHCP_TABLE);
- flowService.connectTables(deviceId, TENANT_DHCP_TABLE, TENANT_FORWARDING_TABLE);
+ flowService.connectTables(deviceId, TENANT_DHCP_TABLE, TENANT_ARP_TABLE);
+ flowService.connectTables(deviceId, TENANT_ARP_TABLE, TENANT_ICMP_TABLE);
+ flowService.connectTables(deviceId, TENANT_ICMP_TABLE, TENANT_FORWARDING_TABLE);
setDhcpRule(deviceId, true);
setForwardingRule(deviceId, true);
+ setGatewayArpRule(node, network, true);
+ setGatewayIcmpRule(node, network, true);
log.info("Install default flow rules for tenant bridge {}", network.tenantBridgeName());
}
@@ -279,7 +302,7 @@
install);
}
- public void setForwardingRule(DeviceId deviceId, boolean install) {
+ private void setForwardingRule(DeviceId deviceId, boolean install) {
TrafficSelector selector = DefaultTrafficSelector.builder().build();
TrafficTreatment treatment = DefaultTrafficTreatment.builder()
.setOutput(PortNumber.NORMAL)
@@ -290,11 +313,71 @@
deviceId,
selector,
treatment,
- PRIORITY_SWITCHING_RULE,
+ PRIORITY_FORWARDING_RULE,
TENANT_FORWARDING_TABLE,
install);
}
+ private void setGatewayArpRule(KubevirtNode node, KubevirtNetwork network, boolean install) {
+ Device device = deviceService.getDevice(network.tenantDeviceId(node.hostname()));
+
+ TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
+ sBuilder.matchEthType(EthType.EtherType.ARP.ethType().toShort())
+ .matchArpOp(ARP.OP_REQUEST)
+ .matchArpTpa(Ip4Address.valueOf(network.gatewayIp().toString()));
+
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+ tBuilder.extension(buildMoveEthSrcToDstExtension(device), device.id())
+ .extension(buildMoveArpShaToThaExtension(device), device.id())
+ .extension(buildMoveArpSpaToTpaExtension(device), device.id())
+ .setArpOp(ARP.OP_REPLY)
+ .setArpSha(DEFAULT_GATEWAY_MAC)
+ .setArpSpa(Ip4Address.valueOf(network.gatewayIp().toString()))
+ .setEthSrc(DEFAULT_GATEWAY_MAC)
+ .setOutput(PortNumber.IN_PORT);
+
+ flowService.setRule(
+ appId,
+ device.id(),
+ sBuilder.build(),
+ tBuilder.build(),
+ PRIORITY_ARP_GATEWAY_RULE,
+ TENANT_ARP_TABLE,
+ install
+ );
+ }
+
+ private void setGatewayIcmpRule(KubevirtNode node, KubevirtNetwork network, boolean install) {
+ DeviceId deviceId = network.tenantDeviceId(node.hostname());
+ TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
+ .matchEthType(Ethernet.TYPE_IPV4)
+ .matchIPProtocol(IPv4.PROTOCOL_ICMP)
+ .matchIcmpType(TYPE_ECHO_REQUEST)
+ .matchIcmpCode(CODE_ECHO_REQEUST)
+ .matchIPDst(IpPrefix.valueOf(network.gatewayIp(), 32));
+
+ Device device = deviceService.getDevice(deviceId);
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+ .extension(buildMoveEthSrcToDstExtension(device), device.id())
+ .extension(buildMoveIpSrcToDstExtension(device), device.id())
+ .extension(buildLoadExtension(device,
+ NXM_NX_IP_TTL, DEFAULT_TTL), device.id())
+ .extension(buildLoadExtension(device,
+ NXM_OF_ICMP_TYPE, TYPE_ECHO_REPLY), device.id())
+ .setIpSrc(network.gatewayIp())
+ .setEthSrc(DEFAULT_GATEWAY_MAC)
+ .setOutput(PortNumber.IN_PORT);
+
+ flowService.setRule(
+ appId,
+ deviceId,
+ sBuilder.build(),
+ tBuilder.build(),
+ PRIORITY_ICMP_RULE,
+ TENANT_ICMP_TABLE,
+ install);
+ }
+
private class InternalNetworkEventListener implements KubevirtNetworkListener {
private boolean isRelevantHelper() {
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java
index d9076da..a5379e8 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPodPortMapper.java
@@ -155,12 +155,6 @@
return;
}
- KubernetesClient client = k8sClient(kubevirtApiConfigService);
-
- if (client == null) {
- return;
- }
-
Map<String, String> annots = pod.getMetadata().getAnnotations();
if (annots == null) {
return;
@@ -206,12 +200,6 @@
return;
}
- KubernetesClient client = k8sClient(kubevirtApiConfigService);
-
- if (client == null) {
- return;
- }
-
KubevirtPort port = getPort(kubevirtNetworkAdminService.networks(), pod);
if (port == null) {
return;
@@ -244,6 +232,12 @@
Map<String, String> annots = pod.getMetadata().getAnnotations();
annots.put(NETWORK_STATUS_KEY, networkStatus.toString(4));
+ KubernetesClient client = k8sClient(kubevirtApiConfigService);
+
+ if (client == null) {
+ return;
+ }
+
client.pods().inNamespace(pod.getMetadata().getNamespace())
.withName(pod.getMetadata().getName())
.edit(r -> new PodBuilder(r)
@@ -263,12 +257,6 @@
return;
}
- KubernetesClient client = k8sClient(kubevirtApiConfigService);
-
- if (client == null) {
- return;
- }
-
KubevirtPort port = getPort(kubevirtNetworkAdminService.networks(), pod);
if (port == null) {
return;
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingPhysicalHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingPhysicalHandler.java
index 96a31da..c8089fb 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingPhysicalHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingPhysicalHandler.java
@@ -49,7 +49,7 @@
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
import static org.onosproject.kubevirtnetworking.api.Constants.PRE_FLAT_TABLE;
-import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_SWITCHING_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FORWARDING_RULE;
import static org.onosproject.kubevirtnetworking.api.Constants.VTAG_TABLE;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.structurePortName;
import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_PHYSICAL_PREFIX;
@@ -57,7 +57,7 @@
import static org.slf4j.LoggerFactory.getLogger;
/**
- * Populates switching flow rules on OVS for the physical interfaces.
+ * Populates switching flow rules on OVS for the provider network (underlay).
*/
@Component(immediate = true)
public class KubevirtSwitchingPhysicalHandler {
@@ -121,7 +121,7 @@
deviceId,
selector.build(),
treatment.build(),
- PRIORITY_SWITCHING_RULE,
+ PRIORITY_FORWARDING_RULE,
VTAG_TABLE,
install);
}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java
new file mode 100644
index 0000000..bc124c6
--- /dev/null
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSwitchingTenantHandler.java
@@ -0,0 +1,359 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnetworking.impl;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.IpPrefix;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
+import org.onosproject.kubevirtnetworking.api.KubevirtPodEvent;
+import org.onosproject.kubevirtnetworking.api.KubevirtPodListener;
+import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
+import org.onosproject.kubevirtnetworking.api.KubevirtPort;
+import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
+import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
+import org.onosproject.kubevirtnode.api.KubevirtNodeService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_TUNNEL_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_DEFAULT_TABLE;
+import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.FLAT;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPort;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelToTenantPort;
+import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
+import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
+import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Populates switching flow rules on OVS for the tenant network (overlay).
+ */
+@Component(immediate = true)
+public class KubevirtSwitchingTenantHandler {
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected MastershipService mastershipService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DriverService driverService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtFlowRuleService flowRuleService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtNodeService kubevirtNodeService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtNetworkService kubevirtNetworkService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtPortService kubevirtPortService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtPodService kubevirtPodService;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+
+ private final InternalKubevirtPodListener kubevirtPodListener =
+ new InternalKubevirtPodListener();
+ private final InternalKubevirtNodeListener kubevirtNodeListener =
+ new InternalKubevirtNodeListener();
+
+ private ApplicationId appId;
+ private NodeId localNodeId;
+
+ @Activate
+ protected void activate() {
+ appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
+ localNodeId = clusterService.getLocalNode().id();
+ leadershipService.runForLeadership(appId.name());
+ kubevirtPodService.addListener(kubevirtPodListener);
+ kubevirtNodeService.addListener(kubevirtNodeListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ kubevirtPodService.removeListener(kubevirtPodListener);
+ kubevirtNodeService.removeListener(kubevirtNodeListener);
+ leadershipService.withdraw(appId.name());
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+ private KubevirtPort getPortByPod(Pod pod) {
+ return getPort(kubevirtNetworkService.networks(), pod);
+ }
+
+ private void setIngressRules(Pod pod, boolean install) {
+ KubevirtPort port = getPortByPod(pod);
+
+ if (port == null) {
+ return;
+ }
+
+ if (port.ipAddress() == null) {
+ return;
+ }
+
+ KubevirtNetwork network = kubevirtNetworkService.network(port.networkId());
+
+ if (network == null) {
+ return;
+ }
+
+ // TODO: need to handle VLAN case
+ if (network.type() == FLAT) {
+ return;
+ }
+
+ if (network.segmentId() == null) {
+ return;
+ }
+
+ KubevirtNode localNode = kubevirtNodeService.node(pod.getSpec().getNodeName());
+ if (localNode == null || localNode.type() == MASTER) {
+ return;
+ }
+
+ PortNumber patchPortNumber = tunnelToTenantPort(localNode, network);
+ if (patchPortNumber == null) {
+ return;
+ }
+
+ TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
+ .matchTunnelId(Long.parseLong(network.segmentId()));
+
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+ .setOutput(patchPortNumber);
+
+ flowRuleService.setRule(
+ appId,
+ localNode.tunBridge(),
+ sBuilder.build(),
+ tBuilder.build(),
+ PRIORITY_TUNNEL_RULE,
+ TUNNEL_DEFAULT_TABLE,
+ install);
+
+ log.debug("Install ingress rules for instance {}, segment ID {}",
+ port.ipAddress(), network.segmentId());
+ }
+
+ private void setEgressRules(Pod pod, boolean install) {
+ KubevirtNode localNode = kubevirtNodeService.node(pod.getSpec().getNodeName());
+
+ if (localNode == null) {
+ return;
+ }
+
+ if (localNode.type() == MASTER) {
+ return;
+ }
+
+ KubevirtPort port = getPortByPod(pod);
+
+ if (port == null) {
+ return;
+ }
+
+ if (port.ipAddress() == null) {
+ return;
+ }
+
+ KubevirtNetwork network = kubevirtNetworkService.network(port.networkId());
+
+ if (network == null) {
+ return;
+ }
+
+ // TODO: need to handle VLAN case
+ if (network.type() == FLAT) {
+ return;
+ }
+
+ if (network.segmentId() == null) {
+ return;
+ }
+
+ for (KubevirtNode remoteNode : kubevirtNodeService.completeNodes(WORKER)) {
+ if (remoteNode.hostname().equals(localNode.hostname())) {
+ continue;
+ }
+
+ PortNumber patchPortNumber = tunnelToTenantPort(remoteNode, network);
+ if (patchPortNumber == null) {
+ return;
+ }
+
+ PortNumber tunnelPortNumber = tunnelPort(remoteNode, network);
+ if (tunnelPortNumber == null) {
+ return;
+ }
+
+ TrafficSelector.Builder sIpBuilder = DefaultTrafficSelector.builder()
+ .matchInPort(patchPortNumber)
+ .matchEthType(Ethernet.TYPE_IPV4)
+ .matchIPDst(IpPrefix.valueOf(port.ipAddress(), 32));
+
+ TrafficSelector.Builder sArpBuilder = DefaultTrafficSelector.builder()
+ .matchInPort(patchPortNumber)
+ .matchEthType(Ethernet.TYPE_ARP)
+ .matchArpTpa(Ip4Address.valueOf(port.ipAddress().toString()));
+
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+ .setTunnelId(Long.parseLong(network.segmentId()))
+ .extension(buildExtension(
+ deviceService,
+ remoteNode.tunBridge(),
+ localNode.dataIp().getIp4Address()),
+ remoteNode.tunBridge())
+ .setOutput(tunnelPortNumber);
+
+ flowRuleService.setRule(
+ appId,
+ remoteNode.tunBridge(),
+ sIpBuilder.build(),
+ tBuilder.build(),
+ PRIORITY_TUNNEL_RULE,
+ TUNNEL_DEFAULT_TABLE,
+ install);
+
+ flowRuleService.setRule(
+ appId,
+ remoteNode.tunBridge(),
+ sArpBuilder.build(),
+ tBuilder.build(),
+ PRIORITY_TUNNEL_RULE,
+ TUNNEL_DEFAULT_TABLE,
+ install);
+ }
+
+ log.debug("Install egress rules for instance {}, segment ID {}",
+ port.ipAddress(), network.segmentId());
+ }
+
+ private class InternalKubevirtNodeListener implements KubevirtNodeListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(KubevirtNodeEvent event) {
+
+ switch (event.type()) {
+ case KUBEVIRT_NODE_COMPLETE:
+ eventExecutor.execute(() -> processNodeCompletion(event.subject()));
+ break;
+ case KUBEVIRT_NODE_INCOMPLETE:
+ default:
+ // do nothing
+ break;
+ }
+ }
+
+ private void processNodeCompletion(KubevirtNode node) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ kubevirtPodService.pods().stream()
+ .filter(pod -> node.hostname().equals(pod.getSpec().getNodeName()))
+ .forEach(pod -> {
+ setIngressRules(pod, true);
+ setEgressRules(pod, true);
+ });
+ }
+ }
+
+ private class InternalKubevirtPodListener implements KubevirtPodListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(KubevirtPodEvent event) {
+
+ switch (event.type()) {
+ case KUBEVIRT_POD_UPDATED:
+ eventExecutor.execute(() -> processPodUpdate(event.subject()));
+ break;
+ case KUBEVIRT_POD_REMOVED:
+ eventExecutor.execute(() -> processPodRemoval(event.subject()));
+ break;
+ default:
+ // do nothing
+ break;
+ }
+ }
+
+ private void processPodUpdate(Pod pod) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ setIngressRules(pod, true);
+ setEgressRules(pod, true);
+ }
+
+ private void processPodRemoval(Pod pod) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ setIngressRules(pod, false);
+ setEgressRules(pod, false);
+ }
+ }
+}