Implement kubernetes external loadbalancer handler.
Change-Id: I0f3057d66769f0ca7db7d508483835cdd1ff1593
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/cli/KubernetesListServiceCommand.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/cli/KubernetesListServiceCommand.java
new file mode 100644
index 0000000..47d4a71
--- /dev/null
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/cli/KubernetesListServiceCommand.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnetworking.cli;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.kubevirtnetworking.api.KubernetesExternalLb;
+import org.onosproject.kubevirtnetworking.api.KubernetesExternalLbService;
+
+import java.util.List;
+
+import static org.onosproject.kubevirtnetworking.api.Constants.CLI_IP_ADDRESS_LENGTH;
+import static org.onosproject.kubevirtnetworking.api.Constants.CLI_MAC_ADDRESS_LENGTH;
+import static org.onosproject.kubevirtnetworking.api.Constants.CLI_MARGIN_LENGTH;
+import static org.onosproject.kubevirtnetworking.api.Constants.CLI_NAME_LENGTH;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.genFormatString;
+
+/**
+ * Lists the kubernetes external load balancers known to ONOS.
+ *
+ * <p>Every load balancer returned by {@link KubernetesExternalLbService#loadBalancers()}
+ * is printed as one row of a fixed-width table. Attributes that are not set are shown
+ * as {@code N/A}, and each cell is cut to its column width minus the CLI margin.
+ */
+@Service
+@Command(scope = "onos", name = "kubernetes-services",
+        description = "Lists all kubernetes services")
+public class KubernetesListServiceCommand extends AbstractShellCommand {
+
+    private static final String NOT_AVAILABLE = "N/A";
+
+    @Override
+    protected void doExecute() throws Exception {
+        KubernetesExternalLbService lbService = get(KubernetesExternalLbService.class);
+
+        // work on a private copy of the collection handed out by the service
+        List<KubernetesExternalLb> loadBalancers = Lists.newArrayList(lbService.loadBalancers());
+
+        String rowFormat = genFormatString(ImmutableList.of(
+                CLI_NAME_LENGTH, CLI_IP_ADDRESS_LENGTH, CLI_NAME_LENGTH,
+                CLI_NAME_LENGTH, CLI_IP_ADDRESS_LENGTH, CLI_MAC_ADDRESS_LENGTH));
+
+        print(rowFormat, "Service Name", "Loadbalancer IP", "Elected Gateway", "Elected Worker",
+                "Loadbalancer GW IP", "Loadbalancer GW MAC");
+
+        for (KubernetesExternalLb lb : loadBalancers) {
+            print(rowFormat,
+                    fit(lb.serviceName(), CLI_NAME_LENGTH),
+                    fit(orNotAvailable(lb.loadBalancerIp()), CLI_IP_ADDRESS_LENGTH),
+                    fit(orNotAvailable(lb.electedGateway()), CLI_NAME_LENGTH),
+                    fit(orNotAvailable(lb.electedWorker()), CLI_NAME_LENGTH),
+                    fit(orNotAvailable(lb.loadBalancerGwIp()), CLI_IP_ADDRESS_LENGTH),
+                    fit(orNotAvailable(lb.loadBalancerGwMac()), CLI_MAC_ADDRESS_LENGTH));
+        }
+    }
+
+    /**
+     * Renders an optional attribute for display.
+     *
+     * @param value attribute value, may be null
+     * @return {@code value.toString()}, or {@code N/A} when the value is null
+     */
+    private static String orNotAvailable(Object value) {
+        return value == null ? NOT_AVAILABLE : value.toString();
+    }
+
+    /**
+     * Truncates a cell so that it fits its column.
+     *
+     * @param text        cell text, may be null
+     * @param columnWidth full width of the column
+     * @return the leading {@code columnWidth - CLI_MARGIN_LENGTH} characters of the text
+     */
+    private static String fit(String text, int columnWidth) {
+        return StringUtils.substring(text, 0, columnWidth - CLI_MARGIN_LENGTH);
+    }
+}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/DistributedKubernetesExternalLbStore.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/DistributedKubernetesExternalLbStore.java
index fce69d7..1bd286e 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/DistributedKubernetesExternalLbStore.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/DistributedKubernetesExternalLbStore.java
@@ -16,6 +16,7 @@
package org.onosproject.kubevirtnetworking.impl;
import com.google.common.collect.ImmutableSet;
+import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
@@ -47,12 +48,14 @@
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.kubevirtnetworking.api.KubernetesExternalLbEvent.Type.KUBERNETES_EXTERNAL_LOAD_BALANCER_CREATED;
+import static org.onosproject.kubevirtnetworking.api.KubernetesExternalLbEvent.Type.KUBERNETES_EXTERNAL_LOAD_BALANCER_GATEWAY_CHANGED;
import static org.onosproject.kubevirtnetworking.api.KubernetesExternalLbEvent.Type.KUBERNETES_EXTERNAL_LOAD_BALANCER_REMOVED;
import static org.onosproject.kubevirtnetworking.api.KubernetesExternalLbEvent.Type.KUBERNETES_EXTERNAL_LOAD_BALANCER_UPDATED;
+import static org.onosproject.kubevirtnetworking.api.KubernetesExternalLbEvent.Type.KUBERNETES_EXTERNAL_LOAD_BALANCER_WORKER_CHANGED;
import static org.slf4j.LoggerFactory.getLogger;
/**
- * Implementation of kubernetes external lb store using consistent map.
+ * Implementation of kubernetes external load balancer store using consistent map.
*/
@Component(immediate = true, service = KubernetesExternalLbStore.class)
public class DistributedKubernetesExternalLbStore
@@ -69,6 +72,7 @@
.register(KryoNamespaces.API)
.register(KubernetesExternalLb.class)
.register(DefaultKubernetesExternalLb.class)
+ .register(IpAddress.class)
.register(Collection.class)
.build();
@@ -161,19 +165,44 @@
KUBERNETES_EXTERNAL_LOAD_BALANCER_CREATED, event.newValue().value())));
break;
case UPDATE:
- eventExecutor.execute(() ->
- notifyDelegate(new KubernetesExternalLbEvent(
- KUBERNETES_EXTERNAL_LOAD_BALANCER_UPDATED, event.newValue().value())));
+ eventExecutor.execute(() -> processMapUpdate(event));
break;
case REMOVE:
eventExecutor.execute(() ->
notifyDelegate(new KubernetesExternalLbEvent(
- KUBERNETES_EXTERNAL_LOAD_BALANCER_REMOVED, event.newValue().value())));
+ KUBERNETES_EXTERNAL_LOAD_BALANCER_REMOVED, event.oldValue().value())));
break;
default:
//do nothing
break;
}
}
+
+ private void processMapUpdate(MapEvent<String, KubernetesExternalLb> event) {
+ log.debug("Kubernetes External LB updated");
+
+ KubernetesExternalLb oldValue = event.oldValue().value();
+ KubernetesExternalLb newValue = event.newValue().value();
+
+ if (oldValue.electedGateway() != null && newValue.electedGateway() != null &&
+ !oldValue.electedGateway().equals(newValue.electedGateway())) {
+ notifyDelegate(new KubernetesExternalLbEvent(
+ KUBERNETES_EXTERNAL_LOAD_BALANCER_GATEWAY_CHANGED,
+ newValue, oldValue.electedGateway(), oldValue.electedWorker())
+ );
+ }
+
+ if (oldValue.electedWorker() != null && newValue.electedWorker() != null &&
+ !oldValue.electedWorker().equals(newValue.electedWorker())) {
+ notifyDelegate(new KubernetesExternalLbEvent(
+ KUBERNETES_EXTERNAL_LOAD_BALANCER_WORKER_CHANGED,
+ newValue, oldValue.electedWorker())
+ );
+ }
+
+ notifyDelegate(new KubernetesExternalLbEvent(
+ KUBERNETES_EXTERNAL_LOAD_BALANCER_UPDATED, event.newValue().value()));
+
+ }
}
}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesExternalLbHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesExternalLbHandler.java
new file mode 100644
index 0000000..7b1c733
--- /dev/null
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesExternalLbHandler.java
@@ -0,0 +1,460 @@
+/*
+ * Copyright 2022-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnetworking.impl;
+
+import org.onlab.packet.ARP;
+import org.onlab.packet.EthType;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.TpPort;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kubevirtnetworking.api.KubernetesExternalLb;
+import org.onosproject.kubevirtnetworking.api.KubernetesExternalLbAdminService;
+import org.onosproject.kubevirtnetworking.api.KubernetesExternalLbEvent;
+import org.onosproject.kubevirtnetworking.api.KubernetesExternalLbListener;
+import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
+import org.onosproject.kubevirtnetworking.api.KubevirtGroupRuleService;
+import org.onosproject.kubevirtnetworking.util.RulePopulatorUtil;
+import org.onosproject.kubevirtnode.api.KubernetesExternalLbInterface;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeService;
+import org.onosproject.net.Device;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.instructions.ExtensionTreatment;
+import org.onosproject.net.packet.PacketService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kubevirtnetworking.api.Constants.GW_DROP_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.GW_ENTRY_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.KUBERNETES_EXTERNAL_LB_FAKE_MAC;
+import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ARP_GATEWAY_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ELB_DOWNSTREAM_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ELB_UPSTREAM_RULE;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.elbPatchPortNum;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.externalPatchPortNum;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.kubernetesElbMac;
+import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.CT_NAT_SRC_FLAG;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Handles Kubernetes External load balancer.
+ */
+@Component(immediate = true)
+public class KubernetesExternalLbHandler {
+ protected final Logger log = getLogger(getClass());
+
+ private static final int TP_PORT_MINIMUM_NUM = 1025;
+ private static final int TP_PORT_MAXIMUM_NUM = 65535;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtApiConfigService configService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtNodeService nodeService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtGroupRuleService groupRuleService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected PacketService packetService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtFlowRuleService flowService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubernetesExternalLbAdminService externalLbService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DriverService driverService;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+
+ private ApplicationId appId;
+ private NodeId localNodeId;
+
+ private final InternalKubernetesExternalLbListener lbListener =
+ new InternalKubernetesExternalLbListener();
+
+ @Activate
+ protected void activate() {
+ appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
+ localNodeId = clusterService.getLocalNode().id();
+ leadershipService.runForLeadership(appId.name());
+ externalLbService.addListener(lbListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ leadershipService.withdraw(appId.name());
+ externalLbService.removeListener(lbListener);
+
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+
+ private class InternalKubernetesExternalLbListener implements KubernetesExternalLbListener {
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(KubernetesExternalLbEvent event) {
+ switch (event.type()) {
+ case KUBERNETES_EXTERNAL_LOAD_BALANCER_CREATED:
+ case KUBERNETES_EXTERNAL_LOAD_BALANCER_UPDATED:
+ eventExecutor.execute(() -> processKubernetesExternalLbCreatedOrUpdated(
+ event.subject()));
+ break;
+ case KUBERNETES_EXTERNAL_LOAD_BALANCER_GATEWAY_CHANGED:
+ eventExecutor.execute(() -> processKubernetesExternalLbGatewayChanged(
+ event.subject(), event.oldGateway()));
+ break;
+ case KUBERNETES_EXTERNAL_LOAD_BALANCER_WORKER_CHANGED:
+ eventExecutor.execute(() -> processKubernetesExternalLbWorkerChanged(
+ event.subject(), event.oldWorker()));
+ break;
+ case KUBERNETES_EXTERNAL_LOAD_BALANCER_REMOVED:
+ eventExecutor.execute(() -> processKubernetesExternalLbRemoved(
+ event.subject()));
+ break;
+ default:
+ //do nothing
+ break;
+ }
+ }
+
+ private void processKubernetesExternalLbCreatedOrUpdated(KubernetesExternalLb lb) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ if (lb.electedGateway() == null || lb.electedWorker() == null) {
+ log.warn("processKubernetesExternalLbCreatedOrUpdated called but electedGateway " +
+ "or electedWorker is null. Stop this task.");
+ return;
+ }
+
+ log.info("processKubernetesExternalLbCreatedOrUpdated and updated elb with elecedGateway: {}", lb);
+
+ setExternalLbRulesForService(lb, true);
+ }
+
+ private void processKubernetesExternalLbGatewayChanged(KubernetesExternalLb lb, String oldGatway) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ if (lb.electedWorker() == null || oldGatway == null) {
+ return;
+ }
+
+ log.info("processKubernetesExternalLbGatewayChanged with oldateway: {}", oldGatway);
+
+ setExternalLbRulesForService(lb.updateElectedGateway(oldGatway), false);
+
+ setExternalLbRulesForService(lb, true);
+ }
+
+ private void processKubernetesExternalLbWorkerChanged(KubernetesExternalLb lb, String oldWorker) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ if (lb.electedGateway() == null || oldWorker == null) {
+ return;
+ }
+
+ log.info("processKubernetesExternalLbWorkerChanged with oldworker: {}", oldWorker);
+
+ setExternalLbRulesForService(lb.updateElectedWorker(oldWorker), false);
+
+ setExternalLbRulesForService(lb, true);
+ }
+
+
+ private void processKubernetesExternalLbRemoved(KubernetesExternalLb lb) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ if (lb.electedGateway() == null) {
+ return;
+ }
+
+ setExternalLbRulesForService(lb, false);
+ }
+ }
+
+ private void setExternalLbRulesForService(KubernetesExternalLb lb, boolean install) {
+ if (lb.electedGateway() == null) {
+ return;
+ }
+
+ KubevirtNode gateway = nodeService.node(lb.electedGateway());
+
+ if (gateway == null) {
+ return;
+ }
+
+ setLoadbalanceIpArpResponseRules(lb, gateway, install);
+ setDownstreamRules(lb, gateway, install);
+ setUpstreamRules(lb, gateway, install);
+ }
+
+    /**
+     * Installs or removes the rule that answers ARP requests for the load balancer IP.
+     *
+     * <p>The rule matches ARP requests for the load balancer IP that enter through the
+     * gateway's external patch port. It rewrites the packet into an ARP reply sourced
+     * from {@code KUBERNETES_EXTERNAL_LB_FAKE_MAC} and sends it back out of the ingress port.
+     *
+     * <p>The method returns without touching any rule when the load balancer has no IP,
+     * the external patch port cannot be resolved or the integration bridge device is
+     * unknown. Without the last two guards a missing port or device would throw and
+     * prevent the caller from programming the remaining rules.
+     *
+     * @param lb      load balancer whose IP is answered for
+     * @param gateway elected gateway node
+     * @param install true to install the rule, false to remove it
+     */
+    private void setLoadbalanceIpArpResponseRules(KubernetesExternalLb lb, KubevirtNode gateway, boolean install) {
+
+        IpAddress loadBalancerIp = lb.loadBalancerIp();
+
+        if (loadBalancerIp == null) {
+            return;
+        }
+
+        PortNumber externalPatchPortNum = externalPatchPortNum(deviceService, gateway);
+        if (externalPatchPortNum == null) {
+            log.warn("setLoadbalanceIpArpResponseRules called but externalPatchPortNum is null. " +
+                    "Stop this task.");
+            return;
+        }
+
+        Device device = deviceService.getDevice(gateway.intgBridge());
+        if (device == null) {
+            log.warn("setLoadbalanceIpArpResponseRules called but device {} is not available. " +
+                    "Stop this task.", gateway.intgBridge());
+            return;
+        }
+
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchInPort(externalPatchPortNum)
+                .matchEthType(EthType.EtherType.ARP.ethType().toShort())
+                .matchArpOp(ARP.OP_REQUEST)
+                .matchArpTpa(loadBalancerIp.getIp4Address())
+                .build();
+
+        // the extensions copy the requester's addresses into the target fields before
+        // the source fields are overwritten with the load balancer's identity
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .extension(RulePopulatorUtil.buildMoveEthSrcToDstExtension(device), device.id())
+                .extension(RulePopulatorUtil.buildMoveArpShaToThaExtension(device), device.id())
+                .extension(RulePopulatorUtil.buildMoveArpSpaToTpaExtension(device), device.id())
+                .setArpOp(ARP.OP_REPLY)
+                .setEthSrc(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
+                .setArpSha(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
+                .setArpSpa(loadBalancerIp.getIp4Address())
+                .setOutput(PortNumber.IN_PORT)
+                .build();
+
+        flowService.setRule(
+                appId,
+                gateway.intgBridge(),
+                selector,
+                treatment,
+                PRIORITY_ARP_GATEWAY_RULE,
+                GW_ENTRY_TABLE,
+                install);
+    }
+
+    /**
+     * Installs or removes the downstream (external client to worker) rules, one per
+     * node port of the load balancer, in the gateway entry table.
+     *
+     * <p>Each rule matches IPv4/TCP traffic addressed to the fake load balancer MAC and
+     * the load balancer IP on the node port. It applies a committing conntrack NAT
+     * action ({@code CT_NAT_SRC_FLAG}) using the gateway's external LB IP and the port
+     * range 1025-65535, rewrites the Ethernet addresses, sets the elected worker's data
+     * IP as destination and outputs to the elb patch port.
+     *
+     * <p>The method logs a warning and returns when any required piece of information
+     * is missing. The elected worker name is checked before the node lookup because the
+     * removal path only guarantees that a gateway is elected.
+     *
+     * @param lb      load balancer to program
+     * @param gateway elected gateway node
+     * @param install true to install the rules, false to remove them
+     */
+    private void setDownstreamRules(KubernetesExternalLb lb, KubevirtNode gateway, boolean install) {
+
+        IpAddress loadBalancerIp = lb.loadBalancerIp();
+
+        if (loadBalancerIp == null) {
+            log.warn("setDownstreamRules called but loadBalancerIp is null. Stop this task.");
+            return;
+        }
+
+        MacAddress elbIntfMac = kubernetesElbMac(deviceService, gateway);
+        if (elbIntfMac == null) {
+            log.warn("setDownstreamRules called but elbIntfMac is null. Stop this task.");
+            return;
+        }
+
+        PortNumber elbBridgePortNum = elbPatchPortNum(deviceService, gateway);
+        if (elbBridgePortNum == null) {
+            log.warn("setDownstreamRules called but elbBridgePortNum is null. Stop this task.");
+            return;
+        }
+
+        KubernetesExternalLbInterface externalLbInterface = gateway.kubernetesExternalLbInterface();
+        if (externalLbInterface == null) {
+            log.warn("setDownstreamRules called but externalLbInterface is null. Stop this task.");
+            return;
+        }
+
+        // never query the node service with a null host name
+        String electedWorkerName = lb.electedWorker();
+        KubevirtNode electedWorker =
+                electedWorkerName == null ? null : nodeService.node(electedWorkerName);
+        if (electedWorker == null) {
+            log.warn("setDownstreamRules called but electedWorker is null. Stop this task.");
+            return;
+        }
+
+        lb.nodePortSet().forEach(nodeport -> {
+            TrafficSelector selector = DefaultTrafficSelector.builder()
+                    .matchEthType(Ethernet.TYPE_IPV4)
+                    .matchEthDst(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
+                    .matchIPDst(loadBalancerIp.toIpPrefix())
+                    .matchIPProtocol(IPv4.PROTOCOL_TCP)
+                    .matchTcpDst(TpPort.tpPort(nodeport.intValue()))
+                    .build();
+
+            ExtensionTreatment natTreatment = RulePopulatorUtil
+                    .niciraConnTrackTreatmentBuilder(driverService, gateway.intgBridge())
+                    .commit(true)
+                    .natFlag(CT_NAT_SRC_FLAG)
+                    .natAction(true)
+                    .natIp(externalLbInterface.externalLbIp())
+                    .natPortMin(TpPort.tpPort(TP_PORT_MINIMUM_NUM))
+                    .natPortMax(TpPort.tpPort(TP_PORT_MAXIMUM_NUM))
+                    .build();
+
+            TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                    .extension(natTreatment, gateway.intgBridge())
+                    .setEthSrc(elbIntfMac)
+                    .setEthDst(externalLbInterface.externalLbGwMac())
+                    .setIpDst(electedWorker.dataIp())
+                    .setOutput(elbBridgePortNum)
+                    .build();
+
+            flowService.setRule(
+                    appId,
+                    gateway.intgBridge(),
+                    selector,
+                    treatment,
+                    PRIORITY_ELB_DOWNSTREAM_RULE,
+                    GW_ENTRY_TABLE,
+                    install);
+        });
+    }
+
+    /**
+     * Installs or removes the upstream (worker to external client) rules, two per node
+     * port of the load balancer.
+     *
+     * <p>The first rule lives in the gateway entry table. It matches IPv4/TCP traffic
+     * from the elected worker's data IP to the gateway's external LB IP with the node
+     * port as source port, rewrites the source to the load balancer identity, applies a
+     * non-committing conntrack NAT action and continues in {@code GW_DROP_TABLE}. The
+     * second rule lives in {@code GW_DROP_TABLE}. It matches IPv4/TCP traffic with the
+     * node port as source port, applies the same rewrite and outputs to the external
+     * patch port.
+     *
+     * <p>The method logs a warning and returns when any required piece of information
+     * is missing, including the load balancer gateway MAC, which the treatment builder
+     * cannot accept as null.
+     *
+     * @param lb      load balancer to program
+     * @param gateway elected gateway node
+     * @param install true to install the rules, false to remove them
+     */
+    private void setUpstreamRules(KubernetesExternalLb lb, KubevirtNode gateway, boolean install) {
+        IpAddress loadBalancerIp = lb.loadBalancerIp();
+
+        if (loadBalancerIp == null) {
+            log.warn("setUpstreamRules called but loadBalancerIp is null. Stop this task.");
+            return;
+        }
+
+        // the next two values are not used by the rules below; the checks are kept as
+        // preconditions so that upstream rules are only handled together with downstream ones
+        MacAddress elbIntfMac = kubernetesElbMac(deviceService, gateway);
+        if (elbIntfMac == null) {
+            log.warn("setUpstreamRules called but elbIntfMac is null. Stop this task.");
+            return;
+        }
+
+        PortNumber elbBridgePortNum = elbPatchPortNum(deviceService, gateway);
+        if (elbBridgePortNum == null) {
+            log.warn("setUpstreamRules called but elbBridgePortNum is null. Stop this task.");
+            return;
+        }
+
+        PortNumber externalPatchPortNum = externalPatchPortNum(deviceService, gateway);
+        if (externalPatchPortNum == null) {
+            log.warn("setUpstreamRules called but externalPatchPortNum is null. Stop this task.");
+            return;
+        }
+
+        KubernetesExternalLbInterface externalLbInterface = gateway.kubernetesExternalLbInterface();
+        if (externalLbInterface == null) {
+            log.warn("setUpstreamRules called but externalLbInterface is null. Stop this task.");
+            return;
+        }
+
+        // never query the node service with a null host name
+        String electedWorkerName = lb.electedWorker();
+        KubevirtNode electedWorker =
+                electedWorkerName == null ? null : nodeService.node(electedWorkerName);
+        if (electedWorker == null) {
+            log.warn("setUpstreamRules called but electedWorker is null. Stop this task.");
+            return;
+        }
+
+        MacAddress loadBalancerGwMac = lb.loadBalancerGwMac();
+        if (loadBalancerGwMac == null) {
+            log.warn("setUpstreamRules called but loadBalancerGwMac is null. Stop this task.");
+            return;
+        }
+
+        lb.nodePortSet().forEach(nodePort -> {
+            // entry table: recognise return traffic of the worker and hand it to conntrack
+            TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
+                    .matchEthType(Ethernet.TYPE_IPV4)
+                    .matchIPSrc(electedWorker.dataIp().toIpPrefix())
+                    .matchIPDst(externalLbInterface.externalLbIp().toIpPrefix())
+                    .matchIPProtocol(IPv4.PROTOCOL_TCP)
+                    .matchTcpSrc(TpPort.tpPort(nodePort.intValue()));
+
+            ExtensionTreatment natTreatment = RulePopulatorUtil
+                    .niciraConnTrackTreatmentBuilder(driverService, gateway.intgBridge())
+                    .commit(false)
+                    .natAction(true)
+                    .table((short) GW_DROP_TABLE)
+                    .build();
+
+            TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+                    .setEthSrc(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
+                    .setIpSrc(loadBalancerIp)
+                    .setEthDst(loadBalancerGwMac)
+                    .extension(natTreatment, gateway.intgBridge())
+                    .transition(GW_DROP_TABLE);
+
+            flowService.setRule(
+                    appId,
+                    gateway.intgBridge(),
+                    sBuilder.build(),
+                    tBuilder.build(),
+                    PRIORITY_ELB_UPSTREAM_RULE,
+                    GW_ENTRY_TABLE,
+                    install);
+
+            // drop table: forward the translated traffic out of the external patch port
+            sBuilder = DefaultTrafficSelector.builder()
+                    .matchEthType(Ethernet.TYPE_IPV4)
+                    .matchIPProtocol(IPv4.PROTOCOL_TCP)
+                    .matchTcpSrc(TpPort.tpPort(nodePort.intValue()));
+
+            tBuilder = DefaultTrafficTreatment.builder()
+                    .setEthSrc(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
+                    .setIpSrc(loadBalancerIp)
+                    .setEthDst(loadBalancerGwMac)
+                    .setOutput(externalPatchPortNum);
+
+            flowService.setRule(
+                    appId,
+                    gateway.intgBridge(),
+                    sBuilder.build(),
+                    tBuilder.build(),
+                    PRIORITY_ELB_UPSTREAM_RULE,
+                    GW_DROP_TABLE,
+                    install);
+        });
+    }
+}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesServiceWatcher.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesServiceWatcher.java
new file mode 100644
index 0000000..2aec742
--- /dev/null
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesServiceWatcher.java
@@ -0,0 +1,529 @@
+/*
+ * Copyright 2022-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnetworking.impl;
+
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.LoadBalancerIngress;
+import io.fabric8.kubernetes.api.model.LoadBalancerStatus;
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
+import io.fabric8.kubernetes.client.dsl.Resource;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kubevirtnetworking.api.DefaultKubernetesExternalLb;
+import org.onosproject.kubevirtnetworking.api.KubernetesExternalLb;
+import org.onosproject.kubevirtnetworking.api.KubernetesExternalLbAdminService;
+import org.onosproject.kubevirtnode.api.KubernetesExternalLbConfig;
+import org.onosproject.kubevirtnode.api.KubernetesExternalLbConfigEvent;
+import org.onosproject.kubevirtnode.api.KubernetesExternalLbConfigListener;
+import org.onosproject.kubevirtnode.api.KubernetesExternalLbConfigService;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
+import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
+import org.onosproject.kubevirtnode.api.KubevirtNodeService;
+import org.onosproject.mastership.MastershipService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.gatewayNodeForSpecifiedService;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.workerNodeForSpecifiedService;
+import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Kubernetes service watcher used for external loadbalancing among PODs.
+ */
+@Component(immediate = true)
+public class KubernetesServiceWatcher {
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtApiConfigService apiConfigService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubernetesExternalLbConfigService lbConfigService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubernetesExternalLbAdminService adminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtNodeService nodeService;
+
+ private static final String KUBE_DASH_VIP = "kube-vip";
+ private static final String KUBE_VIP = "kubevip";
+ private static final String LOADBALANCER_IP = "loadBalancerIP";
+ private static final String TYPE_LOADBALANCER = "LoadBalancer";
+ private static final String KUBE_SYSTEM = "kube-system";
+ private static final String GATEWAY_IP = "gateway-ip";
+ private static final String GATEWAY_MAC = "gateway-mac";
+ private static final String DEFAULT = "default";
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+
+ private final InternalKubevirtApiConfigListener
+ apiConfigListener = new InternalKubevirtApiConfigListener();
+
+ private final InternalKubernetesServiceWatcher
+ serviceWatcher = new InternalKubernetesServiceWatcher();
+
+ private final InternalKubernetesExternalLbConfigListener
+ lbConfigListener = new InternalKubernetesExternalLbConfigListener();
+
+ private final InternalNodeEventListener
+ nodeEventListener = new InternalNodeEventListener();
+
+
+ private ApplicationId appId;
+ private NodeId localNodeId;
+
+ @Activate
+ protected void activate() {
+ appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
+ localNodeId = clusterService.getLocalNode().id();
+ leadershipService.runForLeadership(appId.name());
+
+ apiConfigService.addListener(apiConfigListener);
+ lbConfigService.addListener(lbConfigListener);
+ nodeService.addListener(nodeEventListener);
+
+ log.info("Started");
+ }
+
+
+ @Deactivate
+ protected void deactivate() {
+ leadershipService.withdraw(appId.name());
+
+ apiConfigService.removeListener(apiConfigListener);
+ lbConfigService.removeListener(lbConfigListener);
+ nodeService.removeListener(nodeEventListener);
+
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+ private void instantiateWatcher() {
+ KubernetesClient client = k8sClient(apiConfigService);
+
+ if (client != null) {
+ client.services().inAnyNamespace().watch(serviceWatcher);
+ }
+ }
+
+ private class InternalKubernetesExternalLbConfigListener
+ implements KubernetesExternalLbConfigListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(KubernetesExternalLbConfigEvent event) {
+ switch (event.type()) {
+ case KUBERNETES_EXTERNAL_LB_CONFIG_CREATED:
+ case KUBERNETES_EXTERNAL_LB_CONFIG_UPDATED:
+ eventExecutor.execute(this::processConfigUpdate);
+ break;
+ case KUBERNETES_EXTERNAL_LB_CONFIG_REMOVED:
+ default:
+ //do nothing
+ break;
+ }
+ }
+
+ private void processConfigUpdate() {
+ if (!isRelevantHelper()) {
+ return;
+ }
+ addOrUpdateExternalLoadBalancers();
+ }
+ }
+
+ /**
+  * Listens to kubevirt API config events; on update the service watcher is
+  * (re-)instantiated and the external load balancers are re-evaluated, on the
+  * event executor and by the leader node only.
+  */
+ private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
+
+     @Override
+     public void event(KubevirtApiConfigEvent event) {
+         switch (event.type()) {
+             case KUBEVIRT_API_CONFIG_UPDATED:
+                 eventExecutor.execute(() -> processConfigUpdate());
+                 break;
+             case KUBEVIRT_API_CONFIG_CREATED:
+             case KUBEVIRT_API_CONFIG_REMOVED:
+             default:
+                 // creation, removal and unknown events are ignored
+                 break;
+         }
+     }
+
+     // Starts the watcher first, then synchronizes the already existing services.
+     private void processConfigUpdate() {
+         if (isRelevantHelper()) {
+             instantiateWatcher();
+             addOrUpdateExternalLoadBalancers();
+         }
+     }
+
+     // True when the local node currently leads the application topic.
+     private boolean isRelevantHelper() {
+         return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+     }
+ }
+
+ /**
+  * Watches kubernetes service events. ADDED/MODIFIED events create or update the
+  * matching external load balancer (and patch the service status when needed),
+  * DELETED events remove it. All work is delegated to the event executor and is
+  * performed by the leader node only.
+  */
+ private class InternalKubernetesServiceWatcher implements Watcher<Service> {
+
+     @Override
+     public void eventReceived(Action action, Service service) {
+         switch (action) {
+             case ADDED:
+                 log.info("Service event ADDED received ");
+                 eventExecutor.execute(() -> processAddOrMod(service));
+                 break;
+             case MODIFIED:
+                 log.info("Service event MODIFIED received");
+                 eventExecutor.execute(() -> processAddOrMod(service));
+                 break;
+             case DELETED:
+                 log.info("Service event DELETED received");
+                 eventExecutor.execute(() -> processDeletion(service));
+                 break;
+             case ERROR:
+                 log.warn("Failures processing pod manipulation.");
+                 break;
+             default:
+                 break;
+         }
+     }
+
+     @Override
+     public void onClose(WatcherException e) {
+         // due to the bugs in fabric8, the watcher might be closed,
+         // we will re-instantiate the watcher in this case
+         // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
+         // the exception is passed as the trailing argument so the close reason is logged
+         log.warn("Service watcher OnClose, re-instantiate the POD watcher...", e);
+         instantiateWatcher();
+     }
+
+     // Creates or updates the external LB of the service; patches the service status
+     // only when the LB was stored and the status does not carry the LB IP yet.
+     private void processAddOrMod(Service service) {
+         if (service == null || !isMaster()) {
+             return;
+         }
+
+         if (!configMapUpdated()) {
+             log.warn("Config map is not set yet. Stop this task");
+             return;
+         }
+
+         try {
+             if (addOrUpdateExternalLoadBalancer(service) &&
+                     !isLoadBalancerStatusAlreadySet(service)) {
+                 serviceStatusUpdate(service);
+             }
+         } catch (Exception e) {
+             // trailing throwable keeps the stack trace, which toString() alone drops
+             log.error("Exception occurred because of {}", e.toString(), e);
+         }
+     }
+
+     // Removes the stored external LB of a deleted LoadBalancer service, if any.
+     private void processDeletion(Service service) {
+         if (service == null || !isMaster()) {
+             return;
+         }
+         if (isLoadBalancerType(service) &&
+                 isKubeVipCloudProviderLabelIsSet(service)) {
+             KubernetesExternalLb lb = adminService.loadBalancer(service.getMetadata().getName());
+
+             if (lb == null) {
+                 return;
+             }
+
+             adminService.removeExternalLb(lb.serviceName());
+         }
+     }
+
+     // True when the local node currently leads the application topic.
+     private boolean isMaster() {
+         return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+     }
+ }
+
+
+ /**
+  * Re-evaluates every service of the default namespace and creates or updates the
+  * matching external load balancers. Invoked when the API config, the external LB
+  * config or the set of complete nodes changes.
+  */
+ private void addOrUpdateExternalLoadBalancers() {
+     KubernetesClient client = k8sClient(apiConfigService);
+
+     // the client is unavailable until a usable API config exists
+     if (client == null) {
+         log.warn("Kubernetes client is not available. Stop this task");
+         return;
+     }
+
+     client.services().inNamespace(DEFAULT).list().getItems().forEach(service -> {
+         try {
+             addOrUpdateExternalLoadBalancer(service);
+         } catch (RuntimeException e) {
+             // a single malformed service must not prevent the others from being processed
+             log.error("Exception occurred because of {}", e.toString(), e);
+         }
+     });
+ }
+
+ /**
+  * Creates or updates the external load balancer derived from the given service.
+  *
+  * @param service kubernetes service
+  * @return true if the load balancer was created or updated; false if the service is
+  * not a kube-vip LoadBalancer service, cannot be parsed, or no gateway/worker
+  * node could be elected
+  */
+ private boolean addOrUpdateExternalLoadBalancer(Service service) {
+     if (!isLoadBalancerType(service) || !isKubeVipCloudProviderLabelIsSet(service)) {
+         return false;
+     }
+
+     KubernetesExternalLb parsedLb = parseKubernetesExternalLb(service);
+     if (parsedLb == null) {
+         log.warn("Failed to parse the kubernetes external lb");
+         return false;
+     }
+
+     KubevirtNode gateway = gatewayNodeForSpecifiedService(nodeService, parsedLb);
+     if (gateway == null) {
+         log.warn("Service created but there's no gateway nodes ready. Stop this task.");
+         return false;
+     }
+     KubernetesExternalLb lbWithGateway = parsedLb.updateElectedGateway(gateway.hostname());
+
+     // the worker is elected against the LB that already carries the elected gateway
+     KubevirtNode worker = workerNodeForSpecifiedService(nodeService, lbWithGateway);
+     if (worker == null) {
+         log.warn("Service created but there's no worker nodes ready. Stop this task.");
+         return false;
+     }
+     KubernetesExternalLb electedLb = lbWithGateway.updateElectedWorker(worker.hostname());
+
+     log.trace("processAddOrMod called and parsed lb is {}", electedLb);
+
+     boolean alreadyStored = adminService.loadBalancer(electedLb.serviceName()) != null;
+     if (alreadyStored) {
+         adminService.updateExternalLb(electedLb);
+     } else {
+         adminService.createExternalLb(electedLb);
+     }
+     return true;
+ }
+
+ /**
+  * Writes the load balancer IP of the service spec into the service status and
+  * patches the status through the kubernetes API.
+  *
+  * @param service kubernetes service whose status is to be patched
+  */
+ private void serviceStatusUpdate(Service service) {
+     KubernetesClient client = k8sClient(apiConfigService);
+     if (client == null) {
+         log.warn("Kubernetes client is not available. Stop this task");
+         return;
+     }
+
+     String lbIp = service.getSpec().getLoadBalancerIP();
+     if (lbIp == null) {
+         return;
+     }
+
+     if (service.getStatus() == null || service.getStatus().getLoadBalancer() == null) {
+         log.warn("Service {} carries no load balancer status. Stop this task",
+                 service.getMetadata().getName());
+         return;
+     }
+
+     LoadBalancerIngress lbIngress = new LoadBalancerIngress(KUBE_VIP, lbIp, Lists.newArrayList());
+
+     service.getStatus().getLoadBalancer().setIngress(Lists.newArrayList(lbIngress));
+
+     //When a service is deleted, the event MODIFIED is also along with DELETED event
+     //So filter out this MODIFIED events.
+     // withName(..) only builds a resource handle and is never null; the existence of the
+     // service has to be checked by fetching it, in the namespace of the service itself.
+     String name = service.getMetadata().getName();
+     String namespace = service.getMetadata().getNamespace();
+
+     Service existing;
+     if (namespace != null) {
+         existing = client.services().inNamespace(namespace).withName(name).get();
+     } else {
+         existing = client.services().withName(name).get();
+     }
+
+     if (existing != null) {
+         client.services().patchStatus(service);
+     }
+ }
+
+ /**
+  * Checks whether any label value of the service equals the kube-vip marker.
+  *
+  * @param service kubernetes service
+  * @return true if the service has labels and one of the label values is the kube-vip
+  * marker, false otherwise
+  */
+ private boolean isKubeVipCloudProviderLabelIsSet(Service service) {
+     Map<String, String> labels = service.getMetadata().getLabels();
+     log.trace("isKubeVipCloudProviderLabelIsSet called with labels {}", labels);
+
+     return labels != null && labels.containsValue(KUBE_DASH_VIP);
+ }
+
+ /**
+  * Checks whether the service status already advertises the load balancer IP
+  * requested in the service spec.
+  *
+  * @param service kubernetes service
+  * @return true if one of the status ingress entries carries the spec load balancer IP;
+  * false if it does not, or if the status, ingress list, spec or IP is missing
+  */
+ private boolean isLoadBalancerStatusAlreadySet(Service service) {
+     log.trace("isLoadBalancerStatusAlreadySet called with status {}", service.getStatus());
+
+     // a missing status or spec means that nothing has been advertised yet
+     if (service.getStatus() == null || service.getSpec() == null) {
+         return false;
+     }
+
+     LoadBalancerStatus lbStatus = service.getStatus().getLoadBalancer();
+     if (lbStatus == null || lbStatus.getIngress() == null || lbStatus.getIngress().isEmpty()) {
+         return false;
+     }
+
+     String lbIp = service.getSpec().getLoadBalancerIP();
+     if (lbIp == null) {
+         return false;
+     }
+
+     return lbStatus.getIngress().stream()
+             .anyMatch(lbIngress -> Objects.equals(lbIngress.getIp(), lbIp));
+ }
+
+ /**
+  * Checks whether an external LB config exists and is fully populated.
+  *
+  * @return true if a config is present and its name, global IP range, gateway IP
+  * and gateway MAC are all non-null, false otherwise
+  */
+ private boolean configMapUpdated() {
+     KubernetesExternalLbConfig lbConfig =
+             lbConfigService.lbConfigs().stream().findAny().orElse(null);
+     if (lbConfig == null) {
+         return false;
+     }
+
+     if (lbConfig.configName() == null || lbConfig.globalIpRange() == null) {
+         return false;
+     }
+
+     return lbConfig.loadBalancerGwIp() != null && lbConfig.loadBalancerGwMac() != null;
+ }
+
+ /**
+  * Checks whether the service type is LoadBalancer.
+  *
+  * @param service kubernetes service
+  * @return true if the service has a spec whose type equals the LoadBalancer type;
+  * false otherwise, including when the spec or the type is missing
+  */
+ private boolean isLoadBalancerType(Service service) {
+     // null-safe: this check runs before parseKubernetesExternalLb validates the spec
+     return service.getSpec() != null &&
+             Objects.equals(TYPE_LOADBALANCER, service.getSpec().getType());
+ }
+
+ /**
+  * Builds a kubernetes external load balancer from the given service.
+  * Ports and node ports come from the service spec, endpoints are the data IPs of
+  * all complete worker nodes, the gateway IP is read from the kube-vip config map
+  * and the gateway MAC from the external LB config.
+  *
+  * @param service kubernetes service
+  * @return external load balancer; null if the metadata, spec, service name,
+  * load balancer IP, gateway IP or gateway MAC is missing
+  */
+ private KubernetesExternalLb parseKubernetesExternalLb(Service service) {
+     if (service.getMetadata() == null || service.getSpec() == null) {
+         return null;
+     }
+
+     String serviceName = service.getMetadata().getName();
+     if (serviceName == null) {
+         return null;
+     }
+
+     String lbIp = service.getSpec().getLoadBalancerIP();
+     if (lbIp == null) {
+         return null;
+     }
+
+     Set<Integer> nodePortSet = Sets.newHashSet();
+     Set<Integer> portSet = Sets.newHashSet();
+     Set<String> endpointSet = Sets.newHashSet();
+
+     service.getSpec().getPorts().forEach(servicePort -> {
+         nodePortSet.add(servicePort.getNodePort());
+         portSet.add(servicePort.getPort());
+     });
+
+     nodeService.completeNodes(WORKER).forEach(workerNode -> {
+         endpointSet.add(workerNode.dataIp().toString());
+     });
+
+     String loadbalancerGatewayIp = loadBalancerGatewayIp();
+
+     if (loadbalancerGatewayIp == null) {
+         log.error("Can't find the loadbalancer gateway ip in the kubevip configmap.." +
+                 "Failed to parse kubernetes external lb and return null");
+         return null;
+     }
+
+     MacAddress loadBalancerGatewayMac = loadBalancerGatewayMac();
+
+     // test the MAC here: re-testing the gateway IP let a missing MAC reach the builder
+     if (loadBalancerGatewayMac == null) {
+         log.error("Can't find the loadbalancer gateway mac in the kubevip configmap.." +
+                 "Failed to parse kubernetes external lb and return null");
+         return null;
+     }
+
+     return DefaultKubernetesExternalLb.builder().serviceName(serviceName)
+             .loadBalancerIp(IpAddress.valueOf(lbIp))
+             .nodePortSet(nodePortSet)
+             .portSet(portSet)
+             .endpointSet(endpointSet)
+             .loadBalancerGwIp(IpAddress.valueOf(loadbalancerGatewayIp))
+             .loadBalancerGwMac(loadBalancerGatewayMac)
+             .build();
+ }
+
+ /**
+  * Reads the load balancer gateway IP from the kube-vip config map of the
+  * kube-system namespace.
+  *
+  * @return gateway IP string; null if the client is unavailable, the config map does
+  * not exist, it has no data, or the gateway IP key is absent
+  */
+ private String loadBalancerGatewayIp() {
+     KubernetesClient client = k8sClient(apiConfigService);
+     if (client == null) {
+         return null;
+     }
+
+     Resource<ConfigMap> kubeVipConfigMapResource =
+             client.configMaps().inNamespace(KUBE_SYSTEM).withName(KUBE_VIP);
+
+     if (kubeVipConfigMapResource == null) {
+         return null;
+     }
+
+     // the resource handle exists even when the config map does not; get() returns null then
+     ConfigMap kubeVipConfig = kubeVipConfigMapResource.get();
+     if (kubeVipConfig == null) {
+         return null;
+     }
+
+     Map<String, String> kubeVipConfigMap = kubeVipConfig.getData();
+     if (kubeVipConfigMap == null) {
+         return null;
+     }
+
+     // Map.get yields null for an absent key, which is the documented "not found" result
+     return kubeVipConfigMap.get(GATEWAY_IP);
+ }
+
+ /**
+  * Returns the load balancer gateway MAC of any stored external LB config.
+  *
+  * @return gateway MAC address; null if no config exists or the config has no MAC
+  */
+ private MacAddress loadBalancerGatewayMac() {
+     return lbConfigService.lbConfigs().stream()
+             .findAny()
+             .map(KubernetesExternalLbConfig::loadBalancerGwMac)
+             .orElse(null);
+ }
+
+ /**
+  * Listens to kubevirt node events; when a node becomes complete, incomplete or is
+  * removed, the external load balancers are re-evaluated on the event executor by
+  * the leader node only.
+  */
+ private class InternalNodeEventListener implements KubevirtNodeListener {
+
+     @Override
+     public void event(KubevirtNodeEvent event) {
+         switch (event.type()) {
+             case KUBEVIRT_NODE_COMPLETE:
+                 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
+                 break;
+             case KUBEVIRT_NODE_INCOMPLETE:
+             case KUBEVIRT_NODE_REMOVED:
+                 eventExecutor.execute(() -> processNodeDeletion(event.subject()));
+                 break;
+             case KUBEVIRT_NODE_UPDATED:
+             default:
+                 // updates and unknown events are ignored
+                 break;
+         }
+     }
+
+     // A newly completed node may change the elected nodes and endpoints of the LBs.
+     private void processNodeCompletion(KubevirtNode node) {
+         if (isRelevantHelper()) {
+             addOrUpdateExternalLoadBalancers();
+         }
+     }
+
+     // A lost node may change the elected nodes and endpoints of the LBs.
+     private void processNodeDeletion(KubevirtNode node) {
+         if (isRelevantHelper()) {
+             addOrUpdateExternalLoadBalancers();
+         }
+     }
+
+     // True when the local node currently leads the application topic.
+     private boolean isRelevantHelper() {
+         return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+     }
+ }
+}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
index 5bc1548..1d0a2e9 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtNetworkHandler.java
@@ -45,6 +45,8 @@
import org.onosproject.kubevirtnetworking.api.KubevirtRouterAdminService;
import org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent;
import org.onosproject.kubevirtnetworking.api.KubevirtRouterListener;
+import org.onosproject.kubevirtnetworking.util.RulePopulatorUtil;
+import org.onosproject.kubevirtnode.api.KubernetesExternalLbInterface;
import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
import org.onosproject.kubevirtnode.api.KubevirtNode;
import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
@@ -109,10 +111,12 @@
import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_TO_TENANT_PREFIX;
import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.FLAT;
import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VLAN;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.elbPatchPortNum;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.gatewayNodeForSpecifiedRouter;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterForKubevirtNetwork;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterForKubevirtPort;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterMacAddress;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.kubernetesElbMac;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.portNumber;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.resolveHostname;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.segmentIdHex;
@@ -1324,6 +1328,12 @@
}
} else if (node.type().equals(GATEWAY)) {
updateGatewayNodeForRouter();
+
+ KubernetesExternalLbInterface externalLbInterface = node.kubernetesExternalLbInterface();
+
+ if (externalLbInterface != null) {
+ setElbInternalIpArpResponseRules(node, true);
+ }
}
}
@@ -1358,6 +1368,49 @@
kubevirtRouterService.updateRouter(router.updatedElectedGateway(newGwNode.hostname()));
});
}
+
+ /**
+  * Installs or removes the rule on the integration bridge of the gateway that answers
+  * ARP requests for the external LB IP arriving on the ELB patch port. The reply
+  * carries the MAC of the ELB patch port and is sent back through the ingress port.
+  * Nothing is done when the ELB interface, its IPv4 address, the patch port, its MAC
+  * or the bridge device cannot be resolved.
+  *
+  * @param gateway gateway node
+  * @param install true to install the rule, false to remove it
+  */
+ private void setElbInternalIpArpResponseRules(KubevirtNode gateway, boolean install) {
+     KubernetesExternalLbInterface externalLbInterface = gateway.kubernetesExternalLbInterface();
+     if (externalLbInterface == null) {
+         return;
+     }
+
+     // ARP target matching needs an IPv4 address
+     IpAddress eIp = externalLbInterface.externalLbIp();
+     if (eIp == null || !eIp.isIp4()) {
+         return;
+     }
+
+     MacAddress elbIntfMac = kubernetesElbMac(deviceService, gateway);
+     if (elbIntfMac == null) {
+         return;
+     }
+
+     // the patch port may not be present or enabled yet
+     PortNumber elbPatchPort = elbPatchPortNum(deviceService, gateway);
+     if (elbPatchPort == null) {
+         return;
+     }
+
+     // the bridge device is needed to build the move extensions
+     Device device = deviceService.getDevice(gateway.intgBridge());
+     if (device == null) {
+         return;
+     }
+
+     TrafficSelector selector = DefaultTrafficSelector.builder()
+             .matchInPort(elbPatchPort)
+             .matchEthType(EthType.EtherType.ARP.ethType().toShort())
+             .matchArpOp(ARP.OP_REQUEST)
+             .matchArpTpa(eIp.getIp4Address())
+             .build();
+
+     TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+             .extension(RulePopulatorUtil.buildMoveEthSrcToDstExtension(device), device.id())
+             .extension(RulePopulatorUtil.buildMoveArpShaToThaExtension(device), device.id())
+             .extension(RulePopulatorUtil.buildMoveArpSpaToTpaExtension(device), device.id())
+             .setArpOp(ARP.OP_REPLY)
+             .setEthSrc(elbIntfMac)
+             .setArpSha(elbIntfMac)
+             .setArpSpa(eIp.getIp4Address())
+             .setOutput(PortNumber.IN_PORT)
+             .build();
+
+     flowService.setRule(
+             appId,
+             gateway.intgBridge(),
+             selector,
+             treatment,
+             PRIORITY_ARP_GATEWAY_RULE,
+             GW_ENTRY_TABLE,
+             install);
+ }
}
private class InternalKubevirtPortListener implements KubevirtPortListener {
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java
index 3496008..539323f 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java
@@ -53,6 +53,7 @@
import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
import org.onosproject.kubevirtnode.api.DefaultKubevirtNode;
import org.onosproject.kubevirtnode.api.DefaultKubevirtPhyInterface;
+import org.onosproject.kubevirtnode.api.KubernetesExternalLbInterface;
import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
import org.onosproject.kubevirtnode.api.KubevirtNode;
@@ -89,6 +90,7 @@
import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.OTHER;
import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
+import static org.onosproject.net.AnnotationKeys.PORT_MAC;
import static org.onosproject.net.AnnotationKeys.PORT_NAME;
/**
@@ -545,7 +547,7 @@
/**
* Returns the gateway node for the specified kubernetes external lb.
* Among gateways, only one gateway would act as a gateway per external lb.
- * Currently gateway node is selected based on modulo operation with router hashcode.
+ * Currently gateway node is selected based on modulo operation with external lb hashcode.
*
* @param nodeService kubevirt node service
* @param externalLb kubernetes external lb
@@ -565,6 +567,28 @@
}
/**
+ * Returns the worker node for the specified kubernetes external lb.
+ * Among worker nodes, only one worker would serve the traffic from and to the gateway.
+ * Currently worker node is selected based on modulo operation with external lb hashcode.
+ *
+ * @param nodeService kubevirt node service
+ * @param externalLb kubernetes external lb
+ * @return elected worker node
+ */
+ public static KubevirtNode workerNodeForSpecifiedService(KubevirtNodeService nodeService,
+                                                          KubernetesExternalLb externalLb) {
+     //TODO: enhance election logic for a better load balancing
+
+     // take a single snapshot so that the size and the indexed array cannot disagree
+     Object[] workers = nodeService.completeNodes(WORKER).toArray();
+     if (workers.length == 0) {
+         return null;
+     }
+
+     // hashCode() may be negative; floorMod always yields an index in [0, length)
+     int index = Math.floorMod(externalLb.hashCode(), workers.length);
+     return (KubevirtNode) workers[index];
+ }
+
+ /**
* Returns the mac address of the router.
*
* @param router kubevirt router
@@ -659,6 +683,73 @@
}
/**
+ * Returns the external lb patch port number with specified gateway.
+ *
+ * @param deviceService device service
+ * @param gateway gateway node
+ * @return external lb bridge patch port number
+ */
+ public static PortNumber elbPatchPortNum(DeviceService deviceService, KubevirtNode gateway) {
+     KubernetesExternalLbInterface elbIntf = gateway.kubernetesExternalLbInterface();
+     if (elbIntf == null) {
+         log.warn("No elb interface is attached to gateway {}", gateway.hostname());
+         return null;
+     }
+
+     // the patch port on the integration bridge is named after the elb bridge
+     String expectedPortName = "int-to-" + elbIntf.externalLbBridgeName();
+
+     return deviceService.getPorts(gateway.intgBridge()).stream()
+             .filter(Port::isEnabled)
+             .filter(p -> Objects.equals(p.annotations().value(PORT_NAME), expectedPortName))
+             .findAny()
+             .map(Port::number)
+             .orElse(null);
+ }
+
+ /**
+ * Returns the external lb patch port Mac with specified gateway.
+ *
+ * @param deviceService device service
+ * @param gateway gateway node
+ * @return external lb bridge patch Mac Address
+ */
+ public static MacAddress kubernetesElbMac(DeviceService deviceService, KubevirtNode gateway) {
+     KubernetesExternalLbInterface elbIntf = gateway.kubernetesExternalLbInterface();
+     if (elbIntf == null) {
+         log.warn("No elb interface is attached to gateway {}", gateway.hostname());
+         return null;
+     }
+
+     // the patch port on the integration bridge is named after the elb bridge
+     String expectedPortName = "int-to-" + elbIntf.externalLbBridgeName();
+
+     Port patchPort = null;
+     for (Port candidate : deviceService.getPorts(gateway.intgBridge())) {
+         if (candidate.isEnabled() &&
+                 Objects.equals(candidate.annotations().value(PORT_NAME), expectedPortName)) {
+             patchPort = candidate;
+             break;
+         }
+     }
+     if (patchPort == null) {
+         return null;
+     }
+
+     // the MAC is taken from the port annotation, which may be absent
+     String macText = patchPort.annotations().value(PORT_MAC);
+     return macText == null ? null : MacAddress.valueOf(macText);
+ }
+
+ /**
* Returns the kubevirt external network with specified router.
*
* @param networkService kubevirt network service