Support to resolve external hosts' MAC address from k8s POD

1. Do not remove flow rules if we receive invalid service IP.

Change-Id: I88319f1b10c25b68271e6dcd01ea384c191c3224
diff --git a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java
index 8de62c8..feb81fa 100644
--- a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java
+++ b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java
@@ -78,7 +78,7 @@
     public static final int PRIORITY_ARP_SUBNET_RULE = 40000;
     public static final int PRIORITY_ARP_CONTROL_RULE = 40000;
     public static final int PRIORITY_ARP_REPLY_RULE = 40000;
-    public static final int PRIORITY_ARP_REQUEST_RULE = 40000;
+    public static final int PRIORITY_ARP_POD_RULE = 39000;
     public static final int PRIORITY_ARP_FLOOD_RULE = 39000;
     public static final int PRIORITY_FORCED_ACL_RULE = 50000;
     public static final int PRIORITY_ICMP_PROBE_RULE = 50000;
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingArpHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingArpHandler.java
index f11aab4..8313b81 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingArpHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sRoutingArpHandler.java
@@ -58,6 +58,7 @@
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.k8snetworking.api.Constants.EXT_ENTRY_TABLE;
 import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+import static org.onosproject.k8snetworking.api.Constants.PRIORITY_ARP_POD_RULE;
 import static org.onosproject.k8snetworking.api.Constants.PRIORITY_ARP_REPLY_RULE;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -180,6 +181,50 @@
         );
     }
 
+    private void setPodArpRequestRule(K8sNode k8sNode, boolean install) {
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchInPort(k8sNode.extToIntgPatchPortNum())
+                .matchEthType(Ethernet.TYPE_ARP)
+                .matchArpOp(ARP.OP_REQUEST)
+                .build();
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .setOutput(k8sNode.extBridgePortNum())
+                .build();
+
+        k8sFlowRuleService.setRule(
+                appId,
+                k8sNode.extBridge(),
+                selector,
+                treatment,
+                PRIORITY_ARP_POD_RULE,
+                EXT_ENTRY_TABLE,
+                install
+        );
+    }
+
+    private void setPodArpReplyRule(K8sNode k8sNode, boolean install) {
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchInPort(k8sNode.extBridgePortNum())
+                .matchEthType(Ethernet.TYPE_ARP)
+                .matchArpOp(ARP.OP_REPLY)
+                .build();
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .setOutput(k8sNode.extToIntgPatchPortNum())
+                .build();
+
+        k8sFlowRuleService.setRule(
+                appId,
+                k8sNode.extBridge(),
+                selector,
+                treatment,
+                PRIORITY_ARP_POD_RULE,
+                EXT_ENTRY_TABLE,
+                install
+        );
+    }
+
     private class InternalK8sNodeListener implements K8sNodeListener {
 
         private boolean isRelevantHelper() {
@@ -204,6 +249,8 @@
             }
 
             setArpReplyRule(k8sNode, true);
+            setPodArpRequestRule(k8sNode, true);
+            setPodArpReplyRule(k8sNode, true);
 
             try {
                 sleep(SLEEP_MS);
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
index f9e0545..bbf505a 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
@@ -147,6 +147,7 @@
     private static final String SERVICE_IP_NAT_MODE = "serviceIpNatMode";
 
     private static final String SERVICE_CIDR = "serviceCidr";
+    private static final String NONE = "None";
     private static final String B_CLASS_SUFFIX = ".0.0/16";
     private static final String A_CLASS_SUFFIX = ".0.0.0/8";
 
@@ -503,6 +504,11 @@
     private void setShiftDomainRules(DeviceId deviceId, int installTable,
                                      int groupId, int priority, String serviceIp,
                                      int servicePort, String protocol, boolean install) {
+
+        if (serviceIp == null || NONE.equals(serviceIp)) {
+            return;
+        }
+
         TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
                 .matchEthType(Ethernet.TYPE_IPV4)
                 .matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(serviceIp), HOST_CIDR_NUM));
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java
index 33d4feb..4c351af 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java
@@ -21,6 +21,8 @@
 import org.onlab.packet.Ip4Address;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.MacAddress;
+import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cfg.ConfigProperty;
@@ -39,6 +41,7 @@
 import org.onosproject.k8snode.api.K8sNodeListener;
 import org.onosproject.k8snode.api.K8sNodeService;
 import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.DefaultTrafficSelector;
@@ -49,6 +52,10 @@
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -100,6 +107,10 @@
     private static final String GATEWAY_MAC = "gatewayMac";
     private static final String ARP_MODE = "arpMode";
 
+    private static final KryoNamespace SERIALIZER_HOST_MAC = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .build();
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
 
@@ -122,6 +133,9 @@
     protected MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected K8sNodeService k8sNodeService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -141,6 +155,8 @@
 
     private MacAddress gwMacAddress;
 
+    private ConsistentMap<IpAddress, MacAddress> extHostMacStore;
+
     private final ExecutorService eventExecutor = newSingleThreadExecutor(
             groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
 
@@ -159,6 +175,12 @@
         k8sNodeService.addListener(k8sNodeListener);
         packetService.addProcessor(packetProcessor, PacketProcessor.director(0));
 
+        extHostMacStore = storageService.<IpAddress, MacAddress>consistentMapBuilder()
+                .withSerializer(Serializer.using(SERIALIZER_HOST_MAC))
+                .withName("k8s-host-mac-store")
+                .withApplicationId(appId)
+                .build();
+
         log.info("Started");
     }
 
@@ -193,10 +215,15 @@
         }
 
         ARP arpPacket = (ARP) ethPacket.getPayload();
-        if (arpPacket.getOpCode() != ARP.OP_REQUEST) {
-            return;
+        if (arpPacket.getOpCode() == ARP.OP_REQUEST) {
+            processArpRequest(context, ethPacket);
+        } else if (arpPacket.getOpCode() == ARP.OP_REPLY) {
+            processArpReply(context, ethPacket);
         }
+    }
 
+    private void processArpRequest(PacketContext context, Ethernet ethPacket) {
+        ARP arpPacket = (ARP) ethPacket.getPayload();
         K8sPort srcPort = k8sNetworkService.ports().stream()
                 .filter(p -> p.macAddress().equals(ethPacket.getSourceMAC()))
                 .findAny().orElse(null);
@@ -216,7 +243,7 @@
         IpAddress targetIp = Ip4Address.valueOf(arpPacket.getTargetProtocolAddress());
 
         MacAddress replyMac = k8sNetworkService.ports().stream()
-        //        .filter(p -> p.networkId().equals(srcPort.networkId()))
+                //        .filter(p -> p.networkId().equals(srcPort.networkId()))
                 .filter(p -> p.ipAddress().equals(targetIp))
                 .map(K8sPort::macAddress)
                 .findAny().orElse(null);
@@ -254,19 +281,43 @@
         }
 
         if (replyMac == null) {
-            String targetIpPrefix = targetIp.toString().split("\\.")[1];
-            String nodePrefix = NODE_IP_PREFIX + "." + targetIpPrefix;
 
-            String exBridgeCidr = k8sNodeService.completeNodes().stream()
-                    .map(n -> n.extBridgeIp().toString()).findAny().orElse(null);
+            if (targetIp.toString().startsWith(NODE_IP_PREFIX)) {
+                String targetIpPrefix = targetIp.toString().split("\\.")[1];
+                String nodePrefix = NODE_IP_PREFIX + "." + targetIpPrefix;
 
-            if (exBridgeCidr != null) {
-                String extBridgeIp = unshiftIpDomain(targetIp.toString(),
-                nodePrefix, exBridgeCidr);
+                String exBridgeCidr = k8sNodeService.completeNodes().stream()
+                        .map(n -> n.extBridgeIp().toString()).findAny().orElse(null);
 
-                replyMac = k8sNodeService.completeNodes().stream()
-                        .filter(n -> extBridgeIp.equals(n.extBridgeIp().toString()))
-                        .map(K8sNode::extBridgeMac).findAny().orElse(null);
+                if (exBridgeCidr != null) {
+                    String extBridgeIp = unshiftIpDomain(targetIp.toString(),
+                            nodePrefix, exBridgeCidr);
+
+                    replyMac = k8sNodeService.completeNodes().stream()
+                            .filter(n -> extBridgeIp.equals(n.extBridgeIp().toString()))
+                            .map(K8sNode::extBridgeMac).findAny().orElse(null);
+
+                    if (replyMac == null) {
+                        replyMac = extHostMacStore.asJavaMap().get(
+                                IpAddress.valueOf(extBridgeIp));
+                    }
+
+                    // if the source hosts are not in k8s cluster range,
+                    // we need to manually learn their MAC addresses
+                    if (replyMac == null) {
+                        ConnectPoint cp = context.inPacket().receivedFrom();
+                        K8sNode k8sNode = k8sNodeService.node(cp.deviceId());
+
+                        if (k8sNode != null) {
+                            setArpRequest(k8sNode.extBridgeMac().toBytes(),
+                                    k8sNode.extBridgeIp().toOctets(),
+                                    IpAddress.valueOf(extBridgeIp).toOctets(),
+                                    k8sNode);
+                            context.block();
+                            return;
+                        }
+                    }
+                }
             }
         }
 
@@ -296,6 +347,40 @@
         context.block();
     }
 
+    private void processArpReply(PacketContext context, Ethernet ethPacket) {
+        ARP arpPacket = (ARP) ethPacket.getPayload();
+        ConnectPoint cp = context.inPacket().receivedFrom();
+        K8sNode k8sNode = k8sNodeService.node(cp.deviceId());
+
+        if (k8sNode != null &&
+                ethPacket.getDestinationMAC().equals(k8sNode.extBridgeMac())) {
+            IpAddress srcIp = IpAddress.valueOf(IpAddress.Version.INET,
+                    arpPacket.getSenderProtocolAddress());
+            MacAddress srcMac = MacAddress.valueOf(arpPacket.getSenderHardwareAddress());
+
+            // we only add the host IP - MAC map store once,
+            // mutable MAP scenario is not considered for now
+            if (!extHostMacStore.containsKey(srcIp)) {
+                extHostMacStore.put(srcIp, srcMac);
+            }
+        }
+    }
+
+    private void setArpRequest(byte[] senderMac, byte[] senderIp,
+                               byte[] targetIp, K8sNode k8sNode) {
+        Ethernet ethRequest = ARP.buildArpRequest(senderMac,
+                                                  senderIp, targetIp, VlanId.NO_VID);
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .setOutput(k8sNode.intgToExtPatchPortNum())
+                .build();
+
+        packetService.emit(new DefaultOutboundPacket(
+                k8sNode.intgBridge(),
+                treatment,
+                ByteBuffer.wrap(ethRequest.serialize())));
+    }
+
     private String getArpMode() {
         Set<ConfigProperty> properties = configService.getProperties(this.getClass().getName());
         return getPropertyValue(properties, ARP_MODE);