Implement k8s service IP to pod IP translation using Nicira load ext

Change-Id: I147a9adb68b2bf597c1876a64bf77dedca9698b3
diff --git a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java
index 647c8e7..076d5d6 100644
--- a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java
+++ b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/Constants.java
@@ -30,11 +30,20 @@
     public static final String ARP_BROADCAST_MODE = "broadcast";
     public static final String ARP_PROXY_MODE = "proxy";
 
+    public static final String NAT_STATEFUL = "stateful";
+    public static final String NAT_STATELESS = "stateless";
+
     public static final String DEFAULT_GATEWAY_MAC_STR = "fe:00:00:00:00:02";
 
     public static final MacAddress DEFAULT_GATEWAY_MAC =
                         MacAddress.valueOf(DEFAULT_GATEWAY_MAC_STR);
 
+    public static final String SHIFTED_IP_CIDR = "172.10.0.0/16";
+    public static final String SHIFTED_IP_PREFIX = "172.10";
+
+    // TODO: need to inject service IP CIDR through REST
+    public static final String SERVICE_IP_CIDR = "10.96.0.0/24";
+
     public static final String PORT_NAME_PREFIX_CONTAINER = "veth";
 
     public static final String ANNOTATION_NETWORK_ID = "networkId";
@@ -49,6 +58,7 @@
     public static final int PRIORITY_CT_HOOK_RULE = 30500;
     public static final int PRIORITY_CT_RULE = 32000;
     public static final int PRIORITY_CT_DROP_RULE = 32500;
+    public static final int PRIORITY_NAT_RULE = 30000;
     public static final int PRIORITY_SWITCHING_RULE = 30000;
     public static final int PRIORITY_ARP_GATEWAY_RULE = 41000;
     public static final int PRIORITY_ARP_SUBNET_RULE = 40000;
@@ -71,6 +81,8 @@
     public static final int ACL_RECIRC_TABLE = 43;
     public static final int JUMP_TABLE = 50;
     public static final int NAT_TABLE = 51;
+    public static final int SERVICE_TABLE = 52;
+    public static final int POD_TABLE = 53;
     public static final int ROUTING_TABLE = 60;
     public static final int STAT_OUTBOUND_TABLE = 70;
     public static final int VTAP_OUTBOUND_TABLE = 71;
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java
index e62b6a1..678a226 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sFlowRuleManager.java
@@ -62,6 +62,7 @@
 import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
 import static org.onosproject.k8snetworking.api.Constants.PRIORITY_SNAT_RULE;
 import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.SERVICE_IP_CIDR;
 import static org.onosproject.k8snetworking.api.Constants.STAT_INBOUND_TABLE;
 import static org.onosproject.k8snetworking.api.Constants.STAT_OUTBOUND_TABLE;
 import static org.onosproject.k8snetworking.api.Constants.VTAG_TABLE;
@@ -279,10 +280,10 @@
         applyRule(flowRule, true);
     }
 
-    private void setupHostRoutingRule(K8sNetwork k8sNetwork) {
+    private void setAnyRoutingRule(IpPrefix srcIpPrefix, K8sNetwork k8sNetwork) {
         TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
                 .matchEthType(Ethernet.TYPE_IPV4)
-                .matchIPSrc(IpPrefix.valueOf(k8sNetwork.gatewayIp(), 32))
+                .matchIPSrc(srcIpPrefix)
                 .matchIPDst(IpPrefix.valueOf(k8sNetwork.cidr()));
 
         TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
@@ -303,6 +304,14 @@
         }
     }
 
+    private void setupServiceRoutingRule(K8sNetwork k8sNetwork) {
+        setAnyRoutingRule(IpPrefix.valueOf(SERVICE_IP_CIDR), k8sNetwork);
+    }
+
+    private void setupHostRoutingRule(K8sNetwork k8sNetwork) {
+        setAnyRoutingRule(IpPrefix.valueOf(k8sNetwork.gatewayIp(), 32), k8sNetwork);
+    }
+
     private void setupGatewayRoutingRule(K8sNetwork k8sNetwork) {
         TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
                 .matchEthType(Ethernet.TYPE_IPV4)
@@ -353,6 +362,7 @@
             initializePipeline(node);
             k8sNetworkService.networks().forEach(n -> {
                 setupHostRoutingRule(n);
+                setupServiceRoutingRule(n);
                 setupGatewayRoutingRule(n);
             });
         }
@@ -385,6 +395,7 @@
 
             setupHostRoutingRule(network);
             setupGatewayRoutingRule(network);
+            setupServiceRoutingRule(network);
         }
     }
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
index dfdba64..555a144 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
@@ -16,6 +16,8 @@
 package org.onosproject.k8snetworking.impl;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import io.fabric8.kubernetes.api.model.EndpointAddress;
 import io.fabric8.kubernetes.api.model.EndpointPort;
 import io.fabric8.kubernetes.api.model.EndpointSubset;
@@ -26,6 +28,8 @@
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
 import org.onlab.packet.TpPort;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.LeadershipService;
 import org.onosproject.cluster.NodeId;
@@ -36,6 +40,7 @@
 import org.onosproject.k8snetworking.api.K8sFlowRuleService;
 import org.onosproject.k8snetworking.api.K8sGroupRuleService;
 import org.onosproject.k8snetworking.api.K8sNetworkService;
+import org.onosproject.k8snetworking.api.K8sPodService;
 import org.onosproject.k8snetworking.api.K8sServiceEvent;
 import org.onosproject.k8snetworking.api.K8sServiceListener;
 import org.onosproject.k8snetworking.api.K8sServiceService;
@@ -57,16 +62,20 @@
 import org.onosproject.net.group.GroupBucket;
 import org.onosproject.store.service.AtomicCounter;
 import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
+import java.util.Dictionary;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
@@ -74,12 +83,23 @@
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.k8snetworking.api.Constants.JUMP_TABLE;
 import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
+import static org.onosproject.k8snetworking.api.Constants.NAT_STATEFUL;
+import static org.onosproject.k8snetworking.api.Constants.NAT_STATELESS;
 import static org.onosproject.k8snetworking.api.Constants.NAT_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.POD_TABLE;
 import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CT_RULE;
+import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NAT_RULE;
 import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.SERVICE_IP_CIDR;
+import static org.onosproject.k8snetworking.api.Constants.SERVICE_TABLE;
+import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_CIDR;
+import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX;
+import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE;
+import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.SERVICE_IP_NAT_MODE_DEFAULT;
 import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.nodeIpGatewayIpMap;
 import static org.onosproject.k8snetworking.util.RulePopulatorUtil.CT_NAT_DST_FLAG;
 import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildGroupBucket;
+import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildLoadExtension;
 import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildResubmitExtension;
 import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtMaskFlag;
 import static org.onosproject.k8snetworking.util.RulePopulatorUtil.computeCtStateFlag;
@@ -90,14 +110,16 @@
 /**
  * Handles the service IP to pod IP related translation traffic.
  */
-@Component(immediate = true)
+@Component(
+    immediate = true,
+    property = {
+            SERVICE_IP_NAT_MODE + "=" + SERVICE_IP_NAT_MODE_DEFAULT
+    }
+)
 public class K8sServiceHandler {
 
     private final Logger log = getLogger(getClass());
 
-    // TODO: need to inject service IP CIDR through REST
-    private static final String SERVICE_IP_CIDR = "10.96.0.0/24";
-
     private static final int HOST_CIDR_NUM = 32;
 
     private static final String NONE = "None";
@@ -122,6 +144,9 @@
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService configService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected StorageService storageService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -142,6 +167,12 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected K8sServiceService k8sServiceService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sPodService k8sPodService;
+
+    /** Service IP address translation mode. */
+    private String serviceIpNatMode = SERVICE_IP_NAT_MODE_DEFAULT;
+
     private final ExecutorService eventExecutor = newSingleThreadExecutor(
             groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
     private final InternalNodeEventListener internalNodeEventListener =
@@ -157,6 +188,7 @@
     @Activate
     protected void activate() {
         appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
+        configService.registerProperties(getClass());
         localNodeId = clusterService.getLocalNode().id();
         leadershipService.runForLeadership(appId.name());
         k8sNodeService.addListener(internalNodeEventListener);
@@ -172,12 +204,20 @@
         leadershipService.withdraw(appId.name());
         k8sNodeService.removeListener(internalNodeEventListener);
         k8sServiceService.removeListener(internalK8sServiceListener);
+        configService.unregisterProperties(getClass(), false);
         eventExecutor.shutdown();
 
         log.info("Stopped");
     }
 
-    private void setServiceNatRules(DeviceId deviceId, boolean install) {
+    @Modified
+    void modified(ComponentContext context) {
+        readComponentConfiguration(context);
+
+        log.info("Modified");
+    }
+
+    private void setStatefulServiceNatRules(DeviceId deviceId, boolean install) {
         // -trk CT rules
         long ctUntrack = computeCtStateFlag(false, false, false);
         long ctMaskUntrack = computeCtMaskFlag(true, false, false);
@@ -203,12 +243,180 @@
 
         k8sServiceService.services().stream()
                 .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
-                .forEach(s -> setGroupFlowRules(deviceId, ctTrackNew,
-                        ctMaskTrackNew, s, install));
+                .forEach(s -> setStatefulGroupFlowRules(deviceId, ctTrackNew,
+                ctMaskTrackNew, s, install));
     }
 
-    private void setGroupFlowRules(DeviceId deviceId, long ctState, long ctMask,
-                                   Service service, boolean install) {
+    private void setStatelessServiceNatRules(DeviceId deviceId, boolean install) {
+
+        k8sNetworkService.networks().forEach(n -> {
+            setSrcDstCidrRules(deviceId, n.cidr(), SERVICE_IP_CIDR, JUMP_TABLE,
+                    SERVICE_TABLE, PRIORITY_CT_RULE, install);
+            setSrcDstCidrRules(deviceId, n.cidr(), SHIFTED_IP_CIDR, JUMP_TABLE,
+                    POD_TABLE, PRIORITY_CT_RULE, install);
+            setSrcDstCidrRules(deviceId, n.cidr(), n.cidr(), JUMP_TABLE,
+                    ROUTING_TABLE, PRIORITY_CT_RULE, install);
+        });
+
+        // setup load balancing rules using group table
+        k8sServiceService.services().stream()
+                .filter(s -> CLUSTER_IP.equals(s.getSpec().getType()))
+                .forEach(s -> setStatelessGroupFlowRules(deviceId, s, install));
+    }
+
+    private void setSrcDstCidrRules(DeviceId deviceId, String srcCidr,
+                                    String dstCidr, int installTable,
+                                    int transitTable, int priority, boolean install) {
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchIPSrc(IpPrefix.valueOf(srcCidr))
+                .matchIPDst(IpPrefix.valueOf(dstCidr))
+                .build();
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .transition(transitTable)
+                .build();
+
+        k8sFlowRuleService.setRule(
+                appId,
+                deviceId,
+                selector,
+                treatment,
+                priority,
+                installTable,
+                install);
+    }
+
+    private void setStatelessGroupFlowRules(DeviceId deviceId,
+                                            Service service, boolean install) {
+        int groupId = (int) groupIdCounter.incrementAndGet();
+
+        List<GroupBucket> buckets = Lists.newArrayList();
+
+        String serviceName = service.getMetadata().getName();
+
+        List<Endpoints> endpointses = k8sEndpointsService.endpointses()
+                .stream()
+                .filter(ep -> serviceName.equals(ep.getMetadata().getName()))
+                .collect(Collectors.toList());
+
+        Map<String, String> nodeIpGatewayIpMap =
+                nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
+
+        Map<String, Set<Integer>> podIpPorts = Maps.newConcurrentMap();
+
+        for (Endpoints endpoints : endpointses) {
+            for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
+                List<EndpointPort> ports = endpointSubset.getPorts()
+                        .stream()
+                        .filter(p -> p.getProtocol().equals(TCP))
+                        .collect(Collectors.toList());
+
+                for (EndpointAddress address : endpointSubset.getAddresses()) {
+                    String podIp = nodeIpGatewayIpMap.containsKey(address.getIp()) ?
+                            nodeIpGatewayIpMap.get(address.getIp()) : address.getIp();
+
+                    ports.forEach(p -> {
+                        ExtensionTreatment resubmitTreatment = buildResubmitExtension(
+                                deviceService.getDevice(deviceId), ROUTING_TABLE);
+                        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                                .setIpDst(IpAddress.valueOf(podIp))
+                                .setTcpDst(TpPort.tpPort(p.getPort()))
+                                .extension(resubmitTreatment, deviceId)
+                                .build();
+                        buckets.add(buildGroupBucket(treatment, SELECT, (short) -1));
+
+                        Set<Integer> existPorts = podIpPorts.get(podIp);
+                        if (existPorts == null || existPorts.isEmpty()) {
+                            existPorts = Sets.newConcurrentHashSet();
+                        }
+                        existPorts.add(p.getPort());
+                        podIpPorts.put(podIp, existPorts);
+                    });
+                }
+            }
+        }
+
+        if (!buckets.isEmpty()) {
+            k8sGroupRuleService.setRule(appId, deviceId, groupId, SELECT, buckets, install);
+            setShiftDomainRules(deviceId, SERVICE_TABLE, groupId,
+                    PRIORITY_NAT_RULE, service, install);
+
+            podIpPorts.forEach((k, v) ->
+                v.forEach(p -> setUnshiftDomainRules(deviceId, POD_TABLE,
+                        PRIORITY_NAT_RULE, service, k, p, install)));
+        }
+    }
+
+    private void setShiftDomainRules(DeviceId deviceId, int installTable,
+                                     int groupId, int priority,
+                                     Service service, boolean install) {
+        String serviceIp = service.getSpec().getClusterIP();
+        // TODO: multi-ports case should be addressed
+        Integer servicePort = service.getSpec().getPorts().get(0).getPort();
+
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchIPProtocol(IPv4.PROTOCOL_TCP)
+                .matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(serviceIp), 32))
+                .matchTcpDst(TpPort.tpPort(servicePort))
+                .build();
+
+        ExtensionTreatment loadTreatment = buildLoadExtension(
+                deviceService.getDevice(deviceId), "src", SHIFTED_IP_PREFIX);
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .extension(loadTreatment, deviceId)
+                .group(GroupId.valueOf(groupId))
+                .build();
+
+        k8sFlowRuleService.setRule(
+                appId,
+                deviceId,
+                selector,
+                treatment,
+                priority,
+                installTable,
+                install);
+    }
+
+    private void setUnshiftDomainRules(DeviceId deviceId, int installTable,
+                                       int priority, Service service, String podIp,
+                                       int podPort, boolean install) {
+        String serviceIp = service.getSpec().getClusterIP();
+        // TODO: multi-ports case should be addressed
+        Integer servicePort = service.getSpec().getPorts().get(0).getPort();
+
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchIPProtocol(IPv4.PROTOCOL_TCP)
+                .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(podIp), 32))
+                .matchTcpSrc(TpPort.tpPort(podPort))
+                .build();
+
+        ExtensionTreatment loadTreatment = buildLoadExtension(
+                deviceService.getDevice(deviceId), "dst", "10.10");
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .extension(loadTreatment, deviceId)
+                .setIpSrc(IpAddress.valueOf(serviceIp))
+                .setTcpSrc(TpPort.tpPort(servicePort))
+                .transition(ROUTING_TABLE)
+                .build();
+
+        k8sFlowRuleService.setRule(
+                appId,
+                deviceId,
+                selector,
+                treatment,
+                priority,
+                installTable,
+                install);
+    }
+
+    private void setStatefulGroupFlowRules(DeviceId deviceId, long ctState,
+                                           long ctMask, Service service,
+                                           boolean install) {
         int groupId = (int) groupIdCounter.incrementAndGet();
 
         List<GroupBucket> buckets = Lists.newArrayList();
@@ -350,6 +558,19 @@
                 install);
     }
 
+    /**
+     * Extracts properties from the component configuration context.
+     *
+     * @param context the component context
+     */
+    private void readComponentConfiguration(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+
+        String updatedNatMode = Tools.get(properties, SERVICE_IP_NAT_MODE);
+        serviceIpNatMode = updatedNatMode != null ? updatedNatMode : SERVICE_IP_NAT_MODE_DEFAULT;
+        log.info("Configured. Service IP NAT mode is {}", serviceIpNatMode);
+    }
+
     private class InternalK8sServiceListener implements K8sServiceListener {
 
         private boolean isRelevantHelper() {
@@ -379,9 +600,14 @@
             long ctTrackNew = computeCtStateFlag(true, true, false);
             long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
 
-            k8sNodeService.completeNodes().forEach(n ->
-                    setGroupFlowRules(n.intgBridge(), ctTrackNew,
-                            ctMaskTrackNew, service, true));
+            k8sNodeService.completeNodes().forEach(n -> {
+                if (NAT_STATEFUL.equals(serviceIpNatMode)) {
+                    setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
+                            ctMaskTrackNew, service, true);
+                } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
+                    setStatelessGroupFlowRules(n.intgBridge(), service, true);
+                }
+            });
         }
 
         private void processServiceRemoval(Service service) {
@@ -392,9 +618,15 @@
             long ctTrackNew = computeCtStateFlag(true, true, false);
             long ctMaskTrackNew = computeCtMaskFlag(true, true, false);
 
-            k8sNodeService.completeNodes().forEach(n ->
-                    setGroupFlowRules(n.intgBridge(), ctTrackNew,
-                            ctMaskTrackNew, service, false));        }
+            k8sNodeService.completeNodes().forEach(n -> {
+                if (NAT_STATEFUL.equals(serviceIpNatMode)) {
+                    setStatefulGroupFlowRules(n.intgBridge(), ctTrackNew,
+                            ctMaskTrackNew, service, false);
+                } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
+                    setStatelessGroupFlowRules(n.intgBridge(), service, false);
+                }
+            });
+        }
     }
 
     private class InternalNodeEventListener implements K8sNodeListener {
@@ -423,7 +655,14 @@
                 return;
             }
 
-            setServiceNatRules(node.intgBridge(), true);
+            if (NAT_STATEFUL.equals(serviceIpNatMode)) {
+                setStatefulServiceNatRules(node.intgBridge(), true);
+            } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
+                setStatelessServiceNatRules(node.intgBridge(), true);
+            } else {
+                log.warn("Service IP NAT mode was not configured!");
+            }
+
         }
 
         private void processNodeIncompletion(K8sNode node) {
@@ -431,7 +670,14 @@
                 return;
             }
 
-            setServiceNatRules(node.intgBridge(), false);
+            if (NAT_STATEFUL.equals(serviceIpNatMode)) {
+                setStatefulServiceNatRules(node.intgBridge(), false);
+            } else if (NAT_STATELESS.equals(serviceIpNatMode)) {
+                setStatelessServiceNatRules(node.intgBridge(), false);
+            } else {
+                log.warn("Service IP NAT mode was not configured!");
+            }
+
         }
     }
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java
index 1bf2150..a518971 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sSwitchingArpHandler.java
@@ -75,6 +75,7 @@
 import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.GATEWAY_MAC;
 import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.GATEWAY_MAC_DEFAULT;
 import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getPropertyValue;
+import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.unshiftIpDomain;
 
 /**
  * Handles ARP packet from containers.
@@ -219,6 +220,20 @@
         }
 
         if (replyMac == null) {
+            Set<String> unshiftedIps = unshiftIpDomain(targetIp.toString(), k8sNetworkService);
+            for (String ip : unshiftedIps) {
+                replyMac = k8sNetworkService.ports().stream()
+                        .filter(p -> p.ipAddress().equals(IpAddress.valueOf(ip)))
+                        .map(K8sPort::macAddress)
+                        .findAny().orElse(null);
+
+                if (replyMac != null) {
+                    break;
+                }
+            }
+        }
+
+        if (replyMac == null) {
             log.debug("Failed to find MAC address for {}", targetIp);
             return;
         }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/OsgiPropertyConstants.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/OsgiPropertyConstants.java
index 734b8e5..04705f3 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/OsgiPropertyConstants.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/OsgiPropertyConstants.java
@@ -31,4 +31,7 @@
 
     static final String DHCP_SERVER_MAC = "dhcpServerMac";
     static final String DHCP_SERVER_MAC_DEFAULT = "fe:00:00:00:00:02";
+
+    static final String SERVICE_IP_NAT_MODE = "serviceIpNatMode";
+    static final String SERVICE_IP_NAT_MODE_DEFAULT = "stateless";
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
index 2647d70..7325976 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
@@ -20,6 +20,7 @@
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import io.fabric8.kubernetes.client.ConfigBuilder;
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -48,6 +49,7 @@
 import java.util.stream.Collectors;
 
 import static org.onosproject.k8snetworking.api.Constants.PORT_NAME_PREFIX_CONTAINER;
+import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX;
 
 /**
  * An utility that used in kubernetes networking app.
@@ -284,4 +286,24 @@
 
         return ipMap;
     }
+
+    /**
+     * Returns a set of unshifted IP addresses.
+     *
+     * @param ipAddress     shifted IP address
+     * @param service       kubernetes network service
+     * @return unshifted IP addresses
+     */
+    public static Set<String> unshiftIpDomain(String ipAddress, K8sNetworkService service) {
+
+        Set<String> unshiftedIps = Sets.newConcurrentHashSet();
+
+        service.networks().forEach(n -> {
+            String cidr = n.cidr();
+            String origIpPrefix = cidr.split("\\.")[0] + "." + cidr.split("\\.")[1];
+            unshiftedIps.add(StringUtils.replace(ipAddress, SHIFTED_IP_PREFIX, origIpPrefix));
+        });
+
+        return unshiftedIps;
+    }
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/RulePopulatorUtil.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/RulePopulatorUtil.java
index 8192015..7605a5e 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/RulePopulatorUtil.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/RulePopulatorUtil.java
@@ -39,6 +39,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.onosproject.net.flow.instructions.ExtensionTreatmentType.ExtensionTreatmentTypes.NICIRA_LOAD;
 import static org.onosproject.net.flow.instructions.ExtensionTreatmentType.ExtensionTreatmentTypes.NICIRA_RESUBMIT_TABLE;
 import static org.onosproject.net.flow.instructions.ExtensionTreatmentType.ExtensionTreatmentTypes.NICIRA_SET_TUNNEL_DST;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -77,14 +78,27 @@
     private static final int PORT_MIN_FLAG = 4;
     private static final int PORT_MAX_FLAG = 5;
 
-    public static final long CT_STATE_NONE = 0;
-    public static final long CT_STATE_NEW = 0x01;
-    public static final long CT_STATE_EST = 0x02;
-    public static final long CT_STATE_NOT_TRK = 0x20;
-    public static final long CT_STATE_TRK = 0x20;
+    private static final long CT_STATE_NONE = 0;
+    private static final long CT_STATE_NEW = 0x01;
+    private static final long CT_STATE_EST = 0x02;
+    private static final long CT_STATE_NOT_TRK = 0x20;
+    private static final long CT_STATE_TRK = 0x20;
 
     private static final String TABLE_EXTENSION = "table";
 
+    private static final String OFF_SET_N_BITS = "ofsNbits";
+    private static final String DESTINATION = "dst";
+    private static final String VALUE = "value";
+
+    private static final int SRC_IP = 0x00000e04;
+    private static final int DST_IP = 0x00001004;
+
+    private static final String SRC = "src";
+    private static final String DST = "dst";
+
+    private static final int OFF_SET_BIT = 16;
+    private static final int REMAINDER_BIT = 16;
+
     // not intended for direct invocation from external
     private RulePopulatorUtil() {
     }
@@ -253,7 +267,7 @@
     /**
      * Returns the nicira resubmit extension treatment with given table ID.
      *
-     * @param device        device identifier
+     * @param device        device instance
      * @param tableId       table identifier
      * @return resubmit extension treatment
      */
@@ -278,6 +292,66 @@
     }
 
     /**
+     * Returns the nicira load extension treatment.
+     *
+     * @param device        device instance
+     * @param ipType        IP type (src|dst)
+     * @param shift         shift (e.g., 10.10., 20.20.,)
+     * @return load extension treatment
+     */
+    public static ExtensionTreatment buildLoadExtension(Device device,
+                                                        String ipType,
+                                                        String shift) {
+        if (device == null || !device.is(ExtensionTreatmentResolver.class)) {
+            log.warn("Nicira extension treatment is not supported");
+            return null;
+        }
+
+        ExtensionTreatmentResolver resolver = device.as(ExtensionTreatmentResolver.class);
+        ExtensionTreatment treatment =
+                resolver.getExtensionInstruction(NICIRA_LOAD.type());
+
+        long dst = 0L;
+
+        if (SRC.equalsIgnoreCase(ipType)) {
+            dst = SRC_IP;
+        } else if (DST.equals(ipType)) {
+            dst = DST_IP;
+        }
+
+        long value = calculateUpperBit(shift);
+
+        // we only rewrite the upper 16 bits with value (A.B.X.Y -> C.D.X.Y)
+        int ofsNbits = OFF_SET_BIT << 6 | (REMAINDER_BIT - 1);
+
+        try {
+            treatment.setPropertyValue(OFF_SET_N_BITS, ofsNbits);
+            treatment.setPropertyValue(DESTINATION, dst);
+            treatment.setPropertyValue(VALUE, value);
+            return treatment;
+        } catch (ExtensionPropertyException e) {
+            log.error("Failed to set nicira load extension treatment for {}",
+                    device.id());
+            return null;
+        }
+    }
+
+    /**
+     * Calculate IP address upper string into integer.
+     *
+     * @param shift IP address upper two octets with dot
+     * @return calculated integer
+     */
+    private static int calculateUpperBit(String shift) {
+        String[] strArray = shift.split("\\.");
+
+        int firstOctet = Integer.valueOf(strArray[0]);
+        int secondOctet = Integer.valueOf(strArray[1]);
+
+        return firstOctet << 8 | secondOctet;
+    }
+
+    /**
      * Builder class for OVS Connection Tracking feature actions.
      */
     public static final class NiciraConnTrackTreatmentBuilder {