Retrieve MAC address when kubernetes external lb config or interface has only IP address.
Change-Id: I79538f262a1a5d9bceb20d461743212ec6717d5a
(cherry picked from commit 6a83ee6e704e035f8349f93c5bbd8d3b251fe2d5)
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/cli/KubernetesListServiceCommand.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/cli/KubernetesListServiceCommand.java
index 47d4a71..fcb7188 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/cli/KubernetesListServiceCommand.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/cli/KubernetesListServiceCommand.java
@@ -27,6 +27,7 @@
import java.util.List;
import static org.onosproject.kubevirtnetworking.api.Constants.CLI_IP_ADDRESS_LENGTH;
+import static org.onosproject.kubevirtnetworking.api.Constants.CLI_LONG_SERVICE_PORT_LENGTH;
import static org.onosproject.kubevirtnetworking.api.Constants.CLI_MAC_ADDRESS_LENGTH;
import static org.onosproject.kubevirtnetworking.api.Constants.CLI_MARGIN_LENGTH;
import static org.onosproject.kubevirtnetworking.api.Constants.CLI_NAME_LENGTH;
@@ -47,11 +48,12 @@
List<KubernetesExternalLb> elbList = Lists.newArrayList(service.loadBalancers());
String format = genFormatString(ImmutableList.of(CLI_NAME_LENGTH, CLI_IP_ADDRESS_LENGTH,
- CLI_NAME_LENGTH, CLI_NAME_LENGTH, CLI_IP_ADDRESS_LENGTH, CLI_MAC_ADDRESS_LENGTH));
+ CLI_NAME_LENGTH, CLI_NAME_LENGTH, CLI_IP_ADDRESS_LENGTH, CLI_MAC_ADDRESS_LENGTH,
+ CLI_LONG_SERVICE_PORT_LENGTH));
print(format, "Service Name", "Loadbalancer IP", "Elected Gateway", "Elected Worker",
- "Loadbalancer GW IP", "Loadbalancer GW MAC");
+ "Loadbalancer GW IP", "Loadbalancer GW MAC", "Service Port");
for (KubernetesExternalLb elb : elbList) {
String lbIp = elb.loadBalancerIp() == null ? "N/A" : elb.loadBalancerIp().toString();
@@ -59,6 +61,7 @@
String electedWorker = elb.electedWorker() == null ? "N/A" : elb.electedWorker();
String lbGwIp = elb.loadBalancerGwIp() == null ? "N/A" : elb.loadBalancerGwIp().toString();
String lbGwMac = elb.loadBalancerGwMac() == null ? "N/A" : elb.loadBalancerGwMac().toString();
+ String lbServicePort = elb.servicePorts().isEmpty() ? "N/A" : elb.servicePorts().toString();
print(format, StringUtils.substring(elb.serviceName(), 0,
CLI_NAME_LENGTH - CLI_MARGIN_LENGTH),
@@ -71,7 +74,9 @@
StringUtils.substring(lbGwIp, 0,
CLI_IP_ADDRESS_LENGTH - CLI_MARGIN_LENGTH),
StringUtils.substring(lbGwMac, 0,
- CLI_MAC_ADDRESS_LENGTH - CLI_MARGIN_LENGTH)
+ CLI_MAC_ADDRESS_LENGTH - CLI_MARGIN_LENGTH),
+ StringUtils.substring(lbServicePort, 0,
+ CLI_LONG_SERVICE_PORT_LENGTH - CLI_MARGIN_LENGTH)
);
}
}
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/DistributedKubernetesExternalLbStore.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/DistributedKubernetesExternalLbStore.java
index 1bd286e..ce78884 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/DistributedKubernetesExternalLbStore.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/DistributedKubernetesExternalLbStore.java
@@ -21,10 +21,12 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.kubevirtnetworking.api.DefaultKubernetesExternalLb;
+import org.onosproject.kubevirtnetworking.api.DefaultKubernetesServicePort;
import org.onosproject.kubevirtnetworking.api.KubernetesExternalLb;
import org.onosproject.kubevirtnetworking.api.KubernetesExternalLbEvent;
import org.onosproject.kubevirtnetworking.api.KubernetesExternalLbStore;
import org.onosproject.kubevirtnetworking.api.KubernetesExternalLbStoreDelegate;
+import org.onosproject.kubevirtnetworking.api.KubernetesServicePort;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
@@ -72,6 +74,8 @@
.register(KryoNamespaces.API)
.register(KubernetesExternalLb.class)
.register(DefaultKubernetesExternalLb.class)
+ .register(KubernetesServicePort.class)
+ .register(DefaultKubernetesServicePort.class)
.register(IpAddress.class)
.register(Collection.class)
.build();
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesExternalLbArpHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesExternalLbArpHandler.java
new file mode 100644
index 0000000..5af9834
--- /dev/null
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesExternalLbArpHandler.java
@@ -0,0 +1,501 @@
+/*
+ * Copyright 2022-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnetworking.impl;
+
+import org.onlab.packet.ARP;
+import org.onlab.packet.EthType;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.VlanId;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
+import org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil;
+import org.onosproject.kubevirtnode.api.KubernetesExternalLbConfig;
+import org.onosproject.kubevirtnode.api.KubernetesExternalLbConfigAdminService;
+import org.onosproject.kubevirtnode.api.KubernetesExternalLbConfigEvent;
+import org.onosproject.kubevirtnode.api.KubernetesExternalLbConfigListener;
+import org.onosproject.kubevirtnode.api.KubernetesExternalLbInterface;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
+import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
+import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.InboundPacket;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kubevirtnetworking.api.Constants.GW_ENTRY_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.KUBERNETES_EXTERNAL_LB_FAKE_MAC;
+import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ARP_GATEWAY_RULE;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.kubernetesElbMac;
+import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.GATEWAY;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Handles arp packets related to the kubernetes external loadbalancer handler.
+ */
+@Component(immediate = true)
+public class KubernetesExternalLbArpHandler {
+ protected final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected PacketService packetService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubernetesExternalLbConfigAdminService externalLbConfigAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtNodeAdminService nodeAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected KubevirtFlowRuleService kubevirtFlowRuleService;
+
+ // Sender IP placed in the ARP requests that probe the load balancer gateway of the external LB config.
+ private static final IpAddress NON_ROUTABLE_META_ADDRESS = IpAddress.valueOf("0.0.0.0");
+ // Runs the periodic ARP retry task for the gateway IP of the external LB config.
+ private final Timer externalLbGwTimer = new Timer("kubernetes-external-lb-gateway");
+ // Runs the periodic ARP retry tasks for the gateway IP of each node's external LB interface.
+ private final Timer externalLbIntfGwTimer = new Timer("kubernetes-external-lb-intf-gateway");
+ // Milliseconds per second; java.util.Timer delays and periods are given in milliseconds.
+ private static final long SECONDS = 1000L;
+ // Delay before the first retry of a scheduled task (5 seconds).
+ private static final long INITIAL_DELAY = 5 * SECONDS;
+ // Interval between two retries of a scheduled task (60 seconds).
+ private static final long TASK_PERIOD = 60 * SECONDS;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+
+ private final PacketProcessor packetProcessor = new InternalPacketProcessor();
+ private final InternalKubernetesExternalLbConfigListener
+ lbConfigListener = new InternalKubernetesExternalLbConfigListener();
+
+ private final InternalNodeEventListener
+ nodeEventListener = new InternalNodeEventListener();
+
+
+ private ApplicationId appId;
+ private NodeId localNodeId;
+
+    /**
+     * Starts the handler: registers the application, joins the leadership
+     * election under the application name, then attaches the packet processor
+     * and the external LB config / node listeners.
+     */
+    @Activate
+    protected void activate() {
+        appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
+        localNodeId = clusterService.getLocalNode().id();
+
+        final String electionTopic = appId.name();
+        leadershipService.runForLeadership(electionTopic);
+
+        // Hook into the packet path and into the two event sources that trigger MAC learning.
+        packetService.addProcessor(packetProcessor, PacketProcessor.director(1));
+        externalLbConfigAdminService.addListener(lbConfigListener);
+        nodeAdminService.addListener(nodeEventListener);
+
+        log.info("Started");
+    }
+
+    /**
+     * Stops the handler: withdraws from the leadership election, detaches the
+     * packet processor and listeners, and releases the background workers.
+     */
+    @Deactivate
+    protected void deactivate() {
+        leadershipService.withdraw(appId.name());
+        packetService.removeProcessor(packetProcessor);
+        externalLbConfigAdminService.removeListener(lbConfigListener);
+        nodeAdminService.removeListener(nodeEventListener);
+
+        eventExecutor.shutdown();
+
+        // Without cancelling, the timer threads and their periodic ARP retry
+        // tasks would keep running after the component has been deactivated.
+        externalLbGwTimer.cancel();
+        externalLbIntfGwTimer.cancel();
+
+        log.info("Stopped");
+    }
+
+    /**
+     * Listens to kubernetes external LB config events and starts gateway MAC
+     * learning for created or updated configs that carry no gateway MAC yet.
+     */
+    private class InternalKubernetesExternalLbConfigListener
+            implements KubernetesExternalLbConfigListener {
+
+        /**
+         * Tells whether the local controller node is the leader for the application topic.
+         *
+         * @return true if the local node is the current leader
+         */
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(KubernetesExternalLbConfigEvent event) {
+            switch (event.type()) {
+                case KUBERNETES_EXTERNAL_LB_CONFIG_CREATED:
+                case KUBERNETES_EXTERNAL_LB_CONFIG_UPDATED:
+                    eventExecutor.execute(() -> processConfigCreatedOrUpdated(event.subject()));
+                    // Explicit break so this case never falls through into the branch below.
+                    break;
+                case KUBERNETES_EXTERNAL_LB_CONFIG_REMOVED:
+                default:
+                    // do nothing
+                    break;
+            }
+        }
+
+        /**
+         * Triggers MAC learning for the given config, but only on the leader
+         * and only while the config has no load balancer gateway MAC.
+         *
+         * @param config created or updated kubernetes external LB config
+         */
+        private void processConfigCreatedOrUpdated(KubernetesExternalLbConfig config) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            if (config.loadBalancerGwMac() != null) {
+                return;
+            }
+
+            processKubernetesExternalLbConfigMacLearning(config);
+        }
+    }
+
+    /**
+     * Listens to kubevirt node events and starts gateway MAC learning when a
+     * gateway node is reported as complete.
+     */
+    private class InternalNodeEventListener implements KubevirtNodeListener {
+
+        /**
+         * Tells whether the local controller node is the leader for the application topic.
+         *
+         * @return true if the local node is the current leader
+         */
+        private boolean isRelevantHelper() {
+            final NodeId leader = leadershipService.getLeader(appId.name());
+            return Objects.equals(localNodeId, leader);
+        }
+
+        @Override
+        public void event(KubevirtNodeEvent event) {
+            switch (event.type()) {
+                case KUBEVIRT_NODE_COMPLETE:
+                    eventExecutor.execute(() -> processNodeCompletion(event.subject()));
+                    break;
+                default:
+                    // incomplete, updated, removed and any other event: do nothing
+                    break;
+            }
+        }
+
+        /**
+         * Handles a completed node. On the leader, and for gateway nodes only,
+         * starts MAC learning for the node's external LB interface gateway and
+         * for the external LB config whenever the respective MAC is still unset.
+         *
+         * @param node node that became complete
+         */
+        private void processNodeCompletion(KubevirtNode node) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+            if (!node.type().equals(GATEWAY)) {
+                return;
+            }
+
+            final KubernetesExternalLbInterface elbIntf = node.kubernetesExternalLbInterface();
+            final boolean intfGwMacMissing = elbIntf != null && elbIntf.externalLbGwMac() == null;
+            if (intfGwMacMissing) {
+                processKubernetesExternalLbIntfGwMacLearningForGatewayNode(node);
+            }
+
+            final KubernetesExternalLbConfig anyConfig =
+                    externalLbConfigAdminService.lbConfigs().stream().findAny().orElse(null);
+            if (anyConfig == null || anyConfig.loadBalancerGwMac() != null) {
+                return;
+            }
+            processKubernetesExternalLbConfigMacLearning(anyConfig);
+        }
+    }
+
+    /**
+     * Starts learning the MAC address of the load balancer gateway of the config.
+     * A punt rule for the gateway's ARP replies is set on every complete external
+     * LB gateway node; then one of those nodes sends an ARP request through its
+     * external patch port and a periodic retry task is scheduled.
+     *
+     * @param config kubernetes external LB config whose gateway MAC is unknown
+     */
+    private void processKubernetesExternalLbConfigMacLearning(KubernetesExternalLbConfig config) {
+        for (KubevirtNode node : nodeAdminService.completeExternalLbGatewayNodes()) {
+            setRuleArpRequestToController(config.loadBalancerGwIp(),
+                    KUBERNETES_EXTERNAL_LB_FAKE_MAC, node, true);
+        }
+
+        final KubevirtNode probeGateway = nodeAdminService.completeExternalLbGatewayNodes()
+                .stream()
+                .findAny()
+                .orElse(null);
+        if (probeGateway == null) {
+            return;
+        }
+
+        final PortNumber patchPort =
+                KubevirtNetworkingUtil.externalPatchPortNum(deviceService, probeGateway);
+        if (patchPort == null) {
+            log.warn("processKubernetesExternalLbConfigMacLearning" +
+                            " called but there's no external patchPort for {}. Stop this task.",
+                    probeGateway);
+            return;
+        }
+
+        // First probe is sent right away; the scheduled task repeats it until the MAC shows up.
+        retrievePeerMac(NON_ROUTABLE_META_ADDRESS, KUBERNETES_EXTERNAL_LB_FAKE_MAC,
+                config.loadBalancerGwIp(), probeGateway, patchPort);
+        checkKubernetesExternalLbConfigMacRetrieved(config, probeGateway);
+    }
+
+
+    /**
+     * Starts learning the MAC address of the external LB interface gateway of
+     * the given gateway node: sets the punt rule for the gateway's ARP replies,
+     * sends an ARP request through the node's elb patch port and schedules a
+     * periodic retry task.
+     *
+     * @param gatewayNode gateway node; the caller is expected to have checked that
+     *                    its kubernetes external LB interface is not null
+     */
+    private void processKubernetesExternalLbIntfGwMacLearningForGatewayNode(KubevirtNode gatewayNode) {
+
+        MacAddress elbIntfMac = kubernetesElbMac(deviceService, gatewayNode);
+        if (elbIntfMac == null) {
+            // The message names this method so the warning can be traced back to its origin.
+            log.warn("processKubernetesExternalLbIntfGwMacLearningForGatewayNode" +
+                    " called but elbIntfMac is null. Stop this task.");
+            return;
+        }
+
+        KubernetesExternalLbInterface externalLbInterface = gatewayNode.kubernetesExternalLbInterface();
+
+        setRuleArpRequestToController(externalLbInterface.externalLbGwIp(), elbIntfMac,
+                gatewayNode, true);
+
+        PortNumber elbPatchPortNum = KubevirtNetworkingUtil.elbPatchPortNum(deviceService, gatewayNode);
+
+        if (elbPatchPortNum == null) {
+            log.warn("processKubernetesExternalLbIntfGwMacLearningForGatewayNode" +
+                            " called but there's no elb patchPort for {}. Stop this task.",
+                    gatewayNode);
+            return;
+        }
+
+        retrievePeerMac(externalLbInterface.externalLbIp(), elbIntfMac,
+                externalLbInterface.externalLbGwIp(), gatewayNode, elbPatchPortNum);
+
+        checkKubernetesExternalLbIntfGwMacRetrieved(gatewayNode);
+    }
+
+    /**
+     * Schedules the periodic task that re-sends the ARP request for the external
+     * LB interface gateway of the given node until its MAC is known.
+     *
+     * @param gateway gateway node owning the external LB interface
+     */
+    private void checkKubernetesExternalLbIntfGwMacRetrieved(KubevirtNode gateway) {
+        KubernetesExternalLbInterface externalLbInterface = gateway.kubernetesExternalLbInterface();
+
+        MacAddress elbIntfMac = kubernetesElbMac(deviceService, gateway);
+        if (elbIntfMac == null) {
+            // The message names this method so the warning can be traced back to its origin.
+            log.warn("checkKubernetesExternalLbIntfGwMacRetrieved called but elbIntfMac is null. " +
+                    "Stop this task.");
+            return;
+        }
+
+        KubernetexExternalLbIntfTimerTask task = new KubernetexExternalLbIntfTimerTask(
+                externalLbInterface.externalLbIp(), elbIntfMac,
+                externalLbInterface.externalLbGwIp(), gateway);
+
+        externalLbIntfGwTimer.schedule(task, INITIAL_DELAY, TASK_PERIOD);
+    }
+
+    /**
+     * Schedules the periodic task that re-sends the ARP request for the load
+     * balancer gateway of the config until its MAC is known.
+     *
+     * @param config  kubernetes external LB config whose gateway MAC is unknown
+     * @param gateway gateway node used to send the ARP requests
+     */
+    private void checkKubernetesExternalLbConfigMacRetrieved(KubernetesExternalLbConfig config,
+                                                             KubevirtNode gateway) {
+        // Same sender IP as the initial probe, taken from the shared constant rather than a literal.
+        KubernetesExternalLbConfigTimerTask task = new KubernetesExternalLbConfigTimerTask(
+                NON_ROUTABLE_META_ADDRESS, KUBERNETES_EXTERNAL_LB_FAKE_MAC,
+                config.loadBalancerGwIp(), gateway);
+
+        externalLbGwTimer.schedule(task, INITIAL_DELAY, TASK_PERIOD);
+    }
+
+
+    /**
+     * Sets the rule that punts ARP replies to the controller. The rule matches
+     * ARP replies whose sender protocol address is the given IP and whose target
+     * hardware address is the given MAC, and is applied to the integration bridge
+     * of the gateway node in the gateway entry table. Nothing is done when any
+     * of the first three arguments is null.
+     *
+     * @param targetIpAddress IP address expected as ARP sender protocol address
+     * @param dstMac          MAC address expected as ARP target hardware address
+     * @param gatewayNode     gateway node whose integration bridge receives the rule
+     * @param install         flag handed to the flow rule service (presumably true installs
+     *                        and false removes; confirm against KubevirtFlowRuleService)
+     */
+    private void setRuleArpRequestToController(IpAddress targetIpAddress,
+                                               MacAddress dstMac,
+                                               KubevirtNode gatewayNode,
+                                               boolean install) {
+        final boolean missingArgument =
+                targetIpAddress == null || dstMac == null || gatewayNode == null;
+        if (missingArgument) {
+            return;
+        }
+
+        final TrafficSelector arpReplyMatch = DefaultTrafficSelector.builder()
+                .matchEthType(EthType.EtherType.ARP.ethType().toShort())
+                .matchArpOp(ARP.OP_REPLY)
+                .matchArpSpa(targetIpAddress.getIp4Address())
+                .matchArpTha(dstMac)
+                .build();
+        final TrafficTreatment puntToController = DefaultTrafficTreatment.builder().punt().build();
+
+        kubevirtFlowRuleService.setRule(appId, gatewayNode.intgBridge(), arpReplyMatch,
+                puntToController, PRIORITY_ARP_GATEWAY_RULE, GW_ENTRY_TABLE, install);
+    }
+
+
+    /**
+     * Sends an ARP request for the peer IP out of the given port of the gateway
+     * node's integration bridge. The reply is expected to be punted to the
+     * controller by the rule set in {@code setRuleArpRequestToController}.
+     *
+     * @param srcIp       sender IP address put into the ARP request
+     * @param srcMac      sender MAC address put into the ARP request
+     * @param peerIp      IP address whose MAC address is being resolved
+     * @param gatewayNode gateway node that emits the request; nothing is sent if null
+     * @param portNumber  output port on the gateway node's integration bridge
+     */
+    private void retrievePeerMac(IpAddress srcIp, MacAddress srcMac,
+                                 IpAddress peerIp, KubevirtNode gatewayNode,
+                                 PortNumber portNumber) {
+        // Check the node before doing any work; the peer IP is logged because
+        // printing the (null) node itself carries no information.
+        if (gatewayNode == null) {
+            log.warn("retrievePeerMac called but there's no gateway node for peer {}. Stop this task.",
+                    peerIp);
+            return;
+        }
+
+        // Pass the address object itself: formatting is deferred until trace is
+        // enabled and no intermediate getIp4Address() result is dereferenced.
+        log.trace("Sending ARP request to the peer {} to retrieve the MAC address.", peerIp);
+
+        Ethernet ethRequest = ARP.buildArpRequest(srcMac.toBytes(),
+                srcIp.toOctets(),
+                peerIp.toOctets(), VlanId.NO_VID);
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .setOutput(portNumber)
+                .build();
+
+        packetService.emit(new DefaultOutboundPacket(
+                gatewayNode.intgBridge(),
+                treatment,
+                ByteBuffer.wrap(ethRequest.serialize())));
+    }
+
+    /**
+     * Periodic task that re-sends the ARP request for the load balancer gateway
+     * IP and cancels itself once the external LB config carries a gateway MAC.
+     */
+    private class KubernetesExternalLbConfigTimerTask extends TimerTask {
+        private final IpAddress srcIp;
+        private final MacAddress srcMac;
+        private final IpAddress peerIp;
+        private final KubevirtNode gatewayNode;
+
+        /**
+         * Creates a retry task.
+         *
+         * @param srcIp       sender IP address of the ARP requests
+         * @param srcMac      sender MAC address of the ARP requests
+         * @param peerIp      IP address whose MAC address is being resolved
+         * @param gatewayNode gateway node that emits the ARP requests
+         */
+        public KubernetesExternalLbConfigTimerTask(IpAddress srcIp, MacAddress srcMac,
+                                                   IpAddress peerIp, KubevirtNode gatewayNode) {
+            this.srcIp = srcIp;
+            this.srcMac = srcMac;
+            this.peerIp = peerIp;
+            this.gatewayNode = gatewayNode;
+        }
+
+        @Override
+        public void run() {
+            final KubernetesExternalLbConfig current = externalLbConfigAdminService.lbConfigs()
+                    .stream()
+                    .findAny()
+                    .orElse(null);
+            if (current == null) {
+                return;
+            }
+
+            if (current.loadBalancerGwMac() != null) {
+                log.info("Peer Mac {} for KubernetesExternalLbGateway is retrieved. Stop this task.",
+                        current.loadBalancerGwMac());
+                cancel();
+                return;
+            }
+
+            final PortNumber patchPort =
+                    KubevirtNetworkingUtil.externalPatchPortNum(deviceService, gatewayNode);
+            if (patchPort != null) {
+                retrievePeerMac(srcIp, srcMac, peerIp, gatewayNode, patchPort);
+                return;
+            }
+
+            // The task is not cancelled here, so the lookup is retried on the next period.
+            log.warn("processKubernetesExternalLbConfigMacLearning" +
+                            " called but there's no external patchPort for {}. Stop this task.",
+                    gatewayNode);
+        }
+    }
+
+
+    /**
+     * Periodic task that re-sends the ARP request for the gateway of a node's
+     * external LB interface and cancels itself once the gateway MAC is known.
+     */
+    private class KubernetexExternalLbIntfTimerTask extends TimerTask {
+        private final IpAddress srcIp;
+        private final MacAddress srcMac;
+        private final IpAddress peerIp;
+        private final KubevirtNode gatewayNode;
+
+        /**
+         * Creates a retry task.
+         *
+         * @param srcIp       sender IP address of the ARP requests
+         * @param srcMac      sender MAC address of the ARP requests
+         * @param peerIp      IP address whose MAC address is being resolved
+         * @param gatewayNode gateway node as known when the task was scheduled
+         */
+        public KubernetexExternalLbIntfTimerTask(IpAddress srcIp, MacAddress srcMac,
+                                                 IpAddress peerIp, KubevirtNode gatewayNode) {
+            this.srcIp = srcIp;
+            this.srcMac = srcMac;
+            this.peerIp = peerIp;
+            this.gatewayNode = gatewayNode;
+        }
+
+        @Override
+        public void run() {
+            // The learned MAC is written through nodeAdminService.updateNode(...) with an
+            // updated node object, so the node captured at scheduling time may never show
+            // it. Look the node up again by hostname; fall back to the captured one if it
+            // is no longer among the complete external LB gateway nodes.
+            final KubevirtNode currentNode = nodeAdminService.completeExternalLbGatewayNodes()
+                    .stream()
+                    .filter(node -> Objects.equals(node.hostname(), gatewayNode.hostname()))
+                    .findAny()
+                    .orElse(gatewayNode);
+
+            final KubernetesExternalLbInterface externalLbInterface =
+                    currentNode.kubernetesExternalLbInterface();
+
+            if (externalLbInterface == null) {
+                // Nothing left to resolve; an uncaught exception here would kill the shared timer thread.
+                log.warn("External LB interface of node {} is gone. Stop this task.",
+                        currentNode.hostname());
+                this.cancel();
+                return;
+            }
+
+            if (externalLbInterface.externalLbGwMac() != null) {
+                log.info("Peer Mac {} for KubernetesExternalLbIntfGw for node {} is retrieved. Stop this task.",
+                        externalLbInterface.externalLbGwMac(), currentNode.hostname());
+                this.cancel();
+                return;
+            }
+
+            PortNumber elbPatchPortNum = KubevirtNetworkingUtil.elbPatchPortNum(deviceService, currentNode);
+
+            if (elbPatchPortNum == null) {
+                // The task is not cancelled here, so the lookup is retried on the next period.
+                log.warn("processKubernetesExternalLbIntfGwMacLearningForGatewayNode" +
+                                " called but there's no elb patchPort for {}. Stop this task.",
+                        currentNode);
+                return;
+            }
+
+            retrievePeerMac(srcIp, srcMac, peerIp, currentNode, elbPatchPortNum);
+        }
+    }
+
+    /**
+     * Packet processor that learns gateway MAC addresses from ARP packets
+     * punted to the controller.
+     */
+    private class InternalPacketProcessor implements PacketProcessor {
+        @Override
+        public void process(PacketContext context) {
+            if (context.isHandled()) {
+                return;
+            }
+
+            InboundPacket pkt = context.inPacket();
+            Ethernet ethernet = pkt.parsed();
+
+            if (ethernet != null && ethernet.getEtherType() == Ethernet.TYPE_ARP) {
+                processArpPacket(ethernet);
+            }
+        }
+
+        /**
+         * Stores the sender MAC of a non-request ARP packet wherever its sender IP
+         * matches a gateway IP: in the external LB config and in every complete
+         * external LB gateway node whose interface gateway IP matches. A store is
+         * only updated when the MAC actually differs from the recorded one.
+         *
+         * @param ethernet ethernet frame with ARP ether type
+         */
+        private void processArpPacket(Ethernet ethernet) {
+            // Guard the cast: the ether type alone does not prove how the payload was parsed.
+            if (!(ethernet.getPayload() instanceof ARP)) {
+                return;
+            }
+            ARP arp = (ARP) ethernet.getPayload();
+
+            if (arp.getOpCode() == ARP.OP_REQUEST) {
+                return;
+            }
+            log.trace("ARP reply {}", arp);
+
+            KubernetesExternalLbConfig config =
+                    externalLbConfigAdminService.lbConfigs().stream().findAny().orElse(null);
+
+            IpAddress spa = Ip4Address.valueOf(arp.getSenderProtocolAddress());
+            MacAddress sha = MacAddress.valueOf(arp.getSenderHardwareAddress());
+
+            // spa.equals(...) tolerates a config or interface that has no gateway IP set.
+            // The MAC comparison avoids rewriting the config on every matching reply,
+            // mirroring the check done for the gateway nodes below.
+            if (config != null && spa.equals(config.loadBalancerGwIp())
+                    && !sha.equals(config.loadBalancerGwMac())) {
+                externalLbConfigAdminService.updateKubernetesExternalLbConfig(config.updateLbGatewayMac(sha));
+            }
+
+            nodeAdminService.completeExternalLbGatewayNodes().forEach(gateway -> {
+                KubernetesExternalLbInterface externalLbInterface =
+                        gateway.kubernetesExternalLbInterface();
+                if (externalLbInterface == null) {
+                    return;
+                }
+
+                if (spa.equals(externalLbInterface.externalLbGwIp())
+                        && !sha.equals(externalLbInterface.externalLbGwMac())) {
+                    nodeAdminService.updateNode(gateway.updateKubernetesElbIntfGwMac(sha));
+                }
+            });
+        }
+    }
+}
+
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesExternalLbHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesExternalLbHandler.java
index 7b1c733..5953c8a 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesExternalLbHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesExternalLbHandler.java
@@ -80,7 +80,7 @@
public class KubernetesExternalLbHandler {
protected final Logger log = getLogger(getClass());
- private static final int TP_PORT_MINIMUM_NUM = 1025;
+ private static final int TP_PORT_MINIMUM_NUM = 10000;
private static final int TP_PORT_MAXIMUM_NUM = 65535;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -182,9 +182,9 @@
return;
}
- if (lb.electedGateway() == null || lb.electedWorker() == null) {
+ if (lb.electedGateway() == null || lb.electedWorker() == null || lb.loadBalancerGwMac() == null) {
log.warn("processKubernetesExternalLbCreatedOrUpdated called but electedGateway " +
- "or electedWorker is null. Stop this task.");
+ "or electedWorker or loadBalancerGwMac is null. Stop this task.");
return;
}
@@ -198,7 +198,9 @@
return;
}
- if (lb.electedWorker() == null || oldGatway == null) {
+ if (lb.electedWorker() == null || oldGatway == null || lb.loadBalancerGwMac() == null) {
+ log.warn("processKubernetesExternalLbGatewayChanged called but oldGateway " +
+ "or electedWorker or loadBalancerGwMac is null. Stop this task.");
return;
}
@@ -326,13 +328,13 @@
return;
}
- lb.nodePortSet().forEach(nodeport -> {
+ lb.servicePorts().forEach(servicePort -> {
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.matchEthDst(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
.matchIPDst(loadBalancerIp.toIpPrefix())
.matchIPProtocol(IPv4.PROTOCOL_TCP)
- .matchTcpDst(TpPort.tpPort(nodeport.intValue()))
+ .matchTcpDst(TpPort.tpPort(servicePort.port().intValue()))
.build();
ExtensionTreatment natTreatment = RulePopulatorUtil
@@ -351,6 +353,7 @@
.setEthSrc(elbIntfMac)
.setEthDst(externalLbInterface.externalLbGwMac())
.setIpDst(electedWorker.dataIp())
+ .setTcpDst(TpPort.tpPort(servicePort.nodePort().intValue()))
.setOutput(elbBridgePortNum)
.build();
@@ -404,13 +407,13 @@
return;
}
- lb.nodePortSet().forEach(nodePort -> {
+ lb.servicePorts().forEach(servicePort -> {
TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.matchIPSrc(electedWorker.dataIp().toIpPrefix())
.matchIPDst(externalLbInterface.externalLbIp().toIpPrefix())
.matchIPProtocol(IPv4.PROTOCOL_TCP)
- .matchTcpSrc(TpPort.tpPort(nodePort.intValue()));
+ .matchTcpSrc(TpPort.tpPort(servicePort.nodePort().intValue()));
ExtensionTreatment natTreatment = RulePopulatorUtil
.niciraConnTrackTreatmentBuilder(driverService, gateway.intgBridge())
@@ -423,6 +426,7 @@
.setEthSrc(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
.setIpSrc(lb.loadBalancerIp())
.setEthDst(lb.loadBalancerGwMac())
+ .setTcpSrc(TpPort.tpPort(servicePort.port().intValue()))
.extension(natTreatment, gateway.intgBridge())
.transition(GW_DROP_TABLE);
@@ -438,13 +442,12 @@
sBuilder = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.matchIPProtocol(IPv4.PROTOCOL_TCP)
- .matchTcpSrc(TpPort.tpPort(nodePort.intValue()));
-
+ .matchEthSrc(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
+ .matchIPSrc(lb.loadBalancerIp().toIpPrefix())
+ .matchEthDst(lb.loadBalancerGwMac())
+ .matchTcpSrc(TpPort.tpPort(servicePort.port().intValue()));
tBuilder = DefaultTrafficTreatment.builder()
- .setEthSrc(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
- .setIpSrc(lb.loadBalancerIp())
- .setEthDst(lb.loadBalancerGwMac())
.setOutput(externalPatchPortNum);
flowService.setRule(
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesServiceWatcher.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesServiceWatcher.java
index 2aec742..d80ee25 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesServiceWatcher.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubernetesServiceWatcher.java
@@ -34,8 +34,10 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.kubevirtnetworking.api.DefaultKubernetesExternalLb;
+import org.onosproject.kubevirtnetworking.api.DefaultKubernetesServicePort;
import org.onosproject.kubevirtnetworking.api.KubernetesExternalLb;
import org.onosproject.kubevirtnetworking.api.KubernetesExternalLbAdminService;
+import org.onosproject.kubevirtnetworking.api.KubernetesServicePort;
import org.onosproject.kubevirtnode.api.KubernetesExternalLbConfig;
import org.onosproject.kubevirtnode.api.KubernetesExternalLbConfigEvent;
import org.onosproject.kubevirtnode.api.KubernetesExternalLbConfigListener;
@@ -63,6 +65,7 @@
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.configMapUpdated;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.gatewayNodeForSpecifiedService;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.workerNodeForSpecifiedService;
@@ -175,7 +178,7 @@
switch (event.type()) {
case KUBERNETES_EXTERNAL_LB_CONFIG_CREATED:
case KUBERNETES_EXTERNAL_LB_CONFIG_UPDATED:
- eventExecutor.execute(this::processConfigUpdate);
+ eventExecutor.execute(() -> processConfigUpdate(event.subject()));
break;
case KUBERNETES_EXTERNAL_LB_CONFIG_REMOVED:
default:
@@ -184,11 +187,13 @@
}
}
- private void processConfigUpdate() {
+ private void processConfigUpdate(KubernetesExternalLbConfig externalLbConfig) {
if (!isRelevantHelper()) {
return;
}
- addOrUpdateExternalLoadBalancers();
+ if (configMapUpdated(externalLbConfig)) {
+ addOrUpdateExternalLoadBalancers();
+ }
}
}
@@ -261,7 +266,9 @@
return;
}
- if (!configMapUpdated()) {
+ KubernetesExternalLbConfig config = lbConfigService.lbConfigs().stream().findAny().orElse(null);
+
+ if (!configMapUpdated(config)) {
log.warn("Config map is not set yet. Stop this task");
return;
}
@@ -390,17 +397,6 @@
.findAny().isPresent();
}
- private boolean configMapUpdated() {
- KubernetesExternalLbConfig config = lbConfigService.lbConfigs().stream().findAny().orElse(null);
-
- if (config == null) {
- return false;
- }
-
- return config.configName() != null && config.globalIpRange() != null &&
- config.loadBalancerGwIp() != null && config.loadBalancerGwMac() != null;
- }
-
//Only process if the event when the service type is LoadBalancer
private boolean isLoadBalancerType(Service service) {
return service.getSpec().getType().equals(TYPE_LOADBALANCER);
@@ -421,13 +417,15 @@
return null;
}
- Set<Integer> nodePortSet = Sets.newHashSet();
- Set<Integer> portSet = Sets.newHashSet();
+ Set<KubernetesServicePort> servicePorts = Sets.newHashSet();
Set<String> endpointSet = Sets.newHashSet();
service.getSpec().getPorts().forEach(servicePort -> {
- nodePortSet.add(servicePort.getNodePort());
- portSet.add(servicePort.getPort());
+ if (servicePort.getPort() != null && servicePort.getNodePort() != null) {
+ servicePorts.add(DefaultKubernetesServicePort.builder()
+ .nodePort(servicePort.getNodePort())
+ .port(servicePort.getPort()).build());
+ }
});
nodeService.completeNodes(WORKER).forEach(workerNode -> {
@@ -452,8 +450,7 @@
return DefaultKubernetesExternalLb.builder().serviceName(serviceName)
.loadBalancerIp(IpAddress.valueOf(lbIp))
- .nodePortSet(nodePortSet)
- .portSet(portSet)
+ .servicePorts(servicePorts)
.endpointSet(endpointSet)
.loadBalancerGwIp(IpAddress.valueOf(loadbalancerGatewayIp))
.loadBalancerGwMac(loadBalancerGatewayMac)
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java
index 539323f..345e670 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/util/KubevirtNetworkingUtil.java
@@ -53,6 +53,7 @@
import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
import org.onosproject.kubevirtnode.api.DefaultKubevirtNode;
import org.onosproject.kubevirtnode.api.DefaultKubevirtPhyInterface;
+import org.onosproject.kubevirtnode.api.KubernetesExternalLbConfig;
import org.onosproject.kubevirtnode.api.KubernetesExternalLbInterface;
import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
@@ -557,16 +558,32 @@
KubernetesExternalLb externalLb) {
//TODO: enhance election logic for a better load balancing
- int numOfGateways = nodeService.completeNodes(GATEWAY).size();
+ int numOfGateways = nodeService.completeExternalLbGatewayNodes().size();
if (numOfGateways == 0) {
return null;
}
- return (KubevirtNode) nodeService.completeNodes(GATEWAY)
+ return (KubevirtNode) nodeService.completeExternalLbGatewayNodes()
- .toArray()[externalLb.hashCode() % numOfGateways];
+ .toArray()[Math.floorMod(externalLb.hashCode(), numOfGateways)];
}
/**
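+ * Checks whether the kubernetes external lb config is fully populated, i.e.
+ * its config name, global IP range, gateway IP and gateway MAC are all set.
+ *
+ * @param externalLbConfig kubernetes external lb config, may be null
+ * @return true if the config is not null and all four fields are set, false otherwise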
+ */
+    public static boolean configMapUpdated(KubernetesExternalLbConfig externalLbConfig) {
+        // A missing config can never count as updated.
+        if (externalLbConfig == null) {
+            return false;
+        }
+
+        // Name and address range first, then the gateway addresses; all four are required.
+        if (externalLbConfig.configName() == null || externalLbConfig.globalIpRange() == null) {
+            return false;
+        }
+        return externalLbConfig.loadBalancerGwIp() != null
+                && externalLbConfig.loadBalancerGwMac() != null;
+    }
+
+
+ /**
* Returns the worker node for the specified kubernetes external lb.
* Among worker nodes, only one worker would serve the traffic from and to the gateway.
* Currently worker node is selected based on modulo operation with external lb hashcode.