Support to learn external gateway MAC at controller

Change-Id: I72c13133708de1ac86e26160397233518489d46b
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingArpHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingArpHandler.java
new file mode 100644
index 0000000..f11aab4
--- /dev/null
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingArpHandler.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.impl;
+
+import org.onlab.packet.ARP;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.VlanId;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.k8snetworking.api.K8sFlowRuleService;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeAdminService;
+import org.onosproject.k8snode.api.K8sNodeEvent;
+import org.onosproject.k8snode.api.K8sNodeListener;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.InboundPacket;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static java.lang.Thread.sleep;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snetworking.api.Constants.EXT_ENTRY_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+import static org.onosproject.k8snetworking.api.Constants.PRIORITY_ARP_REPLY_RULE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Handles ARP request/reply from external gateway.
+ */
+@Component(immediate = true)
+public class K8sRoutingArpHandler {
+    private final Logger log = getLogger(getClass());
+
+    private static final long SLEEP_MS = 5000;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected PacketService packetService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sNodeAdminService k8sNodeService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sFlowRuleService k8sFlowRuleService;
+
+    private ApplicationId appId;
+    private NodeId localNodeId;
+
+    private final InternalK8sNodeListener k8sNodeListener = new InternalK8sNodeListener();
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
+    private final PacketProcessor packetProcessor = new InternalPacketProcessor();
+
+    @Activate
+    protected void activate() {
+        appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
+        localNodeId = clusterService.getLocalNode().id();
+        leadershipService.runForLeadership(appId.name());
+        k8sNodeService.addListener(k8sNodeListener);
+        packetService.addProcessor(packetProcessor, PacketProcessor.director(1));
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        k8sNodeService.removeListener(k8sNodeListener);
+        packetService.removeProcessor(packetProcessor);
+        leadershipService.withdraw(appId.name());
+        eventExecutor.shutdown();
+        log.info("Stopped");
+    }
+
+    private void processArpPacket(PacketContext context, Ethernet ethernet) {
+        ARP arp = (ARP) ethernet.getPayload();
+
+        if (arp.getOpCode() == ARP.OP_REPLY) {
+            IpAddress spa = Ip4Address.valueOf(arp.getSenderProtocolAddress());
+            MacAddress sha = MacAddress.valueOf(arp.getSenderHardwareAddress());
+
+            log.info("ARP reply from external gateway ip: {}, mac: {}", spa, sha);
+
+            Set<IpAddress> gatewayIps = k8sNodeService.completeNodes().stream()
+                    .map(K8sNode::extGatewayIp).collect(Collectors.toSet());
+
+            if (!gatewayIps.contains(spa)) {
+                return;
+            }
+
+            k8sNodeService.completeNodes().stream()
+                    .filter(n -> n.extGatewayMac() == null)
+                    .forEach(n -> {
+                        K8sNode updated = n.updateExtGatewayMac(sha);
+                        k8sNodeService.updateNode(updated);
+                    });
+        }
+    }
+
+    private void sendArpRequest(K8sNode k8sNode) {
+        MacAddress bridgeMac = k8sNode.extBridgeMac();
+        IpAddress bridgeIp = k8sNode.extBridgeIp();
+        IpAddress extGatewayIp = k8sNode.extGatewayIp();
+        Ethernet ethRequest = ARP.buildArpRequest(bridgeMac.toBytes(), bridgeIp.toOctets(),
+                extGatewayIp.toOctets(), VlanId.NO_VID);
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .setOutput(k8sNode.extBridgePortNum())
+                .build();
+
+        packetService.emit(new DefaultOutboundPacket(
+                k8sNode.extBridge(),
+                treatment,
+                ByteBuffer.wrap(ethRequest.serialize())));
+    }
+
+    private void setArpReplyRule(K8sNode k8sNode, boolean install) {
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_ARP)
+                .matchArpOp(ARP.OP_REPLY)
+                .matchArpSpa(Ip4Address.valueOf(k8sNode.extGatewayIp().toString()))
+                .build();
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .punt()
+                .build();
+
+        k8sFlowRuleService.setRule(
+                appId,
+                k8sNode.extBridge(),
+                selector,
+                treatment,
+                PRIORITY_ARP_REPLY_RULE,
+                EXT_ENTRY_TABLE,
+                install
+        );
+    }
+
+    private class InternalK8sNodeListener implements K8sNodeListener {
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(K8sNodeEvent event) {
+            switch (event.type()) {
+                case K8S_NODE_COMPLETE:
+                    eventExecutor.execute(() -> processNodeCompletion(event.subject()));
+                    break;
+                case K8S_NODE_INCOMPLETE:
+                default:
+                    break;
+            }
+        }
+
+        private void processNodeCompletion(K8sNode k8sNode) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            setArpReplyRule(k8sNode, true);
+
+            try {
+                sleep(SLEEP_MS);
+            } catch (InterruptedException e) {
+                log.error("Exception caused during ARP requesting...");
+            }
+
+            sendArpRequest(k8sNode);
+        }
+    }
+
+    private class InternalPacketProcessor implements PacketProcessor {
+
+        @Override
+        public void process(PacketContext context) {
+            if (context.isHandled()) {
+                return;
+            }
+
+            InboundPacket pkt = context.inPacket();
+            Ethernet ethernet = pkt.parsed();
+            if (ethernet != null && ethernet.getEtherType() == Ethernet.TYPE_ARP) {
+                eventExecutor.execute(() -> processArpPacket(context, ethernet));
+            }
+        }
+    }
+}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingSnatHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingSnatHandler.java
index 04913b6..fccbaaf 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingSnatHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingSnatHandler.java
@@ -315,6 +315,9 @@
                 case K8S_NODE_COMPLETE:
                     eventExecutor.execute(() -> processNodeCompletion(event.subject()));
                     break;
+                case K8S_NODE_UPDATED:
+                    eventExecutor.execute(() -> processNodeUpdate(event.subject()));
+                    break;
                 case K8S_NODE_INCOMPLETE:
                 default:
                     break;
@@ -327,10 +330,15 @@
             }
 
             setExtIntfArpRule(k8sNode, true);
-            setSnatUpstreamRule(k8sNode, true);
             setSnatDownstreamRule(k8sNode, true);
             setContainerToExtRule(k8sNode, true);
         }
+
+        private void processNodeUpdate(K8sNode k8sNode) {
+            if (k8sNode.extGatewayMac() != null) {
+                setSnatUpstreamRule(k8sNode, true);
+            }
+        }
     }
 
     private class InternalK8sNetworkListener implements K8sNetworkListener {
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java
index 779bbc6..33d4feb 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java
@@ -292,6 +292,8 @@
                 context.inPacket().receivedFrom().deviceId(),
                 treatment,
                 ByteBuffer.wrap(ethReply.serialize())));
+
+        context.block();
     }
 
     private String getArpMode() {
diff --git a/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/DefaultK8sNode.java b/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/DefaultK8sNode.java
index 7be1e9a5..87e1f1d 100644
--- a/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/DefaultK8sNode.java
+++ b/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/DefaultK8sNode.java
@@ -60,6 +60,7 @@
     private final IpAddress managementIp;
     private final IpAddress dataIp;
     private final K8sNodeState state;
+    private final MacAddress extGatewayMac;
 
     private static final String NOT_NULL_MSG = "Node % cannot be null";
 
@@ -75,10 +76,12 @@
      * @param managementIp      management IP address
      * @param dataIp            data IP address
      * @param state             node state
+     * @param extGatewayMac     external gateway MAC address
      */
     protected DefaultK8sNode(String hostname, Type type, DeviceId intgBridge,
                              DeviceId extBridge, IpAddress managementIp,
-                             IpAddress dataIp, K8sNodeState state) {
+                             IpAddress dataIp, K8sNodeState state,
+                             MacAddress extGatewayMac) {
         this.hostname = hostname;
         this.type = type;
         this.intgBridge = intgBridge;
@@ -86,6 +89,7 @@
         this.managementIp = managementIp;
         this.dataIp = dataIp;
         this.state = state;
+        this.extGatewayMac = extGatewayMac;
     }
 
     @Override
@@ -123,6 +127,7 @@
                 .managementIp(managementIp)
                 .dataIp(dataIp)
                 .state(state)
+                .extGatewayMac(extGatewayMac)
                 .build();
     }
 
@@ -136,6 +141,7 @@
                 .managementIp(managementIp)
                 .dataIp(dataIp)
                 .state(state)
+                .extGatewayMac(extGatewayMac)
                 .build();
     }
 
@@ -163,10 +169,25 @@
                 .managementIp(managementIp)
                 .dataIp(dataIp)
                 .state(newState)
+                .extGatewayMac(extGatewayMac)
                 .build();
     }
 
     @Override
+    public K8sNode updateExtGatewayMac(MacAddress newMac) {
+        return new Builder()
+                .hostname(hostname)
+                .type(type)
+                .intgBridge(intgBridge)
+                .managementIp(managementIp)
+                .dataIp(dataIp)
+                .state(state)
+                .extGatewayMac(newMac)
+                .build();
+
+    }
+
+    @Override
     public PortNumber grePortNum() {
         return tunnelPortNum(GRE_TUNNEL);
     }
@@ -245,15 +266,7 @@
 
     @Override
     public MacAddress extGatewayMac() {
-        OvsdbClientService client = getOvsClient();
-
-        if (client == null) {
-            return null;
-        }
-
-        Interface iface = getOvsClient().getInterface(EXTERNAL_BRIDGE);
-        OvsdbMap data = (OvsdbMap) iface.getExternalIdsColumn().data();
-        return MacAddress.valueOf((String) data.map().get(EXT_GW_MAC));
+        return extGatewayMac;
     }
 
     @Override
@@ -308,7 +321,7 @@
     @Override
     public int hashCode() {
         return Objects.hash(hostname, type, intgBridge, extBridge,
-                            managementIp, dataIp, state);
+                            managementIp, dataIp, state, extGatewayMac);
     }
 
     @Override
@@ -321,6 +334,7 @@
                 .add("managementIp", managementIp)
                 .add("dataIp", dataIp)
                 .add("state", state)
+                .add("extGatewayMac", extGatewayMac)
                 .toString();
     }
 
@@ -376,7 +390,8 @@
                 .extBridge(node.extBridge())
                 .managementIp(node.managementIp())
                 .dataIp(node.dataIp())
-                .state(node.state());
+                .state(node.state())
+                .extGatewayMac(node.extGatewayMac());
     }
 
     public static final class Builder implements K8sNode.Builder {
@@ -389,6 +404,7 @@
         private IpAddress dataIp;
         private K8sNodeState state;
         private K8sApiConfig apiConfig;
+        private MacAddress extGatewayMac;
 
         // private constructor not intended to use from external
         private Builder() {
@@ -407,7 +423,8 @@
                     extBridge,
                     managementIp,
                     dataIp,
-                    state);
+                    state,
+                    extGatewayMac);
         }
 
         @Override
@@ -451,5 +468,11 @@
             this.state = state;
             return this;
         }
+
+        @Override
+        public Builder extGatewayMac(MacAddress extGatewayMac) {
+            this.extGatewayMac = extGatewayMac;
+            return this;
+        }
     }
 }
diff --git a/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/K8sNode.java b/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/K8sNode.java
index 30449ff..065f2dd 100644
--- a/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/K8sNode.java
+++ b/apps/k8s-node/api/src/main/java/org/onosproject/k8snode/api/K8sNode.java
@@ -121,6 +121,14 @@
     K8sNode updateState(K8sNodeState newState);
 
     /**
+     * Returns new kubernetes node instance with given external gateway MAC address.
+     *
+     * @param macAddress updated MAC address
+     * @return updated kubernetes node
+     */
+    K8sNode updateExtGatewayMac(MacAddress macAddress);
+
+    /**
      * Returns the GRE tunnel port number.
      *
      * @return GRE port number; null if the GRE tunnel port does not exist
@@ -271,5 +279,13 @@
          * @return kubernetes node builder
          */
         Builder state(K8sNodeState state);
+
+        /**
+         * Returns kubernetes node builder with supplied external gateway MAC.
+         *
+         * @param extGatewayMac external gateway MAC address
+         * @return kubernetes node builder
+         */
+        Builder extGatewayMac(MacAddress extGatewayMac);
     }
 }