Implement Kubernetes external load balancer handler.
Change-Id: I0f3057d66769f0ca7db7d508483835cdd1ff1593
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesExternalLbHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesExternalLbHandler.java
new file mode 100644
index 0000000..7b1c733
--- /dev/null
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesExternalLbHandler.java
@@ -0,0 +1,460 @@
+/*
+ * Copyright 2022-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnetworking.impl;
+
+import org.onlab.packet.ARP;
+import org.onlab.packet.EthType;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.TpPort;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kubevirtnetworking.api.KubernetesExternalLb;
+import org.onosproject.kubevirtnetworking.api.KubernetesExternalLbAdminService;
+import org.onosproject.kubevirtnetworking.api.KubernetesExternalLbEvent;
+import org.onosproject.kubevirtnetworking.api.KubernetesExternalLbListener;
+import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
+import org.onosproject.kubevirtnetworking.api.KubevirtGroupRuleService;
+import org.onosproject.kubevirtnetworking.util.RulePopulatorUtil;
+import org.onosproject.kubevirtnode.api.KubernetesExternalLbInterface;
+import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeService;
+import org.onosproject.net.Device;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.instructions.ExtensionTreatment;
+import org.onosproject.net.packet.PacketService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kubevirtnetworking.api.Constants.GW_DROP_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.GW_ENTRY_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.KUBERNETES_EXTERNAL_LB_FAKE_MAC;
+import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ARP_GATEWAY_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ELB_DOWNSTREAM_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ELB_UPSTREAM_RULE;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.elbPatchPortNum;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.externalPatchPortNum;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.kubernetesElbMac;
+import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.CT_NAT_SRC_FLAG;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Handles Kubernetes External load balancer.
+ */
+@Component(immediate = true)
+public class KubernetesExternalLbHandler {
+ protected final Logger log = getLogger(getClass());
+
+ private static final int TP_PORT_MINIMUM_NUM = 1025; // passed as natPortMin of the downstream conntrack NAT treatment
+ private static final int TP_PORT_MAXIMUM_NUM = 65535; // passed as natPortMax of the downstream conntrack NAT treatment
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService; // registers the application ID in activate()
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService; // supplies the local node ID in activate()
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService; // leadership on the appId.name() topic gates event processing
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtApiConfigService configService; // NOTE(review): not used anywhere in this class
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtNodeService nodeService; // looks up the elected gateway and worker nodes by name
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtGroupRuleService groupRuleService; // NOTE(review): not used anywhere in this class
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService; // handed to the port/MAC lookup helpers; also fetches the integration bridge
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected PacketService packetService; // NOTE(review): not used anywhere in this class
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtFlowRuleService flowService; // every rule is installed/removed through setRule()
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubernetesExternalLbAdminService externalLbService; // source of the load balancer events handled here
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DriverService driverService; // handed to niciraConnTrackTreatmentBuilder()
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor( // all event handling runs on this one thread
+ groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+
+ private ApplicationId appId; // assigned in activate()
+ private NodeId localNodeId; // assigned in activate(); compared with the current leader
+
+ private final InternalKubernetesExternalLbListener lbListener = // registered in activate(), removed in deactivate()
+ new InternalKubernetesExternalLbListener();
+
+ @Activate
+ protected void activate() {
+ appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
+ localNodeId = clusterService.getLocalNode().id();
+ leadershipService.runForLeadership(appId.name());
+ externalLbService.addListener(lbListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ leadershipService.withdraw(appId.name());
+ externalLbService.removeListener(lbListener);
+
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+
+ /**
+  * Reacts to Kubernetes external load balancer events by installing or
+  * removing the matching flow rules.
+  *
+  * Each event is handed to the single-threaded event executor. The actual
+  * work is skipped unless the local node is the current leader of the
+  * application-name leadership topic.
+  */
+ private class InternalKubernetesExternalLbListener implements KubernetesExternalLbListener {
+
+     /**
+      * Tells whether the local node is the leader of the application topic.
+      *
+      * @return true if the local node ID equals the current leader's ID
+      */
+     private boolean isRelevantHelper() {
+         return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+     }
+
+     @Override
+     public void event(KubernetesExternalLbEvent event) {
+         switch (event.type()) {
+             case KUBERNETES_EXTERNAL_LOAD_BALANCER_CREATED:
+             case KUBERNETES_EXTERNAL_LOAD_BALANCER_UPDATED:
+                 eventExecutor.execute(() -> processKubernetesExternalLbCreatedOrUpdated(
+                         event.subject()));
+                 break;
+             case KUBERNETES_EXTERNAL_LOAD_BALANCER_GATEWAY_CHANGED:
+                 eventExecutor.execute(() -> processKubernetesExternalLbGatewayChanged(
+                         event.subject(), event.oldGateway()));
+                 break;
+             case KUBERNETES_EXTERNAL_LOAD_BALANCER_WORKER_CHANGED:
+                 eventExecutor.execute(() -> processKubernetesExternalLbWorkerChanged(
+                         event.subject(), event.oldWorker()));
+                 break;
+             case KUBERNETES_EXTERNAL_LOAD_BALANCER_REMOVED:
+                 eventExecutor.execute(() -> processKubernetesExternalLbRemoved(
+                         event.subject()));
+                 break;
+             default:
+                 //do nothing
+                 break;
+         }
+     }
+
+     /**
+      * Installs the rules of a created or updated load balancer, provided
+      * both a gateway and a worker have been elected.
+      *
+      * @param lb kubernetes external load balancer
+      */
+     private void processKubernetesExternalLbCreatedOrUpdated(KubernetesExternalLb lb) {
+         if (!isRelevantHelper()) {
+             return;
+         }
+
+         if (lb.electedGateway() == null || lb.electedWorker() == null) {
+             log.warn("processKubernetesExternalLbCreatedOrUpdated called but electedGateway " +
+                     "or electedWorker is null. Stop this task.");
+             return;
+         }
+
+         log.info("processKubernetesExternalLbCreatedOrUpdated and updated elb with electedGateway: {}", lb);
+
+         setExternalLbRulesForService(lb, true);
+     }
+
+     /**
+      * Moves the rules of a load balancer from its previous gateway to the
+      * newly elected one.
+      *
+      * @param lb         load balancer carrying the new elected gateway
+      * @param oldGateway name of the previously elected gateway
+      */
+     private void processKubernetesExternalLbGatewayChanged(KubernetesExternalLb lb, String oldGateway) {
+         if (!isRelevantHelper()) {
+             return;
+         }
+
+         if (lb.electedWorker() == null || oldGateway == null) {
+             return;
+         }
+
+         log.info("processKubernetesExternalLbGatewayChanged with oldGateway: {}", oldGateway);
+
+         // Remove the rules using a copy of the load balancer that still points at the old gateway.
+         setExternalLbRulesForService(lb.updateElectedGateway(oldGateway), false);
+
+         setExternalLbRulesForService(lb, true);
+     }
+
+     /**
+      * Replaces the rules that reference the previous worker with rules for
+      * the newly elected worker.
+      *
+      * @param lb        load balancer carrying the new elected worker
+      * @param oldWorker name of the previously elected worker
+      */
+     private void processKubernetesExternalLbWorkerChanged(KubernetesExternalLb lb, String oldWorker) {
+         if (!isRelevantHelper()) {
+             return;
+         }
+
+         if (lb.electedGateway() == null || oldWorker == null) {
+             return;
+         }
+
+         log.info("processKubernetesExternalLbWorkerChanged with oldWorker: {}", oldWorker);
+
+         // Remove the rules using a copy of the load balancer that still points at the old worker.
+         setExternalLbRulesForService(lb.updateElectedWorker(oldWorker), false);
+
+         setExternalLbRulesForService(lb, true);
+     }
+
+     /**
+      * Removes the rules of a deleted load balancer from its elected gateway.
+      *
+      * @param lb removed kubernetes external load balancer
+      */
+     private void processKubernetesExternalLbRemoved(KubernetesExternalLb lb) {
+         if (!isRelevantHelper()) {
+             return;
+         }
+
+         if (lb.electedGateway() == null) {
+             return;
+         }
+
+         setExternalLbRulesForService(lb, false);
+     }
+ }
+
+ /**
+  * Installs or removes all rule sets of a load balancer on its elected
+  * gateway: the ARP responder first, then the downstream rules, then the
+  * upstream rules. Does nothing when no gateway is elected or the node
+  * service does not know the gateway.
+  *
+  * @param lb      kubernetes external load balancer
+  * @param install flag forwarded to the flow rule service; callers pass true
+  *                on creation and false on removal
+  */
+ private void setExternalLbRulesForService(KubernetesExternalLb lb, boolean install) {
+     KubevirtNode gatewayNode = lb.electedGateway() == null
+             ? null
+             : nodeService.node(lb.electedGateway());
+
+     if (gatewayNode != null) {
+         setLoadbalanceIpArpResponseRules(lb, gatewayNode, install);
+         setDownstreamRules(lb, gatewayNode, install);
+         setUpstreamRules(lb, gatewayNode, install);
+     }
+ }
+
+ /**
+  * Installs or removes the gateway entry-table rule that answers ARP
+  * requests for the load balancer IP with the fake external LB MAC.
+  *
+  * The rule matches ARP requests arriving on the gateway's external patch
+  * port whose target address is the load balancer IP. It turns the request
+  * into a reply and sends it back out of the ingress port. The task stops
+  * with a warning when the load balancer IP is missing or not IPv4, or when
+  * the external patch port or the integration bridge device cannot be
+  * resolved.
+  *
+  * @param lb      kubernetes external load balancer
+  * @param gateway elected gateway node
+  * @param install flag forwarded to the flow rule service
+  */
+ private void setLoadbalanceIpArpResponseRules(KubernetesExternalLb lb, KubevirtNode gateway, boolean install) {
+
+     IpAddress loadBalancerIp = lb.loadBalancerIp();
+
+     // getIp4Address() yields null for a non-IPv4 address, which the ARP match/set calls cannot use.
+     if (loadBalancerIp == null || loadBalancerIp.getIp4Address() == null) {
+         log.warn("setLoadbalanceIpArpResponseRules called but loadBalancerIp is null " +
+                 "or not an IPv4 address. Stop this task.");
+         return;
+     }
+
+     // Same guard as setUpstreamRules applies to this port lookup.
+     PortNumber externalPortNum = externalPatchPortNum(deviceService, gateway);
+     if (externalPortNum == null) {
+         log.warn("setLoadbalanceIpArpResponseRules called but externalPatchPortNum is null. Stop this task.");
+         return;
+     }
+
+     // The device service returns null for an unknown device; device.id() below would then fail.
+     Device device = deviceService.getDevice(gateway.intgBridge());
+     if (device == null) {
+         log.warn("setLoadbalanceIpArpResponseRules called but integration bridge device is null. " +
+                 "Stop this task.");
+         return;
+     }
+
+     TrafficSelector selector = DefaultTrafficSelector.builder()
+             .matchInPort(externalPortNum)
+             .matchEthType(EthType.EtherType.ARP.ethType().toShort())
+             .matchArpOp(ARP.OP_REQUEST)
+             .matchArpTpa(loadBalancerIp.getIp4Address())
+             .build();
+
+     // Copy the requester's addresses into the destination fields, then
+     // overwrite the source fields with the fake MAC and the load balancer IP.
+     TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+             .extension(RulePopulatorUtil.buildMoveEthSrcToDstExtension(device), device.id())
+             .extension(RulePopulatorUtil.buildMoveArpShaToThaExtension(device), device.id())
+             .extension(RulePopulatorUtil.buildMoveArpSpaToTpaExtension(device), device.id())
+             .setArpOp(ARP.OP_REPLY)
+             .setEthSrc(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
+             .setArpSha(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
+             .setArpSpa(loadBalancerIp.getIp4Address())
+             .setOutput(PortNumber.IN_PORT)
+             .build();
+
+     flowService.setRule(
+             appId,
+             gateway.intgBridge(),
+             selector,
+             treatment,
+             PRIORITY_ARP_GATEWAY_RULE,
+             GW_ENTRY_TABLE,
+             install);
+ }
+
+ /**
+  * Installs or removes one gateway entry-table rule per node port of the
+  * load balancer.
+  *
+  * Each rule matches IPv4/TCP packets addressed to the fake external LB
+  * MAC, the load balancer IP and the node port. It source-NATs them through
+  * conntrack to the gateway's external LB interface IP, rewrites the
+  * Ethernet addresses, sets the destination IP to the elected worker's data
+  * IP and outputs to the ELB patch port. The task stops with a warning when
+  * any required piece of information cannot be resolved.
+  *
+  * @param lb      kubernetes external load balancer
+  * @param gateway elected gateway node
+  * @param install flag forwarded to the flow rule service
+  */
+ private void setDownstreamRules(KubernetesExternalLb lb, KubevirtNode gateway, boolean install) {
+     IpAddress lbIp = lb.loadBalancerIp();
+     if (lbIp == null) {
+         log.warn("setDownstreamRules called but loadBalancerIp is null. Stop this task.");
+         return;
+     }
+
+     MacAddress elbMac = kubernetesElbMac(deviceService, gateway);
+     if (elbMac == null) {
+         log.warn("setDownstreamRules called but elbIntfMac is null. Stop this task.");
+         return;
+     }
+
+     PortNumber elbPort = elbPatchPortNum(deviceService, gateway);
+     if (elbPort == null) {
+         log.warn("setDownstreamRules called but elbBridgePortNum is null. Stop this task.");
+         return;
+     }
+
+     KubernetesExternalLbInterface elbInterface = gateway.kubernetesExternalLbInterface();
+     if (elbInterface == null) {
+         log.warn("setDownstreamRules called but externalLbInterface is null. Stop this task.");
+         return;
+     }
+
+     KubevirtNode worker = nodeService.node(lb.electedWorker());
+     if (worker == null) {
+         log.warn("setDownstreamRules called but electedWorker is null. Stop this task.");
+         return;
+     }
+
+     lb.nodePortSet().forEach(port -> {
+         // Committed conntrack entry with source NAT to the external LB interface IP.
+         ExtensionTreatment srcNat = RulePopulatorUtil
+                 .niciraConnTrackTreatmentBuilder(driverService, gateway.intgBridge())
+                 .commit(true)
+                 .natFlag(CT_NAT_SRC_FLAG)
+                 .natAction(true)
+                 .natIp(elbInterface.externalLbIp())
+                 .natPortMin(TpPort.tpPort(TP_PORT_MINIMUM_NUM))
+                 .natPortMax(TpPort.tpPort(TP_PORT_MAXIMUM_NUM))
+                 .build();
+
+         TrafficSelector.Builder match = DefaultTrafficSelector.builder()
+                 .matchEthType(Ethernet.TYPE_IPV4)
+                 .matchEthDst(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
+                 .matchIPDst(lbIp.toIpPrefix())
+                 .matchIPProtocol(IPv4.PROTOCOL_TCP)
+                 .matchTcpDst(TpPort.tpPort(port.intValue()));
+
+         TrafficTreatment.Builder actions = DefaultTrafficTreatment.builder()
+                 .extension(srcNat, gateway.intgBridge())
+                 .setEthSrc(elbMac)
+                 .setEthDst(elbInterface.externalLbGwMac())
+                 .setIpDst(worker.dataIp())
+                 .setOutput(elbPort);
+
+         flowService.setRule(
+                 appId,
+                 gateway.intgBridge(),
+                 match.build(),
+                 actions.build(),
+                 PRIORITY_ELB_DOWNSTREAM_RULE,
+                 GW_ENTRY_TABLE,
+                 install);
+     });
+ }
+
+ /**
+  * Installs or removes two rules per node port of the load balancer for
+  * traffic returning from the elected worker.
+  *
+  * The first rule sits in the gateway entry table. It matches IPv4/TCP
+  * packets from the worker's data IP and the node port, addressed to the
+  * gateway's external LB interface IP. It rewrites the source to the fake
+  * external LB MAC and the load balancer IP, applies the conntrack NAT
+  * action and moves on to the gateway drop table. The second rule sits in
+  * that table. It matches IPv4/TCP packets with the node port as source
+  * port, applies the same rewrites and outputs to the external patch port.
+  *
+  * The task stops with a warning when any required piece of information
+  * cannot be resolved.
+  *
+  * @param lb      kubernetes external load balancer
+  * @param gateway elected gateway node
+  * @param install flag forwarded to the flow rule service
+  */
+ private void setUpstreamRules(KubernetesExternalLb lb, KubevirtNode gateway, boolean install) {
+     IpAddress loadBalancerIp = lb.loadBalancerIp();
+
+     if (loadBalancerIp == null) {
+         log.warn("setUpstreamRules called but loadBalancerIp is null. Stop this task.");
+         return;
+     }
+
+     // The ELB MAC and ELB patch port are not used by the rules below. They are
+     // still resolved so upstream rules are installed under the same
+     // preconditions as the downstream ones.
+     MacAddress elbIntfMac = kubernetesElbMac(deviceService, gateway);
+     if (elbIntfMac == null) {
+         log.warn("setUpstreamRules called but elbIntfMac is null. Stop this task.");
+         return;
+     }
+
+     PortNumber elbBridgePortNum = elbPatchPortNum(deviceService, gateway);
+     if (elbBridgePortNum == null) {
+         log.warn("setUpstreamRules called but elbBridgePortNum is null. Stop this task.");
+         return;
+     }
+
+     PortNumber externalPatchPortNum = externalPatchPortNum(deviceService, gateway);
+     if (externalPatchPortNum == null) {
+         log.warn("setUpstreamRules called but externalPatchPortNum is null. Stop this task.");
+         return;
+     }
+
+     KubernetesExternalLbInterface externalLbInterface = gateway.kubernetesExternalLbInterface();
+     if (externalLbInterface == null) {
+         log.warn("setUpstreamRules called but externalLbInterface is null. Stop this task.");
+         return;
+     }
+
+     KubevirtNode electedWorker = nodeService.node(lb.electedWorker());
+     if (electedWorker == null) {
+         log.warn("setUpstreamRules called but electedWorker is null. Stop this task.");
+         return;
+     }
+
+     // Both rules rewrite the destination MAC to this value.
+     // NOTE(review): a null MAC is presumably rejected by the treatment builder - confirm.
+     MacAddress loadBalancerGwMac = lb.loadBalancerGwMac();
+     if (loadBalancerGwMac == null) {
+         log.warn("setUpstreamRules called but loadBalancerGwMac is null. Stop this task.");
+         return;
+     }
+
+     lb.nodePortSet().forEach(nodePort -> {
+         // Rule 1 (entry table): worker reply -> rewrite, conntrack NAT, go to the drop table.
+         TrafficSelector.Builder entrySelector = DefaultTrafficSelector.builder()
+                 .matchEthType(Ethernet.TYPE_IPV4)
+                 .matchIPSrc(electedWorker.dataIp().toIpPrefix())
+                 .matchIPDst(externalLbInterface.externalLbIp().toIpPrefix())
+                 .matchIPProtocol(IPv4.PROTOCOL_TCP)
+                 .matchTcpSrc(TpPort.tpPort(nodePort.intValue()));
+
+         ExtensionTreatment natTreatment = RulePopulatorUtil
+                 .niciraConnTrackTreatmentBuilder(driverService, gateway.intgBridge())
+                 .commit(false)
+                 .natAction(true)
+                 .table((short) GW_DROP_TABLE)
+                 .build();
+
+         TrafficTreatment.Builder entryTreatment = DefaultTrafficTreatment.builder()
+                 .setEthSrc(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
+                 .setIpSrc(loadBalancerIp)
+                 .setEthDst(loadBalancerGwMac)
+                 .extension(natTreatment, gateway.intgBridge())
+                 .transition(GW_DROP_TABLE);
+
+         flowService.setRule(
+                 appId,
+                 gateway.intgBridge(),
+                 entrySelector.build(),
+                 entryTreatment.build(),
+                 PRIORITY_ELB_UPSTREAM_RULE,
+                 GW_ENTRY_TABLE,
+                 install);
+
+         // Rule 2 (drop table): same rewrites, then output to the external patch port.
+         TrafficSelector.Builder outputSelector = DefaultTrafficSelector.builder()
+                 .matchEthType(Ethernet.TYPE_IPV4)
+                 .matchIPProtocol(IPv4.PROTOCOL_TCP)
+                 .matchTcpSrc(TpPort.tpPort(nodePort.intValue()));
+
+         TrafficTreatment.Builder outputTreatment = DefaultTrafficTreatment.builder()
+                 .setEthSrc(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
+                 .setIpSrc(loadBalancerIp)
+                 .setEthDst(loadBalancerGwMac)
+                 .setOutput(externalPatchPortNum);
+
+         flowService.setRule(
+                 appId,
+                 gateway.intgBridge(),
+                 outputSelector.build(),
+                 outputTreatment.build(),
+                 PRIORITY_ELB_UPSTREAM_RULE,
+                 GW_DROP_TABLE,
+                 install);
+     });
+ }
+}