Implement east-west communication for k8s network
Change-Id: Ibac91b7a856e35a26cf0e0f23d6d01f65197625d
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java
new file mode 100644
index 0000000..90ad174
--- /dev/null
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java
@@ -0,0 +1,361 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.impl;
+
+import org.onlab.packet.ARP;
+import org.onlab.packet.EthType;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cfg.ConfigProperty;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.k8snetworking.api.K8sFlowRuleService;
+import org.onosproject.k8snetworking.api.K8sNetworkService;
+import org.onosproject.k8snetworking.api.K8sPort;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeEvent;
+import org.onosproject.k8snode.api.K8sNodeListener;
+import org.onosproject.k8snode.api.K8sNodeService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketService;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Dictionary;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snetworking.api.Constants.ARP_BROADCAST_MODE;
+import static org.onosproject.k8snetworking.api.Constants.ARP_PROXY_MODE;
+import static org.onosproject.k8snetworking.api.Constants.ARP_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+import static org.onosproject.k8snetworking.api.Constants.PRIORITY_ARP_CONTROL_RULE;
+import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.ARP_MODE;
+import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.ARP_MODE_DEFAULT;
+import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.GATEWAY_MAC;
+import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.GATEWAY_MAC_DEFAULT;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getPropertyValue;
+
+/**
+ * Handles ARP packet from containers.
+ */
+@Component(
+ immediate = true,
+ property = {
+ GATEWAY_MAC + "=" + GATEWAY_MAC_DEFAULT,
+ ARP_MODE + "=" + ARP_MODE_DEFAULT
+ }
+)
+public class K8sSwitchingArpHandler {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected PacketService packetService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ComponentConfigService configService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sNodeService k8sNodeService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sNetworkService k8sNetworkService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected K8sFlowRuleService k8sFlowRuleService;
+
+ /** Fake MAC address for virtual network subnet gateway. */
+ private String gatewayMac = GATEWAY_MAC_DEFAULT;
+
+ /** ARP processing mode, broadcast | proxy (default). */
+ protected String arpMode = ARP_MODE_DEFAULT;
+
+ private MacAddress gwMacAddress;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
+ private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
+ private final InternalNodeEventListener k8sNodeListener = new InternalNodeEventListener();
+
+ private ApplicationId appId;
+ private NodeId localNodeId;
+
+ @Activate
+ void activate() {
+ appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
+ configService.registerProperties(getClass());
+ localNodeId = clusterService.getLocalNode().id();
+ leadershipService.runForLeadership(appId.name());
+ k8sNodeService.addListener(k8sNodeListener);
+ packetService.addProcessor(packetProcessor, PacketProcessor.director(0));
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ void deactivate() {
+ packetService.removeProcessor(packetProcessor);
+ k8sNodeService.removeListener(k8sNodeListener);
+ leadershipService.withdraw(appId.name());
+ configService.unregisterProperties(getClass(), false);
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+ @Modified
+ void modified(ComponentContext context) {
+ readComponentConfiguration(context);
+
+ log.info("Modified");
+ }
+
+ /**
+ * Processes ARP request packets.
+ *
+ * @param context packet context
+ * @param ethPacket ethernet packet
+ */
+ private void processPacketIn(PacketContext context, Ethernet ethPacket) {
+ // if the ARP mode is configured as broadcast mode, we simply ignore ARP packet_in
+ if (ARP_BROADCAST_MODE.equals(getArpMode())) {
+ return;
+ }
+
+ ARP arpPacket = (ARP) ethPacket.getPayload();
+ if (arpPacket.getOpCode() != ARP.OP_REQUEST) {
+ return;
+ }
+
+ K8sPort srcPort = k8sNetworkService.ports().stream()
+ .filter(p -> p.macAddress().equals(ethPacket.getSourceMAC()))
+ .findAny().orElse(null);
+
+ if (srcPort == null && !context.inPacket().receivedFrom().port()
+ .equals(PortNumber.LOCAL)) {
+ log.warn("Failed to find source port(MAC:{})", ethPacket.getSourceMAC());
+ return;
+ }
+
+ // FIXME: this is a workaround for storing host GW MAC address,
+ // need to find a way to store the MAC address in persistent way
+ if (context.inPacket().receivedFrom().port().equals(PortNumber.LOCAL)) {
+ gwMacAddress = ethPacket.getSourceMAC();
+ }
+
+ IpAddress targetIp = Ip4Address.valueOf(arpPacket.getTargetProtocolAddress());
+
+ MacAddress replyMac = k8sNetworkService.ports().stream()
+ // .filter(p -> p.networkId().equals(srcPort.networkId()))
+ .filter(p -> p.ipAddress().equals(targetIp))
+ .map(K8sPort::macAddress)
+ .findAny().orElse(null);
+
+ long gwIpCnt = k8sNetworkService.networks().stream()
+ .filter(n -> n.gatewayIp().equals(targetIp))
+ .count();
+
+ if (gwIpCnt > 0) {
+ replyMac = gwMacAddress;
+ }
+
+ if (replyMac == null) {
+ log.debug("Failed to find MAC address for {}", targetIp);
+ return;
+ }
+
+ Ethernet ethReply = ARP.buildArpReply(
+ targetIp.getIp4Address(),
+ replyMac,
+ ethPacket);
+
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+ .setOutput(context.inPacket().receivedFrom().port())
+ .build();
+
+ packetService.emit(new DefaultOutboundPacket(
+ context.inPacket().receivedFrom().deviceId(),
+ treatment,
+ ByteBuffer.wrap(ethReply.serialize())));
+ }
+
+ private String getArpMode() {
+ Set<ConfigProperty> properties = configService.getProperties(this.getClass().getName());
+ return getPropertyValue(properties, ARP_MODE);
+ }
+
+ /**
+ * Extracts properties from the component configuration context.
+ *
+ * @param context the component context
+ */
+ private void readComponentConfiguration(ComponentContext context) {
+ Dictionary<?, ?> properties = context.getProperties();
+
+ String updatedMac = Tools.get(properties, GATEWAY_MAC);
+ gatewayMac = updatedMac != null ? updatedMac : GATEWAY_MAC_DEFAULT;
+ log.info("Configured. Gateway MAC is {}", gatewayMac);
+ }
+
+ /**
+ * An internal packet processor which processes ARP request, and results in
+ * packet-out ARP reply.
+ */
+ private class InternalPacketProcessor implements PacketProcessor {
+
+ @Override
+ public void process(PacketContext context) {
+ if (context.isHandled()) {
+ return;
+ }
+
+ Ethernet ethPacket = context.inPacket().parsed();
+ if (ethPacket == null || ethPacket.getEtherType() != Ethernet.TYPE_ARP) {
+ return;
+ }
+
+ eventExecutor.execute(() -> processPacketIn(context, ethPacket));
+ }
+ }
+
+ /**
+ * An internal kubernetes node listener which is used for listening kubernetes
+ * node activity. As long as a node is in complete state, we will install
+ * default ARP rule to handle ARP request.
+ */
+ private class InternalNodeEventListener implements K8sNodeListener {
+
+ private boolean isRelevantHelper() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
+ public void event(K8sNodeEvent event) {
+ K8sNode k8sNode = event.subject();
+ switch (event.type()) {
+ case K8S_NODE_COMPLETE:
+ eventExecutor.execute(() -> processNodeCompletion(k8sNode));
+ break;
+ case K8S_NODE_INCOMPLETE:
+ eventExecutor.execute(() -> processNodeIncompletion(k8sNode));
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void processNodeCompletion(K8sNode node) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ setDefaultArpRule(node, true);
+ }
+
+ private void processNodeIncompletion(K8sNode node) {
+ if (!isRelevantHelper()) {
+ return;
+ }
+
+ setDefaultArpRule(node, false);
+ }
+
+ private void setDefaultArpRule(K8sNode node, boolean install) {
+
+ if (getArpMode() == null) {
+ return;
+ }
+
+ switch (getArpMode()) {
+ case ARP_PROXY_MODE:
+ setDefaultArpRuleForProxyMode(node, install);
+ break;
+ case ARP_BROADCAST_MODE:
+ // TODO: need to implement broadcast mode
+ log.warn("Not implemented yet.");
+ break;
+ default:
+ log.warn("Invalid ARP mode {}. Please use either " +
+ "broadcast or proxy mode.", getArpMode());
+ break;
+ }
+ }
+
+ private void setDefaultArpRuleForProxyMode(K8sNode node, boolean install) {
+ TrafficSelector selector = DefaultTrafficSelector.builder()
+ .matchEthType(EthType.EtherType.ARP.ethType().toShort())
+ .build();
+
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+ .punt()
+ .build();
+
+ k8sFlowRuleService.setRule(
+ appId,
+ node.intgBridge(),
+ selector,
+ treatment,
+ PRIORITY_ARP_CONTROL_RULE,
+ ARP_TABLE,
+ install
+ );
+ }
+ }
+}