Support to inject external bridge into k8s node for external routing

1. Add group bucket related rules when endpoints events are received,
   rather than on pod events.

Change-Id: I1152343cf8ff6bbccaed3dc34908a3affbc70980
diff --git a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java
index d91b0be..adc040a 100644
--- a/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java
+++ b/apps/k8s-networking/api/src/main/java/org/onosproject/k8snetworking/api/K8sGroupRuleService.java
@@ -51,6 +51,9 @@
 
     /**
      * Configures buckets to the existing group.
+     * When the install flag is true, the given buckets are added to the
+     * group's existing buckets; when it is false, the given buckets are
+     * removed from the group's existing buckets.
      *
      * @param appId         application ID
      * @param deviceId      device ID
@@ -60,4 +63,15 @@
      */
     void setBuckets(ApplicationId appId, DeviceId deviceId, int groupId,
                     List<GroupBucket> buckets, boolean install);
+
+    /**
+     * Sets the buckets of the existing group to the given list of buckets.
+     *
+     * @param appId         application ID
+     * @param deviceId      device ID
+     * @param groupId       group ID
+     * @param buckets       a list of group buckets
+     */
+    void setBuckets(ApplicationId appId, DeviceId deviceId, int groupId,
+                    List<GroupBucket> buckets);
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java
index 51dc9c8..cb2b864 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sGroupRuleManager.java
@@ -97,4 +97,11 @@
                     new GroupBuckets(buckets), getGroupKey(groupId), appId);
         }
     }
+
+    @Override
+    public void setBuckets(ApplicationId appId, DeviceId deviceId, int groupId,
+                           List<GroupBucket> buckets) {
+        groupService.setBucketsForGroup(deviceId, getGroupKey(groupId),
+                new GroupBuckets(buckets), getGroupKey(groupId), appId);
+    }
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
index 76785df..799ecb8 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
@@ -21,7 +21,6 @@
 import io.fabric8.kubernetes.api.model.EndpointPort;
 import io.fabric8.kubernetes.api.model.EndpointSubset;
 import io.fabric8.kubernetes.api.model.Endpoints;
-import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.api.model.ServicePort;
 import org.onlab.packet.Ethernet;
@@ -47,9 +46,6 @@
 import org.onosproject.k8snetworking.api.K8sNetworkEvent;
 import org.onosproject.k8snetworking.api.K8sNetworkListener;
 import org.onosproject.k8snetworking.api.K8sNetworkService;
-import org.onosproject.k8snetworking.api.K8sPodEvent;
-import org.onosproject.k8snetworking.api.K8sPodListener;
-import org.onosproject.k8snetworking.api.K8sPodService;
 import org.onosproject.k8snetworking.api.K8sServiceEvent;
 import org.onosproject.k8snetworking.api.K8sServiceListener;
 import org.onosproject.k8snetworking.api.K8sServiceService;
@@ -70,7 +66,6 @@
 import org.onosproject.net.flow.criteria.ExtensionSelector;
 import org.onosproject.net.flow.instructions.ExtensionTreatment;
 import org.onosproject.net.group.GroupBucket;
-import org.onosproject.store.service.AtomicCounter;
 import org.onosproject.store.service.StorageService;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
@@ -142,17 +137,11 @@
 
     private static final int HOST_CIDR_NUM = 32;
 
-    private static final String NONE = "None";
     private static final String CLUSTER_IP = "ClusterIP";
     private static final String TCP = "TCP";
     private static final String UDP = "UDP";
     private static final String SERVICE_IP_NAT_MODE = "serviceIpNatMode";
 
-    private static final String GROUP_ID_COUNTER_NAME = "group-id-counter";
-
-    private static final String IP_ADDRESS = "ipAddress";
-    private static final String KUBERNETES = "kubernetes";
-
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
 
@@ -192,8 +181,6 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected K8sServiceService k8sServiceService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected K8sPodService k8sPodService;
 
     /** Service IP address translation mode. */
     private String serviceIpNatMode = SERVICE_IP_NAT_MODE_DEFAULT;
@@ -207,15 +194,11 @@
             new InternalNodeEventListener();
     private final InternalK8sServiceListener internalK8sServiceListener =
             new InternalK8sServiceListener();
-    private final InternalK8sPodListener internalK8sPodListener =
-            new InternalK8sPodListener();
     private final InternalK8sEndpointsListener internalK8sEndpointsListener =
             new InternalK8sEndpointsListener();
     private final InternalK8sNetworkListener internalK8sNetworkListener =
             new InternalK8sNetworkListener();
 
-    private AtomicCounter groupIdCounter;
-
     private ApplicationId appId;
     private NodeId localNodeId;
 
@@ -227,19 +210,15 @@
         leadershipService.runForLeadership(appId.name());
         k8sNodeService.addListener(internalNodeEventListener);
         k8sServiceService.addListener(internalK8sServiceListener);
-        k8sPodService.addListener(internalK8sPodListener);
         k8sEndpointsService.addListener(internalK8sEndpointsListener);
         k8sNetworkService.addListener(internalK8sNetworkListener);
 
-        groupIdCounter = storageService.getAtomicCounter(GROUP_ID_COUNTER_NAME);
-
         log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
         leadershipService.withdraw(appId.name());
-        k8sPodService.removeListener(internalK8sPodListener);
         k8sNodeService.removeListener(internalNodeEventListener);
         k8sServiceService.removeListener(internalK8sServiceListener);
         k8sEndpointsService.removeListener(internalK8sEndpointsListener);
@@ -376,64 +355,55 @@
         return map;
     }
 
-    private void setGroupBucketsFromPod(DeviceId deviceId, Service service,
-                                        Pod pod, boolean install) {
-        if (pod.getMetadata().getAnnotations() == null) {
-            return;
-        }
-
-        String podIpStr = pod.getMetadata().getAnnotations().get(IP_ADDRESS);
-
-        setGroupBuckets(deviceId, service, podIpStr, install);
-    }
-
-    private void setGroupBuckets(DeviceId deviceId, Service service,
-                                 String podIp, boolean install) {
+    private void setGroupBuckets(Service service, boolean install) {
         Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
         Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
         Map<String, String> nodeIpGatewayIpMap =
                 nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
 
-        spEpasMap.forEach((sp, epas) -> {
-            List<GroupBucket> bkts = Lists.newArrayList();
+        for (K8sNode node : k8sNodeService.completeNodes()) {
+            spEpasMap.forEach((sp, epas) -> {
+                List<GroupBucket> bkts = Lists.newArrayList();
 
-            if (install) {
-                if (epas.contains(podIp)) {
-                    bkts = buildBuckets(deviceId,
-                            nodeIpGatewayIpMap.getOrDefault(podIp, podIp), sp);
-                }
-            } else {
-                bkts = buildBuckets(deviceId,
-                        nodeIpGatewayIpMap.getOrDefault(podIp, podIp), sp);
-            }
-
-            spGrpBkts.put(sp, bkts);
-        });
-
-        String serviceIp = service.getSpec().getClusterIP();
-        spGrpBkts.forEach((sp, bkts) -> {
-            String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
-            int groupId = svcStr.hashCode();
-
-            k8sGroupRuleService.setBuckets(appId, deviceId, groupId, bkts, install);
-        });
-
-        spEpasMap.forEach((sp, epas) ->
-            // add flow rules for unshifting IP domain
-            epas.forEach(epa -> {
-                        setUnshiftDomainRules(deviceId, POD_TABLE,
-                                PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
-                                sp.getProtocol(), nodeIpGatewayIpMap.getOrDefault(epa, epa),
-                                sp.getTargetPort().getIntVal(), install);
+                for (String ip : epas) {
+                    if (install) {
+                        bkts.add(buildBuckets(node.intgBridge(),
+                                nodeIpGatewayIpMap.getOrDefault(ip, ip), sp));
+                    } else {
+                        bkts.add(buildBuckets(node.intgBridge(),
+                                nodeIpGatewayIpMap.getOrDefault(ip, ip), sp));
                     }
-        ));
+                }
+
+                spGrpBkts.put(sp, bkts);
+            });
+
+            String serviceIp = service.getSpec().getClusterIP();
+            spGrpBkts.forEach((sp, bkts) -> {
+                String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
+                int groupId = svcStr.hashCode();
+
+                if (bkts.size() > 0) {
+                    k8sGroupRuleService.setBuckets(appId, node.intgBridge(), groupId, bkts);
+                }
+            });
+
+            spEpasMap.forEach((sp, epas) ->
+                // add flow rules for unshifting IP domain
+                epas.forEach(epa -> {
+                            setUnshiftDomainRules(node.intgBridge(), POD_TABLE,
+                                    PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
+                                    sp.getProtocol(), nodeIpGatewayIpMap.getOrDefault(epa, epa),
+                                    sp.getTargetPort().getIntVal(), install);
+                        }
+                )
+            );
+        }
     }
 
-    private List<GroupBucket> buildBuckets(DeviceId deviceId,
+    private GroupBucket buildBuckets(DeviceId deviceId,
                                            String podIpStr,
                                            ServicePort sp) {
-        List<GroupBucket> bkts = Lists.newArrayList();
-
         TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
                 .setIpDst(IpAddress.valueOf(podIpStr));
 
@@ -447,9 +417,7 @@
                 deviceService.getDevice(deviceId), ROUTING_TABLE);
         tBuilder.extension(resubmitTreatment, deviceId);
 
-        bkts.add(buildGroupBucket(tBuilder.build(), SELECT, (short) -1));
-
-        return bkts;
+        return buildGroupBucket(tBuilder.build(), SELECT, (short) -1);
     }
 
     private synchronized void setStatelessGroupFlowRules(DeviceId deviceId,
@@ -622,8 +590,6 @@
     private void setStatefulGroupFlowRules(DeviceId deviceId, long ctState,
                                            long ctMask, Service service,
                                            boolean install) {
-        int groupId = (int) groupIdCounter.incrementAndGet();
-
         List<GroupBucket> buckets = Lists.newArrayList();
 
         String serviceName = service.getMetadata().getName();
@@ -631,6 +597,10 @@
 
         // TODO: multi-ports case should be addressed
         Integer servicePort = service.getSpec().getPorts().get(0).getPort();
+        String serviceProtocol = service.getSpec().getPorts().get(0).getProtocol();
+
+        String svcStr = servicePortStr(serviceIp, servicePort, serviceProtocol);
+        int groupId = svcStr.hashCode();
 
         List<Endpoints> endpointses = k8sEndpointsService.endpointses()
                 .stream()
@@ -763,29 +733,17 @@
                 install);
     }
 
-    private void setK8sApiRules(K8sNode k8sNode, Endpoints endpoints, boolean install) {
-        if (KUBERNETES.equals(endpoints.getMetadata().getName())) {
-            Service service = k8sServiceService.services().stream().filter(s ->
-                    KUBERNETES.equals(s.getMetadata().getName()))
-                    .findFirst().orElse(null);
-            if (service == null) {
-                return;
-            }
+    private void setEndpointsRules(Endpoints endpoints, boolean install) {
+        String appName = endpoints.getMetadata().getName();
+        Service service = k8sServiceService.services().stream().filter(s ->
+                appName.equals(s.getMetadata().getName()))
+                .findFirst().orElse(null);
 
-            if (k8sNode != null) {
-                endpoints.getSubsets().forEach(s -> {
-                    s.getAddresses().forEach(a -> setGroupBuckets(
-                            k8sNode.intgBridge(), service, a.getIp(), install));
-                });
-            }
-
-            k8sNodeService.completeNodes().forEach(n -> {
-                endpoints.getSubsets().forEach(s -> {
-                    s.getAddresses().forEach(a -> setGroupBuckets(
-                            n.intgBridge(), service, a.getIp(), install));
-                });
-            });
+        if (service == null) {
+            return;
         }
+
+        setGroupBuckets(service, install);
     }
 
     private String servicePortStr(String ip, int port, String protocol) {
@@ -820,20 +778,6 @@
         }
     }
 
-    private void setServiceRuleFromPod(Pod pod, boolean install) {
-        k8sServiceService.services().forEach(s -> {
-            pod.getMetadata().getLabels().forEach((pk, pv) -> {
-                Map<String, String> selectors = s.getSpec().getSelector();
-                if (selectors != null && selectors.containsKey(pk)) {
-                    if (pv.equals(selectors.get(pk))) {
-                        k8sNodeService.completeNodes().forEach(n ->
-                                setGroupBucketsFromPod(n.intgBridge(), s, pod, install));
-                    }
-                }
-            });
-        });
-    }
-
     private class InternalK8sServiceListener implements K8sServiceListener {
 
         private boolean isRelevantHelper() {
@@ -893,44 +837,6 @@
         }
     }
 
-    private class InternalK8sPodListener implements K8sPodListener {
-
-        private boolean isRelevantHelper() {
-            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
-        }
-
-        @Override
-        public void event(K8sPodEvent event) {
-            switch (event.type()) {
-                case K8S_POD_CREATED:
-                case K8S_POD_UPDATED:
-                    eventExecutor.execute(() -> processPodDetection(event.subject()));
-                    break;
-                case K8S_POD_REMOVED:
-                    eventExecutor.execute(() -> processPodRemoval(event.subject()));
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        private void processPodDetection(Pod pod) {
-            if (!isRelevantHelper()) {
-                return;
-            }
-
-            setServiceRuleFromPod(pod, true);
-        }
-
-        private void processPodRemoval(Pod pod) {
-            if (!isRelevantHelper()) {
-                return;
-            }
-
-            setServiceRuleFromPod(pod, false);
-        }
-    }
-
     private class InternalK8sEndpointsListener implements K8sEndpointsListener {
 
         private boolean isRelevantHelper() {
@@ -943,6 +849,7 @@
 
             switch (event.type()) {
                 case K8S_ENDPOINTS_CREATED:
+                case K8S_ENDPOINTS_UPDATED:
                     eventExecutor.execute(() -> processEndpointsCreation(endpoints));
                     break;
                 case K8S_ENDPOINTS_REMOVED:
@@ -958,7 +865,7 @@
                 return;
             }
 
-            setK8sApiRules(null, endpoints, true);
+            setEndpointsRules(endpoints, true);
         }
 
         private void processEndpointsRemoval(Endpoints endpoints) {
@@ -966,7 +873,7 @@
                 return;
             }
 
-            setK8sApiRules(null, endpoints, false);
+            setEndpointsRules(endpoints, false);
         }
     }
 
@@ -996,7 +903,7 @@
             }
 
             setServiceNatRules(node.intgBridge(), true);
-            k8sEndpointsService.endpointses().forEach(e -> setK8sApiRules(node, e, true));
+            k8sEndpointsService.endpointses().forEach(e -> setEndpointsRules(e, true));
             k8sNetworkService.networks().forEach(n -> setupServiceDefaultRule(n, true));
         }
     }