Implement east-west communication for k8s network

Change-Id: Ibac91b7a856e35a26cf0e0f23d6d01f65197625d
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java
new file mode 100644
index 0000000..90ad174
--- /dev/null
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java
@@ -0,0 +1,361 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.impl;
+
+import org.onlab.packet.ARP;
+import org.onlab.packet.EthType;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cfg.ConfigProperty;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.k8snetworking.api.K8sFlowRuleService;
+import org.onosproject.k8snetworking.api.K8sNetworkService;
+import org.onosproject.k8snetworking.api.K8sPort;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeEvent;
+import org.onosproject.k8snode.api.K8sNodeListener;
+import org.onosproject.k8snode.api.K8sNodeService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketService;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Dictionary;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snetworking.api.Constants.ARP_BROADCAST_MODE;
+import static org.onosproject.k8snetworking.api.Constants.ARP_PROXY_MODE;
+import static org.onosproject.k8snetworking.api.Constants.ARP_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+import static org.onosproject.k8snetworking.api.Constants.PRIORITY_ARP_CONTROL_RULE;
+import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.ARP_MODE;
+import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.ARP_MODE_DEFAULT;
+import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.GATEWAY_MAC;
+import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.GATEWAY_MAC_DEFAULT;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getPropertyValue;
+
+/**
+ * Handles ARP packet from containers.
+ *
+ * <p>ARP requests punted from the integration bridge are answered from the
+ * controller using the known kubernetes ports and networks. Packet and node
+ * event handling both run on a single event-handler thread.
+ */
+@Component(
+        immediate = true,
+        property = {
+                GATEWAY_MAC + "=" + GATEWAY_MAC_DEFAULT,
+                ARP_MODE + "=" + ARP_MODE_DEFAULT
+        }
+)
+public class K8sSwitchingArpHandler {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected PacketService packetService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService configService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sNodeService k8sNodeService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sNetworkService k8sNetworkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sFlowRuleService k8sFlowRuleService;
+
+    /** Fake MAC address for virtual network subnet gateway. */
+    private String gatewayMac = GATEWAY_MAC_DEFAULT;
+
+    /** ARP processing mode, broadcast | proxy (default). */
+    protected String arpMode = ARP_MODE_DEFAULT;
+
+    // host gateway MAC learned from ARP requests on the LOCAL port (null until then)
+    private MacAddress gwMacAddress;
+
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
+    private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
+    private final InternalNodeEventListener k8sNodeListener = new InternalNodeEventListener();
+
+    private ApplicationId appId;
+    private NodeId localNodeId;
+
+    /** Registers the app and its properties, runs for leadership, attaches listener and processor. */
+    @Activate
+    void activate() {
+        appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
+        configService.registerProperties(getClass());
+        localNodeId = clusterService.getLocalNode().id();
+        leadershipService.runForLeadership(appId.name());
+        k8sNodeService.addListener(k8sNodeListener);
+        packetService.addProcessor(packetProcessor, PacketProcessor.director(0));
+
+        log.info("Started");
+    }
+
+    /** Releases what activate() acquired and shuts down the event executor. */
+    @Deactivate
+    void deactivate() {
+        packetService.removeProcessor(packetProcessor);
+        k8sNodeService.removeListener(k8sNodeListener);
+        leadershipService.withdraw(appId.name());
+        configService.unregisterProperties(getClass(), false);
+        eventExecutor.shutdown();
+
+        log.info("Stopped");
+    }
+
+    /** Re-reads the component configuration when it is modified. */
+    @Modified
+    void modified(ComponentContext context) {
+        readComponentConfiguration(context);
+        log.info("Modified");
+    }
+
+    /**
+     * Processes ARP request packets.
+     *
+     * <p>Replies with the MAC address of the port owning the target IP, or with the
+     * learned host gateway MAC when the target IP is a network gateway. Nothing is
+     * sent in broadcast mode, for non-request ARP, unknown senders or unresolved targets.
+     *
+     * @param context   packet context
+     * @param ethPacket ethernet packet
+     */
+    private void processPacketIn(PacketContext context, Ethernet ethPacket) {
+        // if the ARP mode is configured as broadcast mode, we simply ignore ARP packet_in
+        if (ARP_BROADCAST_MODE.equals(getArpMode())) {
+            return;
+        }
+
+        ARP arpPacket = (ARP) ethPacket.getPayload();
+        if (arpPacket.getOpCode() != ARP.OP_REQUEST) {
+            return;
+        }
+
+        MacAddress srcMac = ethPacket.getSourceMAC();
+        PortNumber inPort = context.inPacket().receivedFrom().port();
+        boolean fromLocal = PortNumber.LOCAL.equals(inPort);
+
+        // the LOCAL port is exempt from the known-port check (see gateway MAC workaround below)
+        if (!fromLocal && k8sNetworkService.ports().stream()
+                .noneMatch(p -> Objects.equals(p.macAddress(), srcMac))) {
+            log.warn("Failed to find source port(MAC:{})", srcMac);
+            return;
+        }
+
+        // FIXME: this is a workaround for storing host GW MAC address,
+        // need to find a way to store the MAC address in persistent way
+        if (fromLocal) {
+            gwMacAddress = srcMac;
+        }
+
+        IpAddress targetIp = Ip4Address.valueOf(arpPacket.getTargetProtocolAddress());
+
+        // a gateway IP always resolves to the learned host gateway MAC, which is
+        // still null if no ARP request has been received on the LOCAL port yet
+        boolean isGatewayIp = k8sNetworkService.networks().stream()
+                .anyMatch(n -> Objects.equals(n.gatewayIp(), targetIp));
+
+        MacAddress replyMac = isGatewayIp ? gwMacAddress :
+                k8sNetworkService.ports().stream()
+                        .filter(p -> Objects.equals(p.ipAddress(), targetIp))
+                        .map(K8sPort::macAddress)
+                        .filter(Objects::nonNull)
+                        .findAny().orElse(null);
+
+        if (replyMac == null) {
+            log.debug("Failed to find MAC address for {}", targetIp);
+            return;
+        }
+
+        Ethernet ethReply = ARP.buildArpReply(targetIp.getIp4Address(), replyMac, ethPacket);
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder().setOutput(inPort).build();
+
+        packetService.emit(new DefaultOutboundPacket(
+                context.inPacket().receivedFrom().deviceId(),
+                treatment,
+                ByteBuffer.wrap(ethReply.serialize())));
+    }
+
+    /** Looks up the current ARP mode property from the component config service. */
+    private String getArpMode() {
+        Set<ConfigProperty> properties = configService.getProperties(this.getClass().getName());
+        return getPropertyValue(properties, ARP_MODE);
+    }
+
+    /**
+     * Extracts properties from the component configuration context.
+     *
+     * @param context the component context
+     */
+    private void readComponentConfiguration(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+
+        String updatedMac = Tools.get(properties, GATEWAY_MAC);
+        gatewayMac = updatedMac != null ? updatedMac : GATEWAY_MAC_DEFAULT;
+        log.info("Configured. Gateway MAC is {}", gatewayMac);
+    }
+
+    /**
+     * An internal packet processor which processes ARP request, and results in
+     * packet-out ARP reply.
+     */
+    private class InternalPacketProcessor implements PacketProcessor {
+
+        @Override
+        public void process(PacketContext context) {
+            if (context.isHandled()) {
+                return;
+            }
+
+            Ethernet ethPacket = context.inPacket().parsed();
+            if (ethPacket == null || ethPacket.getEtherType() != Ethernet.TYPE_ARP) {
+                return;
+            }
+
+            // ARP handling is deferred to the single event-handler thread
+            eventExecutor.execute(() -> processPacketIn(context, ethPacket));
+        }
+    }
+
+    /**
+     * An internal kubernetes node listener which is used for listening kubernetes
+     * node activity. As long as a node is in complete state, we will install
+     * default ARP rule to handle ARP request.
+     */
+    private class InternalNodeEventListener implements K8sNodeListener {
+
+        /** Tells whether the local node is the current leader of this application's topic. */
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(K8sNodeEvent event) {
+            K8sNode k8sNode = event.subject();
+            switch (event.type()) {
+                case K8S_NODE_COMPLETE:
+                    eventExecutor.execute(() -> processNodeCompletion(k8sNode));
+                    break;
+                case K8S_NODE_INCOMPLETE:
+                    eventExecutor.execute(() -> processNodeIncompletion(k8sNode));
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        private void processNodeCompletion(K8sNode node) {
+            if (isRelevantHelper()) {
+                setDefaultArpRule(node, true);
+            }
+        }
+
+        private void processNodeIncompletion(K8sNode node) {
+            if (isRelevantHelper()) {
+                setDefaultArpRule(node, false);
+            }
+        }
+
+        /**
+         * Applies the default ARP rule for the configured ARP mode. Nothing is
+         * done when the mode is unset; only proxy mode is implemented so far.
+         */
+        private void setDefaultArpRule(K8sNode node, boolean install) {
+            // read the mode once so that the null check, the switch and the log agree
+            String mode = getArpMode();
+            if (mode == null) {
+                return;
+            }
+
+            switch (mode) {
+                case ARP_PROXY_MODE:
+                    setDefaultArpRuleForProxyMode(node, install);
+                    break;
+                case ARP_BROADCAST_MODE:
+                    // TODO: need to implement broadcast mode
+                    log.warn("Not implemented yet.");
+                    break;
+                default:
+                    log.warn("Invalid ARP mode {}. Please use either " +
+                            "broadcast or proxy mode.", mode);
+                    break;
+            }
+        }
+
+        /** Sets the rule that punts all ARP packets seen on the node's integration bridge. */
+        private void setDefaultArpRuleForProxyMode(K8sNode node, boolean install) {
+            TrafficSelector selector = DefaultTrafficSelector.builder()
+                    .matchEthType(EthType.EtherType.ARP.ethType().toShort())
+                    .build();
+
+            TrafficTreatment treatment = DefaultTrafficTreatment.builder().punt().build();
+
+            k8sFlowRuleService.setRule(appId, node.intgBridge(), selector, treatment,
+                    PRIORITY_ARP_CONTROL_RULE, ARP_TABLE, install);
+        }
+    }
+}