Initial implementation of  svc IP to pod IP translation using DNAT

Change-Id: I6e2f6936636e929ad60150cc67aa6316eef32911
diff --git a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java
index 763bae6..647c8e7 100644
--- a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java
+++ b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java
@@ -70,6 +70,7 @@
     public static final int CT_TABLE = 45;
     public static final int ACL_RECIRC_TABLE = 43;
     public static final int JUMP_TABLE = 50;
+    public static final int NAT_TABLE = 51;
     public static final int ROUTING_TABLE = 60;
     public static final int STAT_OUTBOUND_TABLE = 70;
     public static final int VTAP_OUTBOUND_TABLE = 71;
diff --git a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java
new file mode 100644
index 0000000..fc935c0
--- /dev/null
+++ b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.api;
+
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupDescription.Type;
+
+import java.util.List;
+
+/**
+ * Service for setting group rules.
+ */
+public interface K8sGroupRuleService {
+
+    /**
+     * Configures the group table rule.
+     *
+     * @param appId         application ID
+     * @param deviceId      device ID
+     * @param groupId       group ID
+     * @param type          group type
+     * @param buckets       a list of group buckets
+     * @param install       true for rule addition, false for rule removal
+     */
+    void setRule(ApplicationId appId, DeviceId deviceId, int groupId,
+                 Type type, List<GroupBucket> buckets, boolean install);
+}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sEndpointsListCommand.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sEndpointsListCommand.java
index 6b4c997..018cfb2 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sEndpointsListCommand.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sEndpointsListCommand.java
@@ -34,6 +34,7 @@
 public class K8sEndpointsListCommand extends AbstractShellCommand {
 
     private static final String FORMAT = "%-50s%-50s%-20s";
+    private static final String PORT_PROTOCOL_SEPARATOR = "/";
 
     @Override
     protected void doExecute() {
@@ -46,17 +47,18 @@
         for (Endpoints endpoints : endpointses) {
 
             List<String> ips = Lists.newArrayList();
-            List<Integer> ports = Lists.newArrayList();
+            List<String> portWithProtocol = Lists.newArrayList();
 
             endpoints.getSubsets().forEach(e -> {
                 e.getAddresses().forEach(a -> ips.add(a.getIp()));
-                e.getPorts().forEach(p -> ports.add(p.getPort()));
+                e.getPorts().forEach(p -> portWithProtocol.add(p.getPort() +
+                        PORT_PROTOCOL_SEPARATOR + p.getProtocol()));
             });
 
             print(FORMAT,
                     endpoints.getMetadata().getName(),
                     ips.isEmpty() ? "" : ips,
-                    ports.isEmpty() ? "" : ports);
+                    portWithProtocol.isEmpty() ? "" : portWithProtocol);
         }
     }
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPodListCommand.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPodListCommand.java
index c1bb526..a144397 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPodListCommand.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPodListCommand.java
@@ -41,7 +41,7 @@
         List<Pod> pods = Lists.newArrayList(service.pods());
         pods.sort(Comparator.comparing(p -> p.getMetadata().getName()));
 
-        print(FORMAT, "Name", "Namespace", "IP", "Containers");
+        print(FORMAT, "Name", "Namespace", "IP Address", "Containers");
 
         for (Pod pod : pods) {
 
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPortListCommand.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPortListCommand.java
index 28cd8a7..e344971 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPortListCommand.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sPortListCommand.java
@@ -59,7 +59,7 @@
         if (outputJson()) {
             print("%s", json(ports));
         } else {
-            print(FORMAT, "ID", "Network", "MAC", "Fixed IPs");
+            print(FORMAT, "ID", "Network", "MAC Address", "Fixed IPs");
             for (K8sPort port: ports) {
                 K8sNetwork k8sNet = service.network(port.networkId());
                 print(FORMAT, port.portId(),
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sServiceListCommand.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sServiceListCommand.java
index 800a1ab..ade0b01 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sServiceListCommand.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/cli/K8sServiceListCommand.java
@@ -33,6 +33,7 @@
 public class K8sServiceListCommand extends AbstractShellCommand {
 
     private static final String FORMAT = "%-50s%-30s%-30s";
+    private static final String PORT_PROTOCOL_SEPARATOR = "/";
 
     @Override
     protected void doExecute() {
@@ -45,14 +46,16 @@
 
         for (io.fabric8.kubernetes.api.model.Service svc : services) {
 
-            List<Integer> ports = Lists.newArrayList();
+            List<String> portWithProtocol = Lists.newArrayList();
 
-            svc.getSpec().getPorts().forEach(p -> ports.add(p.getPort()));
+            svc.getSpec().getPorts().forEach(p ->
+                    portWithProtocol.add(p.getPort() +
+                    PORT_PROTOCOL_SEPARATOR + p.getProtocol()));
 
             print(FORMAT,
                     svc.getMetadata().getName(),
                     svc.getSpec().getClusterIP(),
-                    ports.isEmpty() ? "" : ports);
+                    portWithProtocol.isEmpty() ? "" : portWithProtocol);
         }
     }
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sApiServerProxyHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sApiServerProxyHandler.java
deleted file mode 100644
index ef1acaa..0000000
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sApiServerProxyHandler.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Copyright 2019-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.k8snetworking.impl;
-
-import org.onlab.packet.Ethernet;
-import org.onlab.packet.IPv4;
-import org.onlab.packet.IpAddress;
-import org.onlab.packet.IpPrefix;
-import org.onlab.packet.TpPort;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.CoreService;
-import org.onosproject.k8snetworking.api.K8sFlowRuleService;
-import org.onosproject.k8snode.api.K8sNode;
-import org.onosproject.k8snode.api.K8sNodeEvent;
-import org.onosproject.k8snode.api.K8sNodeListener;
-import org.onosproject.k8snode.api.K8sNodeService;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.flow.DefaultTrafficSelector;
-import org.onosproject.net.flow.DefaultTrafficTreatment;
-import org.onosproject.net.flow.TrafficSelector;
-import org.onosproject.net.flow.TrafficTreatment;
-import org.onosproject.net.packet.PacketService;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.k8snetworking.api.Constants.FORWARDING_TABLE;
-import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
-import static org.onosproject.k8snetworking.api.Constants.PRIORITY_TRANSLATION_RULE;
-import static org.onosproject.k8snetworking.api.Constants.STAT_OUTBOUND_TABLE;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Handles kubernetes API server requests from pods.
- */
-@Component(immediate = true)
-public class K8sApiServerProxyHandler {
-    protected final Logger log = getLogger(getClass());
-
-    private static final String API_SERVER_CLUSTER_IP = "10.96.0.1";
-    private static final int API_SERVER_CLUSTER_PORT = 443;
-    private static final String API_SERVER_IP = "10.10.10.1";
-    private static final int API_SERVER_PORT = 6443;
-    private static final int PREFIX_LENGTH = 32;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected CoreService coreService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected PacketService packetService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected ClusterService clusterService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected LeadershipService leadershipService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected K8sNodeService k8sNodeService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected K8sFlowRuleService k8sFlowRuleService;
-
-    private final ExecutorService eventExecutor = newSingleThreadExecutor(
-            groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
-    private final K8sNodeListener k8sNodeListener = new InternalNodeEventListener();
-
-    private ApplicationId appId;
-    private NodeId localNodeId;
-
-    @Activate
-    protected void activate() {
-        appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
-        localNodeId = clusterService.getLocalNode().id();
-        k8sNodeService.addListener(k8sNodeListener);
-        leadershipService.runForLeadership(appId.name());
-
-        log.info("Started");
-    }
-
-    @Deactivate
-    protected void deactivate() {
-        k8sNodeService.removeListener(k8sNodeListener);
-        leadershipService.withdraw(appId.name());
-        eventExecutor.shutdown();
-
-        log.info("Stopped");
-    }
-
-    private class InternalNodeEventListener implements K8sNodeListener {
-
-        private boolean isRelevantHelper() {
-            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
-        }
-
-        @Override
-        public void event(K8sNodeEvent event) {
-            K8sNode k8sNode = event.subject();
-            switch (event.type()) {
-                case K8S_NODE_COMPLETE:
-                    eventExecutor.execute(() -> processNodeCompletion(k8sNode));
-                    break;
-                case K8S_NODE_INCOMPLETE:
-                    eventExecutor.execute(() -> processNodeIncompletion(k8sNode));
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        private void processNodeCompletion(K8sNode k8sNode) {
-            if (!isRelevantHelper()) {
-                return;
-            }
-
-            setRequestTranslationRule(k8sNode, true);
-            setResponseTranslationRule(k8sNode, true);
-        }
-
-        private void processNodeIncompletion(K8sNode k8sNode) {
-            if (!isRelevantHelper()) {
-                return;
-            }
-
-            setRequestTranslationRule(k8sNode, false);
-            setResponseTranslationRule(k8sNode, false);
-        }
-
-        /**
-         * Installs k8s API server rule for receiving all API request packets.
-         *
-         * @param k8sNode    kubernetes node
-         * @param install    installation flag
-         */
-        private void setRequestTranslationRule(K8sNode k8sNode, boolean install) {
-            TrafficSelector selector = DefaultTrafficSelector.builder()
-                    .matchEthType(Ethernet.TYPE_IPV4)
-                    .matchIPProtocol(IPv4.PROTOCOL_TCP)
-                    .matchIPDst(IpPrefix.valueOf(
-                            IpAddress.valueOf(API_SERVER_CLUSTER_IP), PREFIX_LENGTH))
-                    .matchTcpDst(TpPort.tpPort(API_SERVER_CLUSTER_PORT))
-                    .build();
-
-            TrafficTreatment treatment = DefaultTrafficTreatment.builder()
-                    .setIpDst(IpAddress.valueOf(API_SERVER_IP))
-                    .setTcpDst(TpPort.tpPort(API_SERVER_PORT))
-                    .setOutput(PortNumber.LOCAL)
-                    .build();
-
-            k8sFlowRuleService.setRule(
-                    appId,
-                    k8sNode.intgBridge(),
-                    selector,
-                    treatment,
-                    PRIORITY_TRANSLATION_RULE,
-                    STAT_OUTBOUND_TABLE,
-                    install
-            );
-        }
-
-        /**
-         * Installs k8s API server rule for receiving all API response packets.
-         *
-         * @param k8sNode    kubernetes node
-         * @param install    installation flag
-         */
-        private void setResponseTranslationRule(K8sNode k8sNode, boolean install) {
-            TrafficSelector selector = DefaultTrafficSelector.builder()
-                    .matchEthType(Ethernet.TYPE_IPV4)
-                    .matchIPProtocol(IPv4.PROTOCOL_TCP)
-                    .matchIPSrc(IpPrefix.valueOf(
-                            IpAddress.valueOf(API_SERVER_IP), PREFIX_LENGTH))
-                    .matchTcpSrc(TpPort.tpPort(API_SERVER_PORT))
-                    .build();
-
-            TrafficTreatment treatment = DefaultTrafficTreatment.builder()
-                    .setIpSrc(IpAddress.valueOf(API_SERVER_CLUSTER_IP))
-                    .setTcpSrc(TpPort.tpPort(API_SERVER_CLUSTER_PORT))
-                    .transition(FORWARDING_TABLE)
-                    .build();
-
-            k8sFlowRuleService.setRule(
-                    appId,
-                    k8sNode.intgBridge(),
-                    selector,
-                    treatment,
-                    PRIORITY_TRANSLATION_RULE,
-                    STAT_OUTBOUND_TABLE,
-                    install
-            );
-        }
-    }
-}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java
index 9bfd8ea..e62b6a1 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java
@@ -230,7 +230,10 @@
         // we need JUMP table for bypassing routing table which contains large
         // amount of flow rules which might cause performance degradation during
         // table lookup
-        setupJumpTable(k8sNode);
+        // setupJumpTable(k8sNode);
+
+        // for routing and outbound table transition
+        connectTables(deviceId, ROUTING_TABLE, STAT_OUTBOUND_TABLE);
 
         // for outbound table transition
         connectTables(deviceId, STAT_OUTBOUND_TABLE, VTAP_OUTBOUND_TABLE);
@@ -276,34 +279,14 @@
         applyRule(flowRule, true);
     }
 
-    private void setupHostGwRule(K8sNetwork k8sNetwork) {
-        TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
-        sBuilder.matchEthType(Ethernet.TYPE_IPV4)
-                .matchIPDst(IpPrefix.valueOf(k8sNetwork.gatewayIp(), 32));
-
-        TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
-        tBuilder.setOutput(PortNumber.LOCAL);
-
-        for (K8sNode node : k8sNodeService.completeNodes()) {
-            FlowRule flowRule = DefaultFlowRule.builder()
-                    .forDevice(node.intgBridge())
-                    .withSelector(sBuilder.build())
-                    .withTreatment(tBuilder.build())
-                    .withPriority(HIGH_PRIORITY)
-                    .fromApp(appId)
-                    .makePermanent()
-                    .forTable(JUMP_TABLE)
-                    .build();
-            applyRule(flowRule, true);
-        }
-
-        sBuilder = DefaultTrafficSelector.builder();
-        sBuilder.matchEthType(Ethernet.TYPE_IPV4)
+    private void setupHostRoutingRule(K8sNetwork k8sNetwork) {
+        TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
                 .matchIPSrc(IpPrefix.valueOf(k8sNetwork.gatewayIp(), 32))
                 .matchIPDst(IpPrefix.valueOf(k8sNetwork.cidr()));
 
-        tBuilder = DefaultTrafficTreatment.builder();
-        tBuilder.setTunnelId(Long.valueOf(k8sNetwork.segmentId()))
+        TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+                .setTunnelId(Long.valueOf(k8sNetwork.segmentId()))
                 .transition(STAT_OUTBOUND_TABLE);
 
         for (K8sNode node : k8sNodeService.completeNodes()) {
@@ -314,7 +297,29 @@
                     .withPriority(HIGH_PRIORITY)
                     .fromApp(appId)
                     .makePermanent()
-                    .forTable(JUMP_TABLE)
+                    .forTable(ROUTING_TABLE)
+                    .build();
+            applyRule(flowRule, true);
+        }
+    }
+
+    private void setupGatewayRoutingRule(K8sNetwork k8sNetwork) {
+        TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchIPDst(IpPrefix.valueOf(k8sNetwork.gatewayIp(), 32));
+
+        TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+                .setOutput(PortNumber.LOCAL);
+
+        for (K8sNode node : k8sNodeService.completeNodes()) {
+            FlowRule flowRule = DefaultFlowRule.builder()
+                    .forDevice(node.intgBridge())
+                    .withSelector(sBuilder.build())
+                    .withTreatment(tBuilder.build())
+                    .withPriority(HIGH_PRIORITY)
+                    .fromApp(appId)
+                    .makePermanent()
+                    .forTable(ROUTING_TABLE)
                     .build();
             applyRule(flowRule, true);
         }
@@ -346,7 +351,10 @@
             }
 
             initializePipeline(node);
-            k8sNetworkService.networks().forEach(K8sFlowRuleManager.this::setupHostGwRule);
+            k8sNetworkService.networks().forEach(n -> {
+                setupHostRoutingRule(n);
+                setupGatewayRoutingRule(n);
+            });
         }
     }
 
@@ -375,7 +383,8 @@
                 return;
             }
 
-            setupHostGwRule(network);
+            setupHostRoutingRule(network);
+            setupGatewayRoutingRule(network);
         }
     }
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java
new file mode 100644
index 0000000..aa2b90f
--- /dev/null
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.impl;
+
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.k8snetworking.api.K8sGroupRuleService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupBuckets;
+import org.onosproject.net.group.GroupDescription;
+import org.onosproject.net.group.GroupDescription.Type;
+import org.onosproject.net.group.GroupService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getGroupKey;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Sets group table rules directly using GroupService.
+ */
+@Component(immediate = true, service = K8sGroupRuleService.class)
+public class K8sGroupRuleManager implements K8sGroupRuleService {
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected GroupService groupService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    private ApplicationId appId;
+
+    @Activate
+    protected void activate() {
+        appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
+        coreService.registerApplication(K8S_NETWORKING_APP_ID);
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public void setRule(ApplicationId appId, DeviceId deviceId, int groupId,
+                        Type type, List<GroupBucket> buckets, boolean install) {
+        GroupDescription groupDesc = new DefaultGroupDescription(deviceId,
+                type, new GroupBuckets(buckets), getGroupKey(groupId), groupId, appId);
+
+        if (install) {
+            groupService.addGroup(groupDesc);
+        } else {
+            groupService.removeGroup(deviceId, getGroupKey(groupId), appId);
+        }
+    }
+}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
new file mode 100644
index 0000000..dfdba64
--- /dev/null
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
@@ -0,0 +1,437 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.k8snetworking.impl;
+
+import com.google.common.collect.Lists;
+import io.fabric8.kubernetes.api.model.EndpointAddress;
+import io.fabric8.kubernetes.api.model.EndpointPort;
+import io.fabric8.kubernetes.api.model.EndpointSubset;
+import io.fabric8.kubernetes.api.model.Endpoints;
+import io.fabric8.kubernetes.api.model.Service;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.TpPort;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.GroupId;
+import org.onosproject.k8snetworking.api.K8sEndpointsService;
+import org.onosproject.k8snetworking.api.K8sFlowRuleService;
+import org.onosproject.k8snetworking.api.K8sGroupRuleService;
+import org.onosproject.k8snetworking.api.K8sNetworkService;
+import org.onosproject.k8snetworking.api.K8sServiceEvent;
+import org.onosproject.k8snetworking.api.K8sServiceListener;
+import org.onosproject.k8snetworking.api.K8sServiceService;
+import org.onosproject.k8snetworking.util.RulePopulatorUtil;
+import org.onosproject.k8snetworking.util.RulePopulatorUtil.NiciraConnTrackTreatmentBuilder;
+import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeEvent;
+import org.onosproject.k8snode.api.K8sNodeListener;
+import org.onosproject.k8snode.api.K8sNodeService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.ExtensionSelector;
+import org.onosproject.net.flow.instructions.ExtensionTreatment;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.k8snetworking.api.Constants.JUMP_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+import static org.onosproject.k8snetworking.api.Constants.NAT_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CT_RULE;
+import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.nodeIpGatewayIpMap;
+import static org.onosproject.k8snetworking.util.RulePopulatorUtil.CT_NAT_DST_FLAG;
+import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildGroupBucket;
+import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildResubmitExtension;
+import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtMaskFlag;
+import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtStateFlag;
+import static org.onosproject.k8snetworking.util.RulePopulatorUtil.niciraConnTrackTreatmentBuilder;
+import static org.onosproject.net.group.GroupDescription.Type.SELECT;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Handles the service IP to pod IP related translation traffic.
+ */
+@Component(immediate = true)
+public class K8sServiceHandler {
+
+    private final Logger log = getLogger(getClass());
+
+    // TODO: need to inject service IP CIDR through REST
+    private static final String SERVICE_IP_CIDR = "10.96.0.0/24";
+
+    private static final int HOST_CIDR_NUM = 32;
+
+    private static final String NONE = "None";
+    private static final String CLUSTER_IP = "ClusterIP";
+    private static final String TCP = "TCP";
+
+    private static final String GROUP_ID_COUNTER_NAME = "group-id-counter";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DriverService driverService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sNetworkService k8sNetworkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sFlowRuleService k8sFlowRuleService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sGroupRuleService k8sGroupRuleService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sNodeService k8sNodeService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sEndpointsService k8sEndpointsService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sServiceService k8sServiceService;
+
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+    private final InternalNodeEventListener internalNodeEventListener =
+            new InternalNodeEventListener();
+    private final InternalK8sServiceListener internalK8sServiceListener =
+            new InternalK8sServiceListener();
+
+    private AtomicCounter groupIdCounter;
+
+    private ApplicationId appId;
+    private NodeId localNodeId;
+
+    @Activate
+    protected void activate() {
+        appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
+        localNodeId = clusterService.getLocalNode().id();
+        leadershipService.runForLeadership(appId.name());
+        k8sNodeService.addListener(internalNodeEventListener);
+        k8sServiceService.addListener(internalK8sServiceListener);
+
+        groupIdCounter = storageService.getAtomicCounter(GROUP_ID_COUNTER_NAME);
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        leadershipService.withdraw(appId.name());
+        k8sNodeService.removeListener(internalNodeEventListener);
+        k8sServiceService.removeListener(internalK8sServiceListener);
+        eventExecutor.shutdown();
+
+        log.info("Stopped");
+    }
+
+    private void setServiceNatRules(DeviceId deviceId, boolean install) {
+        // -trk CT rules
+        long ctUntrack = computeCtStateFlag(false, false, false);
+        long ctMaskUntrack = computeCtMaskFlag(true, false, false);
+
+        k8sNetworkService.networks().forEach(n -> {
+            // TODO: need to provide a way to add multiple service IP CIDR ranges
+            setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), SERVICE_IP_CIDR,
+                    JUMP_TABLE, NAT_TABLE, PRIORITY_CT_RULE, install);
+            setUntrack(deviceId, ctUntrack, ctMaskUntrack, n.cidr(), n.cidr(),
+                    JUMP_TABLE, ROUTING_TABLE, PRIORITY_CT_RULE, install);
+        });
+
+        // +trk-new CT rules
+        long ctTrackUnnew = computeCtStateFlag(true, false, false);
+        long ctMaskTrackUnnew = computeCtMaskFlag(true, true, false);
+
+        setTrackEstablish(deviceId, ctTrackUnnew, ctMaskTrackUnnew,
+                NAT_TABLE, ROUTING_TABLE, PRIORITY_CT_RULE, install);
+
+        // +trk+new CT rules
+        long ctTrackNew = computeCtStateFlag(true, true, false);
+        long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
+
+        k8sServiceService.services().stream()
+                .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
+                .forEach(s -> setGroupFlowRules(deviceId, ctTrackNew,
+                        ctMaskTrackNew, s, install));
+    }
+
+    private void setGroupFlowRules(DeviceId deviceId, long ctState, long ctMask,
+                                   Service service, boolean install) {
+        int groupId = (int) groupIdCounter.incrementAndGet();
+
+        List<GroupBucket> buckets = Lists.newArrayList();
+
+        String serviceName = service.getMetadata().getName();
+        String serviceIp = service.getSpec().getClusterIP();
+
+        // TODO: multi-ports case should be addressed
+        Integer servicePort = service.getSpec().getPorts().get(0).getPort();
+
+        List<Endpoints> endpointses = k8sEndpointsService.endpointses()
+                .stream()
+                .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
+                .collect(Collectors.toList());
+
+        Map<String, String> nodeIpGatewayIpMap =
+                nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
+
+        for (Endpoints endpoints : endpointses) {
+            for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
+                List<EndpointPort> ports = endpointSubset.getPorts()
+                        .stream()
+                        .filter(p -> p.getProtocol().equals(TCP))
+                        .collect(Collectors.toList());
+
+                for (EndpointAddress address : endpointSubset.getAddresses()) {
+                    String podIp = nodeIpGatewayIpMap.containsKey(address.getIp()) ?
+                            nodeIpGatewayIpMap.get(address.getIp()) : address.getIp();
+
+                    NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
+                            niciraConnTrackTreatmentBuilder(driverService, deviceId)
+                                    .commit(true)
+                                    .natAction(true)
+                                    .natIp(IpAddress.valueOf(podIp))
+                                    .natFlag(CT_NAT_DST_FLAG);
+
+                    ports.forEach(p -> {
+                        ExtensionTreatment ctNatTreatment = connTreatmentBuilder
+                                .natPort(TpPort.tpPort(p.getPort())).build();
+                        ExtensionTreatment resubmitTreatment = buildResubmitExtension(
+                                deviceService.getDevice(deviceId), ROUTING_TABLE);
+                        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                                .extension(ctNatTreatment, deviceId)
+                                .extension(resubmitTreatment, deviceId)
+                                .build();
+                        buckets.add(buildGroupBucket(treatment, SELECT, (short) -1));
+                    });
+                }
+            }
+        }
+
+        if (!buckets.isEmpty()) {
+            k8sGroupRuleService.setRule(appId, deviceId, groupId, SELECT, buckets, install);
+
+            setTrackNew(deviceId, ctState, ctMask, IpAddress.valueOf(serviceIp),
+                    TpPort.tpPort(servicePort), NAT_TABLE, groupId,
+                    PRIORITY_CT_RULE, install);
+        }
+    }
+
+    private void setUntrack(DeviceId deviceId, long ctState, long ctMask,
+                            String srcCidr, String dstCidr, int installTable,
+                            int transitTable, int priority, boolean install) {
+        ExtensionSelector esCtSate = RulePopulatorUtil
+                .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchIPSrc(IpPrefix.valueOf(srcCidr))
+                .matchIPDst(IpPrefix.valueOf(dstCidr))
+                .extension(esCtSate, deviceId)
+                .build();
+
+        NiciraConnTrackTreatmentBuilder connTreatmentBuilder =
+                niciraConnTrackTreatmentBuilder(driverService, deviceId)
+                        .natAction(false)
+                        .commit(false)
+                        .table((short) transitTable);
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .extension(connTreatmentBuilder.build(), deviceId)
+                .build();
+
+        k8sFlowRuleService.setRule(
+                appId,
+                deviceId,
+                selector,
+                treatment,
+                priority,
+                installTable,
+                install);
+    }
+
+    private void setTrackNew(DeviceId deviceId, long ctState, long ctMask,
+                             IpAddress dstIp, TpPort dstPort, int installTable,
+                             int groupId, int priority, boolean install) {
+        ExtensionSelector esCtSate = RulePopulatorUtil
+                .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchIPDst(IpPrefix.valueOf(dstIp, HOST_CIDR_NUM))
+                .matchIPProtocol(IPv4.PROTOCOL_TCP)
+                .matchTcpDst(dstPort)
+                .extension(esCtSate, deviceId)
+                .build();
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .group(GroupId.valueOf(groupId))
+                .build();
+
+        k8sFlowRuleService.setRule(
+                appId,
+                deviceId,
+                selector,
+                treatment,
+                priority,
+                installTable,
+                install);
+    }
+
+    private void setTrackEstablish(DeviceId deviceId, long ctState, long ctMask,
+                                   int installTable, int transitTable,
+                                   int priority, boolean install) {
+        ExtensionSelector esCtSate = RulePopulatorUtil
+                .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .extension(esCtSate, deviceId)
+                .build();
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .transition(transitTable)
+                .build();
+
+        k8sFlowRuleService.setRule(
+                appId,
+                deviceId,
+                selector,
+                treatment,
+                priority,
+                installTable,
+                install);
+    }
+
+    private class InternalK8sServiceListener implements K8sServiceListener {
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(K8sServiceEvent event) {
+            switch (event.type()) {
+                case K8S_SERVICE_CREATED:
+                    eventExecutor.execute(() -> processServiceCreation(event.subject()));
+                    break;
+                case K8S_SERVICE_REMOVED:
+                    eventExecutor.execute(() -> processServiceRemoval(event.subject()));
+                    break;
+                default:
+                    // do nothing
+                    break;
+            }
+        }
+
+        private void processServiceCreation(Service service) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            long ctTrackNew = computeCtStateFlag(true, true, false);
+            long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
+
+            k8sNodeService.completeNodes().forEach(n ->
+                    setGroupFlowRules(n.intgBridge(), ctTrackNew,
+                            ctMaskTrackNew, service, true));
+        }
+
+        private void processServiceRemoval(Service service) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            long ctTrackNew = computeCtStateFlag(true, true, false);
+            long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
+
+            k8sNodeService.completeNodes().forEach(n ->
+                    setGroupFlowRules(n.intgBridge(), ctTrackNew,
+                            ctMaskTrackNew, service, false));        }
+    }
+
+    private class InternalNodeEventListener implements K8sNodeListener {
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(K8sNodeEvent event) {
+            K8sNode k8sNode = event.subject();
+            switch (event.type()) {
+                case K8S_NODE_COMPLETE:
+                    eventExecutor.execute(() -> processNodeCompletion(k8sNode));
+                    break;
+                case K8S_NODE_INCOMPLETE:
+                    eventExecutor.execute(() -> processNodeIncompletion(k8sNode));
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        private void processNodeCompletion(K8sNode node) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            setServiceNatRules(node.intgBridge(), true);
+        }
+
+        private void processNodeIncompletion(K8sNode node) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            setServiceNatRules(node.intgBridge(), false);
+        }
+    }
+}
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java
index 53af926..88cb589 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingHandler.java
@@ -17,6 +17,7 @@
 
 import com.google.common.base.Strings;
 import org.onlab.packet.Ethernet;
+import org.onlab.packet.IpPrefix;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cfg.ConfigProperty;
 import org.onosproject.cluster.LeadershipService;
@@ -116,6 +117,8 @@
         appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
         k8sNetworkService.addListener(k8sNetworkListener);
 
+        setGatewayRulesForTunnel(true);
+
         log.info("Started");
     }
 
@@ -124,6 +127,8 @@
         k8sNetworkService.removeListener(k8sNetworkListener);
         eventExecutor.shutdown();
 
+        setGatewayRulesForTunnel(false);
+
         log.info("Stopped");
     }
 
@@ -240,6 +245,35 @@
                 install);
     }
 
+    private void setGatewayRulesForTunnel(boolean install) {
+        k8sNetworkService.networks().forEach(n -> {
+            // switching rules for the instPorts in the same node
+            TrafficSelector selector = DefaultTrafficSelector.builder()
+                    // TODO: need to handle IPv6 in near future
+                    .matchEthType(Ethernet.TYPE_IPV4)
+                    .matchIPDst(IpPrefix.valueOf(n.gatewayIp(), 32))
+                    .matchTunnelId(Long.valueOf(n.segmentId()))
+                    .build();
+
+            TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                    .setOutput(PortNumber.LOCAL)
+                    .build();
+
+            // FIXME: need to find a way to install the gateway rules into
+            // right OVS
+            k8sNodeService.completeNodes().forEach(node -> {
+                k8sFlowRuleService.setRule(
+                        appId,
+                        node.intgBridge(),
+                        selector,
+                        treatment,
+                        PRIORITY_SWITCHING_RULE,
+                        FORWARDING_TABLE,
+                        install);
+            });
+        });
+    }
+
     /**
      * Obtains the VNI from the given kubernetes port.
      *
@@ -261,7 +295,8 @@
         K8sNetwork k8sNet = k8sNetworkService.network(port.networkId());
 
         if (k8sNet == null) {
-            log.warn("Network {} is not found from port {}.", port.networkId(), port.portId());
+            log.warn("Network {} is not found from port {}.",
+                    port.networkId(), port.portId());
             return;
         }
 
@@ -272,7 +307,8 @@
                 setNetworkRulesForTunnel(port, install);
                 break;
             default:
-                log.warn("The given network type {} is not supported.", k8sNet.type().name());
+                log.warn("The given network type {} is not supported.",
+                        k8sNet.type().name());
                 break;
         }
     }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
index 2e838e8..2647d70 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
@@ -19,6 +19,7 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
 import io.fabric8.kubernetes.client.ConfigBuilder;
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -31,13 +32,17 @@
 import org.onosproject.k8snode.api.K8sApiConfig;
 import org.onosproject.k8snode.api.K8sApiConfigService;
 import org.onosproject.k8snode.api.K8sNode;
+import org.onosproject.k8snode.api.K8sNodeService;
 import org.onosproject.net.PortNumber;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.GroupKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -168,6 +173,16 @@
     }
 
     /**
+     * Obtains flow group key from the given id.
+     *
+     * @param groupId flow group identifier
+     * @return flow group key
+     */
+    public static GroupKey getGroupKey(int groupId) {
+        return new DefaultGroupKey((Integer.toString(groupId)).getBytes());
+    }
+
+    /**
      * Generates endpoint URL by referring to scheme, ipAddress and port.
      *
      * @param scheme        scheme
@@ -248,4 +263,25 @@
 
         return client;
     }
+
+    /**
+     * Obtains the kubernetes node IP and kubernetes network gateway IP map.
+     *
+     * @param nodeService       kubernetes node service
+     * @param networkService    kubernetes network service
+     * @return kubernetes node IP and kubernetes network gateway IP map
+     */
+    public static Map<String, String> nodeIpGatewayIpMap(K8sNodeService nodeService,
+                                                         K8sNetworkService networkService) {
+        Map<String, String> ipMap = Maps.newConcurrentMap();
+
+        nodeService.completeNodes().forEach(n -> {
+            K8sNetwork network = networkService.network(n.hostname());
+            if (network != null) {
+                ipMap.put(n.dataIp().toString(), network.gatewayIp().toString());
+            }
+        });
+
+        return ipMap;
+    }
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/RulePopulatorUtil.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/RulePopulatorUtil.java
index d5366ae..8192015 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/RulePopulatorUtil.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/RulePopulatorUtil.java
@@ -16,14 +16,30 @@
 package org.onosproject.k8snetworking.util;
 
 import org.onlab.packet.Ip4Address;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.TpPort;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.ExtensionSelectorResolver;
 import org.onosproject.net.behaviour.ExtensionTreatmentResolver;
 import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverHandler;
+import org.onosproject.net.driver.DriverService;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.ExtensionSelector;
+import org.onosproject.net.flow.criteria.ExtensionSelectorType;
 import org.onosproject.net.flow.instructions.ExtensionPropertyException;
 import org.onosproject.net.flow.instructions.ExtensionTreatment;
+import org.onosproject.net.flow.instructions.ExtensionTreatmentType;
+import org.onosproject.net.group.DefaultGroupBucket;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupDescription.Type;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.onosproject.net.flow.instructions.ExtensionTreatmentType.ExtensionTreatmentTypes.NICIRA_RESUBMIT_TABLE;
 import static org.onosproject.net.flow.instructions.ExtensionTreatmentType.ExtensionTreatmentTypes.NICIRA_SET_TUNNEL_DST;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -36,10 +52,145 @@
 
     private static final String TUNNEL_DST = "tunnelDst";
 
+    private static final String CT_FLAGS = "flags";
+    private static final String CT_ZONE = "zone";
+    private static final String CT_TABLE = "recircTable";
+    private static final String CT_STATE = "ctState";
+    private static final String CT_STATE_MASK = "ctStateMask";
+    private static final String CT_PRESENT_FLAGS = "presentFlags";
+    private static final String CT_IPADDRESS_MIN = "ipAddressMin";
+    private static final String CT_IPADDRESS_MAX = "ipAddressMax";
+    private static final String CT_PORT_MIN = "portMin";
+    private static final String CT_PORT_MAX = "portMax";
+    private static final String CT_NESTED_ACTIONS = "nestedActions";
+
+    public static final int CT_NAT_SRC_FLAG = 0;
+    public static final int CT_NAT_DST_FLAG = 1;
+    public static final int CT_NAT_PERSISTENT_FLAG = 2;
+    public static final int CT_NAT_PROTO_HASH_FLAG = 3;
+    public static final int CT_NAT_PROTO_RANDOM_FLAG = 4;
+
+    private static final int ADDRESS_V4_MIN_FLAG = 0;
+    private static final int ADDRESS_V4_MAX_FLAG = 1;
+    private static final int ADDRESS_V6_MIN_FLAG = 2;
+    private static final int ADDRESS_V6_MAX_FLAG = 3;
+    private static final int PORT_MIN_FLAG = 4;
+    private static final int PORT_MAX_FLAG = 5;
+
+    public static final long CT_STATE_NONE = 0;
+    public static final long CT_STATE_NEW = 0x01;
+    public static final long CT_STATE_EST = 0x02;
+    public static final long CT_STATE_NOT_TRK = 0x20;
+    public static final long CT_STATE_TRK = 0x20;
+
+    private static final String TABLE_EXTENSION = "table";
+
+    // not intended for direct invocation from external
     private RulePopulatorUtil() {
     }
 
     /**
+     * Returns a builder for OVS Connection Tracking feature actions.
+     *
+     * @param ds DriverService
+     * @param id DeviceId
+     * @return a builder for OVS Connection Tracking feature actions
+     */
+    public static NiciraConnTrackTreatmentBuilder
+                    niciraConnTrackTreatmentBuilder(DriverService ds, DeviceId id) {
+        return new NiciraConnTrackTreatmentBuilder(ds, id);
+    }
+
+    /**
+     * Builds OVS ConnTrack matches.
+     *
+     * @param driverService driver service
+     * @param deviceId device ID
+     * @param ctState connection tracking sate masking value
+     * @param ctSateMask connection tracking sate masking value
+     * @return OVS ConnTrack extension match
+     */
+    public static ExtensionSelector buildCtExtensionSelector(DriverService driverService,
+                                                             DeviceId deviceId,
+                                                             long ctState,
+                                                             long ctSateMask) {
+        DriverHandler handler = driverService.createHandler(deviceId);
+        ExtensionSelectorResolver esr = handler.behaviour(ExtensionSelectorResolver.class);
+
+        ExtensionSelector extensionSelector = esr.getExtensionSelector(
+                ExtensionSelectorType.ExtensionSelectorTypes.NICIRA_MATCH_CONNTRACK_STATE.type());
+        try {
+            extensionSelector.setPropertyValue(CT_STATE, ctState);
+            extensionSelector.setPropertyValue(CT_STATE_MASK, ctSateMask);
+        } catch (Exception e) {
+            log.error("Failed to set nicira match CT state because of {}", e);
+            return null;
+        }
+
+        return extensionSelector;
+    }
+
+    /**
+     * Computes ConnTack State flag values.
+     *
+     * @param isTracking true for +trk, false for -trk
+     * @param isNew true for +new, false for -new
+     * @param isEstablished true for +est, false for -est
+     * @return ConnTrack State flags
+     */
+    public static long computeCtStateFlag(boolean isTracking,
+                                          boolean isNew,
+                                          boolean isEstablished) {
+        long ctStateFlag = 0x00;
+
+        if (isTracking) {
+            ctStateFlag = ctStateFlag | CT_STATE_TRK;
+        }
+
+        if (isNew) {
+            ctStateFlag = ctStateFlag | CT_STATE_TRK;
+            ctStateFlag = ctStateFlag | CT_STATE_NEW;
+        }
+
+        if (isEstablished) {
+            ctStateFlag = ctStateFlag | CT_STATE_TRK;
+            ctStateFlag = ctStateFlag | CT_STATE_EST;
+        }
+
+        return ctStateFlag;
+    }
+
+    /**
+     * Computes ConnTrack State mask values.
+     *
+     * @param isTracking true for setting +trk/-trk value, false for otherwise
+     * @param isNew true for setting +new/-new value, false for otherwise
+     * @param isEstablished true for setting +est/-est value, false for otherwise
+     * @return ConnTrack State Mask value
+     */
+    public static long computeCtMaskFlag(boolean isTracking,
+                                         boolean isNew,
+                                         boolean isEstablished) {
+        long ctMaskFlag = 0x00;
+
+        if (isTracking) {
+            ctMaskFlag = ctMaskFlag | CT_STATE_TRK;
+        }
+
+        if (isNew) {
+            ctMaskFlag = ctMaskFlag | CT_STATE_TRK;
+            ctMaskFlag = ctMaskFlag | CT_STATE_NEW;
+        }
+
+        if (isEstablished) {
+            ctMaskFlag = ctMaskFlag | CT_STATE_TRK;
+            ctMaskFlag = ctMaskFlag | CT_STATE_EST;
+        }
+
+        return ctMaskFlag;
+    }
+
+    /**
      * Returns tunnel destination extension treatment object.
      *
      * @param deviceService driver service
@@ -72,4 +223,231 @@
             return null;
         }
     }
+
+    /**
+     * Returns the group bucket with given traffic treatment and group type.
+     *
+     * @param treatment     traffic treatment
+     * @param type          group type
+     * @param weight        weight (only for select type)
+     * @return group bucket
+     */
+    public static GroupBucket buildGroupBucket(TrafficTreatment treatment,
+                                               Type type, short weight) {
+        switch (type) {
+            case ALL:
+                return DefaultGroupBucket.createAllGroupBucket(treatment);
+            case SELECT:
+                if (weight == -1) {
+                    return DefaultGroupBucket.createSelectGroupBucket(treatment);
+                } else {
+                    return DefaultGroupBucket.createSelectGroupBucket(treatment, weight);
+                }
+            case INDIRECT:
+                return DefaultGroupBucket.createIndirectGroupBucket(treatment);
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Returns the nicira resubmit extension treatment with given table ID.
+     *
+     * @param device        device identifier
+     * @param tableId       table identifier
+     * @return resubmit extension treatment
+     */
+    public static ExtensionTreatment buildResubmitExtension(Device device, int tableId) {
+        if (device == null || !device.is(ExtensionTreatmentResolver.class)) {
+            log.warn("Nicira extension treatment is not supported");
+            return null;
+        }
+
+        ExtensionTreatmentResolver resolver = device.as(ExtensionTreatmentResolver.class);
+        ExtensionTreatment treatment =
+                resolver.getExtensionInstruction(NICIRA_RESUBMIT_TABLE.type());
+
+        try {
+            treatment.setPropertyValue(TABLE_EXTENSION, ((short) tableId));
+            return treatment;
+        } catch (ExtensionPropertyException e) {
+            log.error("Failed to set nicira resubmit extension treatment for {}",
+                    device.id());
+            return null;
+        }
+    }
+
+    /**
+     * Builder class for OVS Connection Tracking feature actions.
+     */
+    public static final class NiciraConnTrackTreatmentBuilder {
+
+        private DriverService driverService;
+        private DeviceId deviceId;
+        private IpAddress natAddress = null;
+        private TpPort natPort = null;
+        private int zone;
+        private boolean commit;
+        private short table = -1;
+        private boolean natAction;
+        private int natFlag;
+
+        // private constructor
+        private NiciraConnTrackTreatmentBuilder(DriverService driverService,
+                                                DeviceId deviceId) {
+            this.driverService = driverService;
+            this.deviceId = deviceId;
+        }
+
+        /**
+         * Sets commit flag.
+         *
+         * @param c true if commit, false if not.
+         * @return NiciraConnTrackTreatmentBuilder object
+         */
+        public NiciraConnTrackTreatmentBuilder commit(boolean c) {
+            this.commit = c;
+            return this;
+        }
+
+        /**
+         * Sets zone number.
+         *
+         * @param z zone number
+         * @return NiciraConnTrackTreatmentBuilder object
+         */
+        public NiciraConnTrackTreatmentBuilder zone(int z) {
+            this.zone = z;
+            return this;
+        }
+
+        /**
+         * Sets recirculation table number.
+         *
+         * @param t table number to restart
+         * @return NiciraConnTrackTreatmentBuilder object
+         */
+        public NiciraConnTrackTreatmentBuilder table(short t) {
+            this.table = t;
+            return this;
+        }
+
+        /**
+         * Sets IP address for NAT.
+         *
+         * @param ip NAT IP address
+         * @return NiciraConnTrackTreatmentBuilder object
+         */
+        public NiciraConnTrackTreatmentBuilder natIp(IpAddress ip) {
+            this.natAddress = ip;
+            return this;
+        }
+
+        /**
+         * Sets port for NAT.
+         *
+         * @param port port number
+         * @return NiciraConnTrackTreatmentBuilder object
+         */
+        public NiciraConnTrackTreatmentBuilder natPort(TpPort port) {
+            this.natPort = port;
+            return this;
+        }
+
+        /**
+         * Sets NAT flags.
+         * SRC NAT: 1 << 0
+         * DST NAT: 1 << 1
+         * PERSISTENT NAT: 1 << 2
+         * PROTO_HASH NAT: 1 << 3
+         * PROTO_RANDOM NAT : 1 << 4
+         *
+         * @param flag flag value
+         * @return NiciraConnTrackTreatmentBuilder object
+         */
+        public NiciraConnTrackTreatmentBuilder natFlag(int flag) {
+            this.natFlag = 1 << flag;
+            return this;
+        }
+
+        /**
+         * Sets the flag for NAT action.
+         *
+         * @param nat nat action is included if true, no nat action otherwise
+         * @return NiciraConnTrackTreatmentBuilder object
+         */
+        public NiciraConnTrackTreatmentBuilder natAction(boolean nat) {
+            this.natAction = nat;
+            return this;
+        }
+
+        /**
+         * Builds extension treatment for OVS ConnTack and NAT feature.
+         *
+         * @return ExtensionTreatment object
+         */
+        public ExtensionTreatment build() {
+            DriverHandler handler = driverService.createHandler(deviceId);
+            ExtensionTreatmentResolver etr =
+                    handler.behaviour(ExtensionTreatmentResolver.class);
+
+            ExtensionTreatment natTreatment = etr.getExtensionInstruction(
+                    ExtensionTreatmentType.ExtensionTreatmentTypes.NICIRA_NAT.type());
+            try {
+
+                natTreatment.setPropertyValue(CT_FLAGS, this.natFlag);
+
+                natTreatment.setPropertyValue(CT_PRESENT_FLAGS,
+                        buildPresentFlag(natPort != null, natAddress != null));
+
+                if (natAddress != null) {
+                    natTreatment.setPropertyValue(CT_IPADDRESS_MIN, natAddress);
+                    natTreatment.setPropertyValue(CT_IPADDRESS_MAX, natAddress);
+                }
+
+                if (natPort != null) {
+                    natTreatment.setPropertyValue(CT_PORT_MIN, natPort.toInt());
+                    natTreatment.setPropertyValue(CT_PORT_MAX, natPort.toInt());
+                }
+
+            } catch (Exception e) {
+                log.error("Failed to set NAT due to error : {}", e);
+                return null;
+            }
+
+            ExtensionTreatment ctTreatment = etr.getExtensionInstruction(
+                    ExtensionTreatmentType.ExtensionTreatmentTypes.NICIRA_CT.type());
+            try {
+                List<ExtensionTreatment> nat = new ArrayList<>();
+                if (natAction) {
+                    nat.add(natTreatment);
+                }
+                ctTreatment.setPropertyValue(CT_FLAGS, commit ? 1 : 0);
+                ctTreatment.setPropertyValue(CT_ZONE, zone);
+                ctTreatment.setPropertyValue(CT_TABLE, table > -1 ? table : 0xff);
+                ctTreatment.setPropertyValue(CT_NESTED_ACTIONS, nat);
+            } catch (Exception e) {
+                log.error("Failed to set CT due to error : {}", e);
+                return null;
+            }
+
+            return ctTreatment;
+        }
+
+        private int buildPresentFlag(boolean isPortPresent, boolean isAddressPresent) {
+
+            int presentFlag = 0;
+
+            if (isPortPresent) {
+                presentFlag = 1 << PORT_MIN_FLAG | 1 << PORT_MAX_FLAG;
+            }
+
+            if (isAddressPresent) {
+                // TODO: need to support IPv6 address
+                presentFlag =  presentFlag | 1 << ADDRESS_V4_MIN_FLAG | 1 << ADDRESS_V4_MAX_FLAG;
+            }
+
+            return presentFlag;
+        }
+    }
 }