Implement east-west communication for k8s network

Change-Id: Ibac91b7a856e35a26cf0e0f23d6d01f65197625d
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/DistributedK8sNetworkStore.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/DistributedK8sNetworkStore.java
index 8e54bef..9117e31 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/DistributedK8sNetworkStore.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/DistributedK8sNetworkStore.java
@@ -51,9 +51,13 @@
 import static org.onosproject.k8snetworking.api.K8sNetworkEvent.Type.K8S_NETWORK_CREATED;
 import static org.onosproject.k8snetworking.api.K8sNetworkEvent.Type.K8S_NETWORK_REMOVED;
 import static org.onosproject.k8snetworking.api.K8sNetworkEvent.Type.K8S_NETWORK_UPDATED;
+import static org.onosproject.k8snetworking.api.K8sNetworkEvent.Type.K8S_PORT_ACTIVATED;
 import static org.onosproject.k8snetworking.api.K8sNetworkEvent.Type.K8S_PORT_CREATED;
+import static org.onosproject.k8snetworking.api.K8sNetworkEvent.Type.K8S_PORT_INACTIVATED;
 import static org.onosproject.k8snetworking.api.K8sNetworkEvent.Type.K8S_PORT_REMOVED;
 import static org.onosproject.k8snetworking.api.K8sNetworkEvent.Type.K8S_PORT_UPDATED;
+import static org.onosproject.k8snetworking.api.K8sPort.State.ACTIVE;
+import static org.onosproject.k8snetworking.api.K8sPort.State.INACTIVE;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -254,11 +258,7 @@
                     break;
                 case UPDATE:
                     log.debug("Kubernetes port updated");
-                    eventExecutor.execute(() ->
-                            notifyDelegate(new K8sNetworkEvent(
-                                K8S_PORT_UPDATED,
-                                network(event.newValue().value().networkId()),
-                                event.newValue().value())));
+                    eventExecutor.execute(() -> processPortUpdate(event));
                     break;
                 case REMOVE:
                     log.debug("Kubernetes port removed");
@@ -272,5 +272,30 @@
                     break;
             }
         }
+
+        private void processPortUpdate(MapEvent<String, K8sPort> event) {
+            K8sPort.State oldState = event.oldValue().value().state();
+            K8sPort.State newState = event.newValue().value().state();
+
+            eventExecutor.execute(() ->
+                    notifyDelegate(new K8sNetworkEvent(
+                            K8S_PORT_UPDATED,
+                            network(event.newValue().value().networkId()),
+                            event.newValue().value())));
+
+            if (oldState == INACTIVE && newState == ACTIVE) {
+                notifyDelegate(new K8sNetworkEvent(
+                        K8S_PORT_ACTIVATED,
+                        network(event.newValue().value().networkId()),
+                        event.newValue().value()));
+            }
+
+            if (oldState == ACTIVE && newState == INACTIVE) {
+                notifyDelegate(new K8sNetworkEvent(
+                        K8S_PORT_INACTIVATED,
+                        network(event.newValue().value().networkId()),
+                        event.newValue().value()));
+            }
+        }
     }
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java
index 7cdf44e..52844d5 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java
@@ -15,17 +15,24 @@
  */
 package org.onosproject.k8snetworking.impl;
 
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IpPrefix;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.LeadershipService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.k8snetworking.api.K8sFlowRuleService;
+import org.onosproject.k8snetworking.api.K8sNetwork;
+import org.onosproject.k8snetworking.api.K8sNetworkEvent;
+import org.onosproject.k8snetworking.api.K8sNetworkListener;
+import org.onosproject.k8snetworking.api.K8sNetworkService;
 import org.onosproject.k8snode.api.K8sNode;
 import org.onosproject.k8snode.api.K8sNodeEvent;
 import org.onosproject.k8snode.api.K8sNodeListener;
 import org.onosproject.k8snode.api.K8sNodeService;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
 import org.onosproject.net.flow.DefaultFlowRule;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
@@ -48,7 +55,6 @@
 
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.k8snetworking.api.Constants.ACL_EGRESS_TABLE;
-import static org.onosproject.k8snetworking.api.Constants.ACL_INGRESS_TABLE;
 import static org.onosproject.k8snetworking.api.Constants.ARP_TABLE;
 import static org.onosproject.k8snetworking.api.Constants.DEFAULT_GATEWAY_MAC;
 import static org.onosproject.k8snetworking.api.Constants.DHCP_TABLE;
@@ -89,11 +95,15 @@
     protected LeadershipService leadershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sNetworkService k8sNetworkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected K8sNodeService k8sNodeService;
 
     private final ExecutorService deviceEventExecutor =
             Executors.newSingleThreadExecutor(groupedThreads(
                     getClass().getSimpleName(), "device-event"));
+    private final K8sNetworkListener internalNetworkListener = new InternalK8sNetworkListener();
     private final K8sNodeListener internalNodeListener = new InternalK8sNodeListener();
 
     private ApplicationId appId;
@@ -104,10 +114,10 @@
         appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
         coreService.registerApplication(K8S_NETWORKING_APP_ID);
         k8sNodeService.addListener(internalNodeListener);
+        k8sNetworkService.addListener(internalNetworkListener);
         localNodeId = clusterService.getLocalNode().id();
         leadershipService.runForLeadership(appId.name());
-        k8sNodeService.completeNodes().forEach(node ->
-                                        initializePipeline(node.intgBridge()));
+        k8sNodeService.completeNodes().forEach(this::initializePipeline);
 
         log.info("Started");
     }
@@ -115,6 +125,7 @@
     @Deactivate
     protected void deactivate() {
         k8sNodeService.removeListener(internalNodeListener);
+        k8sNetworkService.removeListener(internalNetworkListener);
         leadershipService.withdraw(appId.name());
         deviceEventExecutor.shutdown();
 
@@ -200,7 +211,10 @@
         }));
     }
 
-    protected void initializePipeline(DeviceId deviceId) {
+    protected void initializePipeline(K8sNode k8sNode) {
+
+        DeviceId deviceId = k8sNode.intgBridge();
+
         // for inbound table transition
         connectTables(deviceId, STAT_INBOUND_TABLE, VTAP_INBOUND_TABLE);
         connectTables(deviceId, VTAP_INBOUND_TABLE, DHCP_TABLE);
@@ -211,24 +225,25 @@
         // for vTag and ARP table transition
         connectTables(deviceId, VTAG_TABLE, ARP_TABLE);
 
-        // for ARP and ACL table transition
-        connectTables(deviceId, ARP_TABLE, ACL_INGRESS_TABLE);
-
-        // for ACL and JUMP table transition
         connectTables(deviceId, ACL_EGRESS_TABLE, JUMP_TABLE);
 
+        // for ARP and ACL table transition
+        connectTables(deviceId, ARP_TABLE, JUMP_TABLE);
+
         // for JUMP table transition
         // we need JUMP table for bypassing routing table which contains large
         // amount of flow rules which might cause performance degradation during
         // table lookup
-        setupJumpTable(deviceId);
+        setupJumpTable(k8sNode);
 
         // for outbound table transition
         connectTables(deviceId, STAT_OUTBOUND_TABLE, VTAP_OUTBOUND_TABLE);
         connectTables(deviceId, VTAP_OUTBOUND_TABLE, FORWARDING_TABLE);
     }
 
-    private void setupJumpTable(DeviceId deviceId) {
+    private void setupJumpTable(K8sNode k8sNode) {
+        DeviceId deviceId = k8sNode.intgBridge();
+
         TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
         TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
 
@@ -265,6 +280,50 @@
         applyRule(flowRule, true);
     }
 
+    private void setupHostGwRule(K8sNetwork k8sNetwork) {
+        TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
+        sBuilder.matchEthType(Ethernet.TYPE_IPV4)
+                .matchIPDst(IpPrefix.valueOf(k8sNetwork.gatewayIp(), 32));
+
+        TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+        tBuilder.setOutput(PortNumber.LOCAL);
+
+        for (K8sNode node : k8sNodeService.completeNodes()) {
+            FlowRule flowRule = DefaultFlowRule.builder()
+                    .forDevice(node.intgBridge())
+                    .withSelector(sBuilder.build())
+                    .withTreatment(tBuilder.build())
+                    .withPriority(HIGH_PRIORITY)
+                    .fromApp(appId)
+                    .makePermanent()
+                    .forTable(JUMP_TABLE)
+                    .build();
+            applyRule(flowRule, true);
+        }
+
+        sBuilder = DefaultTrafficSelector.builder();
+        sBuilder.matchEthType(Ethernet.TYPE_IPV4)
+                .matchIPSrc(IpPrefix.valueOf(k8sNetwork.gatewayIp(), 32))
+                .matchIPDst(IpPrefix.valueOf(k8sNetwork.cidr()));
+
+        tBuilder = DefaultTrafficTreatment.builder();
+        tBuilder.setTunnelId(Long.valueOf(k8sNetwork.segmentId()))
+                .transition(STAT_OUTBOUND_TABLE);
+
+        for (K8sNode node : k8sNodeService.completeNodes()) {
+            FlowRule flowRule = DefaultFlowRule.builder()
+                    .forDevice(node.intgBridge())
+                    .withSelector(sBuilder.build())
+                    .withTreatment(tBuilder.build())
+                    .withPriority(HIGH_PRIORITY)
+                    .fromApp(appId)
+                    .makePermanent()
+                    .forTable(JUMP_TABLE)
+                    .build();
+            applyRule(flowRule, true);
+        }
+    }
+
     private class InternalK8sNodeListener implements K8sNodeListener {
         private boolean isRelevantHelper() {
             return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
@@ -283,7 +342,7 @@
                             return;
                         }
 
-                        initializePipeline(k8sNode.intgBridge());
+                        initializePipeline(k8sNode);
                     });
                     break;
                 case K8S_NODE_CREATED:
@@ -293,4 +352,33 @@
             }
         }
     }
+
+    private class InternalK8sNetworkListener implements K8sNetworkListener {
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(K8sNetworkEvent event) {
+
+            switch (event.type()) {
+                case K8S_NETWORK_CREATED:
+                    deviceEventExecutor.execute(() -> processNetworkCreation(event.subject()));
+                    break;
+                case K8S_NETWORK_REMOVED:
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        private void processNetworkCreation(K8sNetwork network) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            setupHostGwRule(network);
+        }
+    }
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java
new file mode 100644
index 0000000..90ad174
--- /dev/null
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java
@@ -0,0 +1,361 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.impl;
+
+import org.onlab.packet.ARP;
+import org.onlab.packet.EthType;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cfg.ConfigProperty;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.k8snetworking.api.K8sFlowRuleService;
+import org.onosproject.k8snetworking.api.K8sNetworkService;
+import org.onosproject.k8snetworking.api.K8sPort;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeEvent;
+import org.onosproject.k8snode.api.K8sNodeListener;
+import org.onosproject.k8snode.api.K8sNodeService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketService;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Dictionary;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snetworking.api.Constants.ARP_BROADCAST_MODE;
+import static org.onosproject.k8snetworking.api.Constants.ARP_PROXY_MODE;
+import static org.onosproject.k8snetworking.api.Constants.ARP_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+import static org.onosproject.k8snetworking.api.Constants.PRIORITY_ARP_CONTROL_RULE;
+import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.ARP_MODE;
+import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.ARP_MODE_DEFAULT;
+import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.GATEWAY_MAC;
+import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.GATEWAY_MAC_DEFAULT;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getPropertyValue;
+
+/**
+ * Handles ARP packet from containers.
+ */
+@Component(
+        immediate = true,
+        property = {
+                GATEWAY_MAC + "=" + GATEWAY_MAC_DEFAULT,
+                ARP_MODE + "=" + ARP_MODE_DEFAULT
+        }
+)
+public class K8sSwitchingArpHandler {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected PacketService packetService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService configService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sNodeService k8sNodeService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sNetworkService k8sNetworkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sFlowRuleService k8sFlowRuleService;
+
+    /** Fake MAC address for virtual network subnet gateway. */
+    private String gatewayMac = GATEWAY_MAC_DEFAULT;
+
+    /** ARP processing mode, broadcast | proxy (default). */
+    protected String arpMode = ARP_MODE_DEFAULT;
+
+    private MacAddress gwMacAddress;
+
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
+    private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
+    private final InternalNodeEventListener k8sNodeListener = new InternalNodeEventListener();
+
+    private ApplicationId appId;
+    private NodeId localNodeId;
+
+    @Activate
+    void activate() {
+        appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
+        configService.registerProperties(getClass());
+        localNodeId = clusterService.getLocalNode().id();
+        leadershipService.runForLeadership(appId.name());
+        k8sNodeService.addListener(k8sNodeListener);
+        packetService.addProcessor(packetProcessor, PacketProcessor.director(0));
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    void deactivate() {
+        packetService.removeProcessor(packetProcessor);
+        k8sNodeService.removeListener(k8sNodeListener);
+        leadershipService.withdraw(appId.name());
+        configService.unregisterProperties(getClass(), false);
+        eventExecutor.shutdown();
+
+        log.info("Stopped");
+    }
+
+    @Modified
+    void modified(ComponentContext context) {
+        readComponentConfiguration(context);
+
+        log.info("Modified");
+    }
+
+    /**
+     * Processes ARP request packets.
+     *
+     * @param context   packet context
+     * @param ethPacket ethernet packet
+     */
+    private void processPacketIn(PacketContext context, Ethernet ethPacket) {
+        // if the ARP mode is configured as broadcast mode, we simply ignore ARP packet_in
+        if (ARP_BROADCAST_MODE.equals(getArpMode())) {
+            return;
+        }
+
+        ARP arpPacket = (ARP) ethPacket.getPayload();
+        if (arpPacket.getOpCode() != ARP.OP_REQUEST) {
+            return;
+        }
+
+        K8sPort srcPort = k8sNetworkService.ports().stream()
+                .filter(p -> p.macAddress().equals(ethPacket.getSourceMAC()))
+                .findAny().orElse(null);
+
+        if (srcPort == null && !context.inPacket().receivedFrom().port()
+                .equals(PortNumber.LOCAL)) {
+            log.warn("Failed to find source port(MAC:{})", ethPacket.getSourceMAC());
+            return;
+        }
+
+        // FIXME: this is a workaround for storing host GW MAC address,
+        // need to find a way to store the MAC address in persistent way
+        if (context.inPacket().receivedFrom().port().equals(PortNumber.LOCAL)) {
+            gwMacAddress = ethPacket.getSourceMAC();
+        }
+
+        IpAddress targetIp = Ip4Address.valueOf(arpPacket.getTargetProtocolAddress());
+
+        MacAddress replyMac = k8sNetworkService.ports().stream()
+        //        .filter(p -> p.networkId().equals(srcPort.networkId()))
+                .filter(p -> p.ipAddress().equals(targetIp))
+                .map(K8sPort::macAddress)
+                .findAny().orElse(null);
+
+        long gwIpCnt = k8sNetworkService.networks().stream()
+                .filter(n -> n.gatewayIp().equals(targetIp))
+                .count();
+
+        if (gwIpCnt > 0) {
+            replyMac = gwMacAddress;
+        }
+
+        if (replyMac == null) {
+            log.debug("Failed to find MAC address for {}", targetIp);
+            return;
+        }
+
+        Ethernet ethReply = ARP.buildArpReply(
+                targetIp.getIp4Address(),
+                replyMac,
+                ethPacket);
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .setOutput(context.inPacket().receivedFrom().port())
+                .build();
+
+        packetService.emit(new DefaultOutboundPacket(
+                context.inPacket().receivedFrom().deviceId(),
+                treatment,
+                ByteBuffer.wrap(ethReply.serialize())));
+    }
+
+    private String getArpMode() {
+        Set<ConfigProperty> properties = configService.getProperties(this.getClass().getName());
+        return getPropertyValue(properties, ARP_MODE);
+    }
+
+    /**
+     * Extracts properties from the component configuration context.
+     *
+     * @param context the component context
+     */
+    private void readComponentConfiguration(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+
+        String updatedMac = Tools.get(properties, GATEWAY_MAC);
+        gatewayMac = updatedMac != null ? updatedMac : GATEWAY_MAC_DEFAULT;
+        log.info("Configured. Gateway MAC is {}", gatewayMac);
+    }
+
+    /**
+     * An internal packet processor which processes ARP request, and results in
+     * packet-out ARP reply.
+     */
+    private class InternalPacketProcessor implements PacketProcessor {
+
+        @Override
+        public void process(PacketContext context) {
+            if (context.isHandled()) {
+                return;
+            }
+
+            Ethernet ethPacket = context.inPacket().parsed();
+            if (ethPacket == null || ethPacket.getEtherType() != Ethernet.TYPE_ARP) {
+                return;
+            }
+
+            eventExecutor.execute(() -> processPacketIn(context, ethPacket));
+        }
+    }
+
+    /**
+     * An internal kubernetes node listener which is used for listening kubernetes
+     * node activity. As long as a node is in complete state, we will install
+     * default ARP rule to handle ARP request.
+     */
+    private class InternalNodeEventListener implements K8sNodeListener {
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(K8sNodeEvent event) {
+            K8sNode k8sNode = event.subject();
+            switch (event.type()) {
+                case K8S_NODE_COMPLETE:
+                    eventExecutor.execute(() -> processNodeCompletion(k8sNode));
+                    break;
+                case K8S_NODE_INCOMPLETE:
+                    eventExecutor.execute(() -> processNodeIncompletion(k8sNode));
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        private void processNodeCompletion(K8sNode node) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            setDefaultArpRule(node, true);
+        }
+
+        private void processNodeIncompletion(K8sNode node) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            setDefaultArpRule(node, false);
+        }
+
+        private void setDefaultArpRule(K8sNode node, boolean install) {
+
+            if (getArpMode() == null) {
+                return;
+            }
+
+            switch (getArpMode()) {
+                case ARP_PROXY_MODE:
+                    setDefaultArpRuleForProxyMode(node, install);
+                    break;
+                case ARP_BROADCAST_MODE:
+                    // TODO: need to implement broadcast mode
+                    log.warn("Not implemented yet.");
+                    break;
+                default:
+                    log.warn("Invalid ARP mode {}. Please use either " +
+                            "broadcast or proxy mode.", getArpMode());
+                    break;
+            }
+        }
+
+        private void setDefaultArpRuleForProxyMode(K8sNode node, boolean install) {
+            TrafficSelector selector = DefaultTrafficSelector.builder()
+                    .matchEthType(EthType.EtherType.ARP.ethType().toShort())
+                    .build();
+
+            TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                    .punt()
+                    .build();
+
+            k8sFlowRuleService.setRule(
+                    appId,
+                    node.intgBridge(),
+                    selector,
+                    treatment,
+                    PRIORITY_ARP_CONTROL_RULE,
+                    ARP_TABLE,
+                    install
+            );
+        }
+    }
+}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java
new file mode 100644
index 0000000..53af926
--- /dev/null
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java
@@ -0,0 +1,322 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.impl;
+
+import com.google.common.base.Strings;
+import org.onlab.packet.Ethernet;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cfg.ConfigProperty;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.k8snetworking.api.K8sFlowRuleService;
+import org.onosproject.k8snetworking.api.K8sNetwork;
+import org.onosproject.k8snetworking.api.K8sNetworkEvent;
+import org.onosproject.k8snetworking.api.K8sNetworkListener;
+import org.onosproject.k8snetworking.api.K8sNetworkService;
+import org.onosproject.k8snetworking.api.K8sPort;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snetworking.api.Constants.ACL_EGRESS_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.ARP_BROADCAST_MODE;
+import static org.onosproject.k8snetworking.api.Constants.ARP_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.FORWARDING_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+import static org.onosproject.k8snetworking.api.Constants.PRIORITY_SWITCHING_RULE;
+import static org.onosproject.k8snetworking.api.Constants.PRIORITY_TUNNEL_TAG_RULE;
+import static org.onosproject.k8snetworking.api.Constants.VTAG_TABLE;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getPropertyValue;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.tunnelPortNumByNetId;
+import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildExtension;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Populates switching flow rules on OVS for the basic connectivity among the
+ * container in the same network.
+ */
+@Component(immediate = true)
+public class K8sSwitchingHandler {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final String ARP_MODE = "arpMode";
+    private static final String ERR_SET_FLOWS_VNI = "Failed to set flows for " +
+            "%s: Failed to get VNI for %s";
+    private static final String STR_NONE = "<none>";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DriverService driverService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService configService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sFlowRuleService k8sFlowRuleService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sNetworkService k8sNetworkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sNodeService k8sNodeService;
+
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+    private final InternalK8sNetworkListener k8sNetworkListener =
+            new InternalK8sNetworkListener();
+
+    private ApplicationId appId;
+
+    @Activate
+    protected void activate() {
+        appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
+        k8sNetworkService.addListener(k8sNetworkListener);
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        k8sNetworkService.removeListener(k8sNetworkListener);
+        eventExecutor.shutdown();
+
+        log.info("Stopped");
+    }
+
+    /**
+     * Configures the flow rules which are used for L2 packet switching.
+     * Note that these rules will be inserted in switching table (table 5).
+     *
+     * @param port      kubernetes port object
+     * @param install   install flag, add the rule if true, remove it otherwise
+     */
+    private void setForwardingRulesForTunnel(K8sPort port, boolean install) {
+        // switching rules for the instPorts in the same node
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                // TODO: need to handle IPv6 in near future
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchIPDst(port.ipAddress().toIpPrefix())
+                .matchTunnelId(getVni(port))
+                .build();
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .setEthDst(port.macAddress())
+                .setOutput(port.portNumber())
+                .build();
+
+        k8sFlowRuleService.setRule(
+                appId,
+                port.deviceId(),
+                selector,
+                treatment,
+                PRIORITY_SWITCHING_RULE,
+                FORWARDING_TABLE,
+                install);
+
+        // switching rules for the node in the remote node
+        K8sNode localNode = k8sNodeService.node(port.deviceId());
+        if (localNode == null) {
+            final String error = String.format("Cannot find kubernetes node for %s",
+                    port.deviceId());
+            throw new IllegalStateException(error);
+        }
+        k8sNodeService.completeNodes().stream()
+                .filter(remoteNode -> !remoteNode.intgBridge().equals(localNode.intgBridge()))
+                .forEach(remoteNode -> {
+                    PortNumber portNum = tunnelPortNumByNetId(port.networkId(),
+                            k8sNetworkService, remoteNode);
+                    TrafficTreatment treatmentToRemote = DefaultTrafficTreatment.builder()
+                            .extension(buildExtension(
+                                    deviceService,
+                                    remoteNode.intgBridge(),
+                                    localNode.dataIp().getIp4Address()),
+                                    remoteNode.intgBridge())
+                            .setOutput(portNum)
+                            .build();
+
+                    k8sFlowRuleService.setRule(
+                            appId,
+                            remoteNode.intgBridge(),
+                            selector,
+                            treatmentToRemote,
+                            PRIORITY_SWITCHING_RULE,
+                            FORWARDING_TABLE,
+                            install);
+                });
+    }
+
+    private void setTunnelTagArpFlowRules(K8sPort port, boolean install) {
+        setTunnelTagFlowRules(port, Ethernet.TYPE_ARP, install);
+    }
+
+    private void setTunnelTagIpFlowRules(K8sPort port, boolean install) {
+        setTunnelTagFlowRules(port, Ethernet.TYPE_IPV4, install);
+    }
+
+    private void setNetworkRulesForTunnel(K8sPort port, boolean install) {
+        setTunnelTagIpFlowRules(port, install);
+        setForwardingRulesForTunnel(port, install);
+
+        if (ARP_BROADCAST_MODE.equals(getArpMode())) {
+            setTunnelTagArpFlowRules(port, install);
+        }
+    }
+
+    /**
+     * Configures the flow rule which is for using VXLAN/GRE/GENEVE to tag the packet
+     * based on the in_port number of a virtual instance.
+     * Note that this rule will be inserted in vTag table.
+     *
+     * @param port kubernetes port object
+     * @param install install flag, add the rule if true, remove it otherwise
+     */
+    private void setTunnelTagFlowRules(K8sPort port, short ethType, boolean install) {
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthType(ethType)
+                .matchInPort(port.portNumber())
+                .build();
+
+        TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+                .setTunnelId(getVni(port));
+
+
+        if (ethType == Ethernet.TYPE_ARP) {
+            tBuilder.transition(ARP_TABLE);
+        } else if (ethType == Ethernet.TYPE_IPV4) {
+            tBuilder.transition(ACL_EGRESS_TABLE);
+        }
+
+        k8sFlowRuleService.setRule(
+                appId,
+                port.deviceId(),
+                selector,
+                tBuilder.build(),
+                PRIORITY_TUNNEL_TAG_RULE,
+                VTAG_TABLE,
+                install);
+    }
+
+    /**
+     * Obtains the VNI from the given kubernetes port.
+     *
+     * @param port kubernetes port object
+     * @return Virtual Network Identifier (VNI)
+     */
+    private Long getVni(K8sPort port) {
+        K8sNetwork k8sNet = k8sNetworkService.network(port.networkId());
+        if (k8sNet == null || Strings.isNullOrEmpty(k8sNet.segmentId())) {
+            final String error =
+                    String.format(ERR_SET_FLOWS_VNI,
+                            port, k8sNet == null ? STR_NONE : k8sNet.name());
+            throw new IllegalStateException(error);
+        }
+        return Long.valueOf(k8sNet.segmentId());
+    }
+
+    private void setNetworkRules(K8sPort port, boolean install) {
+        K8sNetwork k8sNet = k8sNetworkService.network(port.networkId());
+
+        if (k8sNet == null) {
+            log.warn("Network {} is not found from port {}.", port.networkId(), port.portId());
+            return;
+        }
+
+        switch (k8sNet.type()) {
+            case VXLAN:
+            case GRE:
+            case GENEVE:
+                setNetworkRulesForTunnel(port, install);
+                break;
+            default:
+                log.warn("The given network type {} is not supported.", k8sNet.type().name());
+                break;
+        }
+    }
+
+    private String getArpMode() {
+        Set<ConfigProperty> properties =
+                configService.getProperties(K8sSwitchingArpHandler.class.getName());
+        return getPropertyValue(properties, ARP_MODE);
+    }
+
+    private class InternalK8sNetworkListener implements K8sNetworkListener {
+
+        private boolean isRelevantHelper(K8sNetworkEvent event) {
+            return mastershipService.isLocalMaster(event.port().deviceId());
+        }
+
+        @Override
+        public void event(K8sNetworkEvent event) {
+            switch (event.type()) {
+                case K8S_PORT_ACTIVATED:
+                    eventExecutor.execute(() -> processInstanceDetection(event));
+                    break;
+                case K8S_PORT_REMOVED:
+                    eventExecutor.execute(() -> processInstanceRemoval(event));
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        private void processInstanceDetection(K8sNetworkEvent event) {
+            if (!isRelevantHelper(event)) {
+                return;
+            }
+
+            setNetworkRules(event.port(), true);
+        }
+
+        private void processInstanceRemoval(K8sNetworkEvent event) {
+            if (!isRelevantHelper(event)) {
+                return;
+            }
+
+            setNetworkRules(event.port(), false);
+        }
+    }
+}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java
index f7f78c9..1dec9d1 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHostProvider.java
@@ -22,7 +22,7 @@
 import org.onlab.util.Tools;
 import org.onosproject.core.CoreService;
 import org.onosproject.k8snetworking.api.K8sNetwork;
-import org.onosproject.k8snetworking.api.K8sNetworkService;
+import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
 import org.onosproject.k8snetworking.api.K8sPort;
 import org.onosproject.k8snode.api.K8sNode;
 import org.onosproject.k8snode.api.K8sNodeEvent;
@@ -79,8 +79,8 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private static final String ERR_ADD_HOST = "Failed to add host: ";
-    private static final String SONA_HOST_SCHEME = "sona";
-    private static final int PORT_PREFIX_LENGTH = 3;
+    private static final String SONA_HOST_SCHEME = "sona-k8s";
+    private static final int PORT_PREFIX_LENGTH = 4;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
@@ -98,7 +98,7 @@
     protected MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected K8sNetworkService k8sNetworkService;
+    protected K8sNetworkAdminService k8sNetworkService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected K8sNodeService k8sNodeService;
@@ -174,6 +174,11 @@
 
         long createTime = System.currentTimeMillis();
 
+        // update k8s port number by referring to ONOS port number
+
+        k8sNetworkService.updatePort(k8sPort.updatePortNumber(port.number())
+                                            .updateState(K8sPort.State.ACTIVE));
+
         // we check whether the host already attached to same locations
         Host host = hostService.getHost(hostId);
 
@@ -291,6 +296,10 @@
                 return;
             }
 
+            log.debug("K8s port {} is updated at {}",
+                    event.port().annotations().value(PORT_NAME),
+                    event.subject().id());
+
             if (!event.port().isEnabled()) {
                 processPortRemoval(event);
             } else if (event.port().isEnabled()) {
@@ -303,7 +312,11 @@
                 return;
             }
 
-            processPortAddition(event);
+            log.debug("K8s port {} is detected from {}",
+                    event.port().annotations().value(PORT_NAME),
+                    event.subject().id());
+
+            processPortAdded(event.port());
         }
 
         private void processPortRemoval(DeviceEvent event) {
@@ -311,7 +324,11 @@
                 return;
             }
 
-            processPortRemoval(event);
+            log.debug("K8s port {} is removed from {}",
+                    event.port().annotations().value(PORT_NAME),
+                    event.subject().id());
+
+            processPortRemoved(event.port());
         }
     }
 
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/OsgiPropertyConstants.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/OsgiPropertyConstants.java
new file mode 100644
index 0000000..734b8e5
--- /dev/null
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/OsgiPropertyConstants.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.impl;
+
+/**
+ * Name/Value constants for properties.
+ */
+public final class OsgiPropertyConstants {
+
+    private OsgiPropertyConstants() {
+    }
+
+    static final String GATEWAY_MAC = "gatewayMac";
+    static final String GATEWAY_MAC_DEFAULT = "fe:00:00:00:00:02";
+
+    static final String ARP_MODE = "arpMode";
+    static final String ARP_MODE_DEFAULT = "proxy";
+
+    static final String DHCP_SERVER_MAC = "dhcpServerMac";
+    static final String DHCP_SERVER_MAC_DEFAULT = "fe:00:00:00:00:02";
+}