Fix: set dummy group rules on receiving service add/remove events

Change-Id: I5e33bfaedc827b9563d08f60adf73ddf4de8ca73
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
index 5809aab..26d8400 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
@@ -37,6 +37,8 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.core.GroupId;
+import org.onosproject.k8snetworking.api.K8sEndpointsEvent;
+import org.onosproject.k8snetworking.api.K8sEndpointsListener;
 import org.onosproject.k8snetworking.api.K8sEndpointsService;
 import org.onosproject.k8snetworking.api.K8sFlowRuleService;
 import org.onosproject.k8snetworking.api.K8sGroupRuleService;
@@ -137,6 +139,7 @@
     private static final String GROUP_ID_COUNTER_NAME = "group-id-counter";
 
     private static final String IP_ADDRESS = "ipAddress";
+    private static final String KUBERNETES = "kubernetes";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
@@ -191,6 +194,8 @@
             new InternalK8sServiceListener();
     private final InternalK8sPodListener internalK8sPodListener =
             new InternalK8sPodListener();
+    private final InternalK8sEndpointsListener internalK8sEndpointsListener =
+            new InternalK8sEndpointsListener();
 
     private AtomicCounter groupIdCounter;
 
@@ -206,6 +211,7 @@
         k8sNodeService.addListener(internalNodeEventListener);
         k8sServiceService.addListener(internalK8sServiceListener);
         k8sPodService.addListener(internalK8sPodListener);
+        k8sEndpointsService.addListener(internalK8sEndpointsListener);
 
         groupIdCounter = storageService.getAtomicCounter(GROUP_ID_COUNTER_NAME);
 
@@ -218,6 +224,7 @@
         k8sPodService.removeListener(internalK8sPodListener);
         k8sNodeService.removeListener(internalNodeEventListener);
         k8sServiceService.removeListener(internalK8sServiceListener);
+        k8sEndpointsService.removeListener(internalK8sEndpointsListener);
         configService.unregisterProperties(getClass(), false);
         eventExecutor.shutdown();
 
@@ -354,26 +361,35 @@
         return map;
     }
 
-    private void setGroupBuckets(DeviceId deviceId, Service service, Pod pod, boolean install) {
-
+    private void setGroupBucketsFromPod(DeviceId deviceId, Service service,
+                                        Pod pod, boolean install) {
         if (pod.getMetadata().getAnnotations() == null) {
             return;
         }
 
         String podIpStr = pod.getMetadata().getAnnotations().get(IP_ADDRESS);
 
+        setGroupBuckets(deviceId, service, podIpStr, install);
+    }
+
+    private void setGroupBuckets(DeviceId deviceId, Service service,
+                                 String podIp, boolean install) {
         Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
         Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
+        Map<String, String> nodeIpGatewayIpMap =
+                nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
 
         spEpasMap.forEach((sp, epas) -> {
             List<GroupBucket> bkts = Lists.newArrayList();
 
             if (install) {
-                if (epas.contains(podIpStr)) {
-                    bkts = buildBuckets(deviceId, podIpStr, sp);
+                if (epas.contains(podIp)) {
+                    bkts = buildBuckets(deviceId,
+                            nodeIpGatewayIpMap.getOrDefault(podIp, podIp), sp);
                 }
             } else {
-                bkts = buildBuckets(deviceId, podIpStr, sp);
+                bkts = buildBuckets(deviceId,
+                        nodeIpGatewayIpMap.getOrDefault(podIp, podIp), sp);
             }
 
             spGrpBkts.put(sp, bkts);
@@ -386,6 +402,16 @@
 
             k8sGroupRuleService.setBuckets(appId, deviceId, groupId, bkts, install);
         });
+
+        spEpasMap.forEach((sp, epas) ->
+            // add flow rules for unshifting IP domain
+            epas.forEach(epa -> {
+                        setUnshiftDomainRules(deviceId, POD_TABLE,
+                                PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
+                                sp.getProtocol(), nodeIpGatewayIpMap.getOrDefault(epa, epa),
+                                sp.getTargetPort().getIntVal(), install);
+                    }
+        ));
     }
 
     private List<GroupBucket> buildBuckets(DeviceId deviceId,
@@ -414,36 +440,14 @@
     private synchronized void setStatelessGroupFlowRules(DeviceId deviceId,
                                                          Service service,
                                                          boolean install) {
-        Map<ServicePort, Set<String>> spEpasMap = getSportEpAddressMap(service);
-        Map<String, String> nodeIpGatewayIpMap =
-                nodeIpGatewayIpMap(k8sNodeService, k8sNetworkService);
-        Map<ServicePort, List<GroupBucket>> spGrpBkts = Maps.newConcurrentMap();
-
-        spEpasMap.forEach((sp, epas) -> {
-            List<GroupBucket> bkts = Lists.newArrayList();
-            epas.forEach(epa -> {
-                String podIp = nodeIpGatewayIpMap.getOrDefault(epa, epa);
-
-                TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
-                        .setIpDst(IpAddress.valueOf(podIp));
-
-                if (TCP.equals(sp.getProtocol())) {
-                    tBuilder.setTcpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
-                } else if (UDP.equals(sp.getProtocol())) {
-                    tBuilder.setUdpDst(TpPort.tpPort(sp.getTargetPort().getIntVal()));
-                }
-
-                ExtensionTreatment resubmitTreatment = buildResubmitExtension(
-                        deviceService.getDevice(deviceId), ROUTING_TABLE);
-                tBuilder.extension(resubmitTreatment, deviceId);
-
-                bkts.add(buildGroupBucket(tBuilder.build(), SELECT, (short) -1));
-            });
-            spGrpBkts.put(sp, bkts);
-        });
+        Set<ServicePort> sps = service.getSpec().getPorts().stream()
+                .filter(Objects::nonNull)
+                .filter(sp -> sp.getTargetPort() != null)
+                .filter(sp -> sp.getTargetPort().getIntVal() != null)
+                .collect(Collectors.toSet());
 
         String serviceIp = service.getSpec().getClusterIP();
-        spGrpBkts.forEach((sp, bkts) -> {
+        sps.forEach(sp -> {
             String svcStr = servicePortStr(serviceIp, sp.getPort(), sp.getProtocol());
             int groupId = svcStr.hashCode();
 
@@ -451,7 +455,7 @@
 
                 // add group table rules
                 k8sGroupRuleService.setRule(appId, deviceId, groupId,
-                        SELECT, bkts, true);
+                        SELECT, Lists.newArrayList(), true);
 
                 log.info("Adding group rule {}", groupId);
 
@@ -471,21 +475,11 @@
 
                 // remove group table rules
                 k8sGroupRuleService.setRule(appId, deviceId, groupId,
-                        SELECT, bkts, false);
+                        SELECT, Lists.newArrayList(), false);
 
                 log.info("Removing group rule {}", groupId);
             }
         });
-
-        spEpasMap.forEach((sp, epas) ->
-            // add flow rules for unshifting IP domain
-            epas.forEach(epa -> {
-                String podIp = nodeIpGatewayIpMap.getOrDefault(epa, epa);
-                setUnshiftDomainRules(deviceId, POD_TABLE,
-                PRIORITY_NAT_RULE, serviceIp, sp.getPort(), sp.getProtocol(),
-                        podIp, sp.getTargetPort().getIntVal(), install);
-            }
-        ));
     }
 
     private void setShiftDomainRules(DeviceId deviceId, int installTable,
@@ -708,6 +702,31 @@
                 install);
     }
 
+    private void setK8sApiRules(K8sNode k8sNode, Endpoints endpoints, boolean install) {
+        if (KUBERNETES.equals(endpoints.getMetadata().getName())) {
+            Service service = k8sServiceService.services().stream().filter(s ->
+                    KUBERNETES.equals(s.getMetadata().getName()))
+                    .findFirst().orElse(null);
+            if (service == null) {
+                return;
+            }
+
+            if (k8sNode != null) {
+                endpoints.getSubsets().forEach(s -> {
+                    s.getAddresses().forEach(a -> setGroupBuckets(
+                            k8sNode.intgBridge(), service, a.getIp(), install));
+                });
+            }
+
+            k8sNodeService.completeNodes().forEach(n -> {
+                endpoints.getSubsets().forEach(s -> {
+                    s.getAddresses().forEach(a -> setGroupBuckets(
+                            n.intgBridge(), service, a.getIp(), install));
+                });
+            });
+        }
+    }
+
     /**
      * Extracts properties from the component configuration context.
      *
@@ -731,6 +750,20 @@
         }
     }
 
+    private void setServiceRuleFromPod(Pod pod, boolean install) {
+        k8sServiceService.services().forEach(s -> {
+            pod.getMetadata().getLabels().forEach((pk, pv) -> {
+                Map<String, String> selectors = s.getSpec().getSelector();
+                if (selectors != null && selectors.containsKey(pk)) {
+                    if (pv.equals(selectors.get(pk))) {
+                        k8sNodeService.completeNodes().forEach(n ->
+                                setGroupBucketsFromPod(n.intgBridge(), s, pod, install));
+                    }
+                }
+            });
+        });
+    }
+
     private class InternalK8sServiceListener implements K8sServiceListener {
 
         private boolean isRelevantHelper() {
@@ -799,8 +832,9 @@
         @Override
         public void event(K8sPodEvent event) {
             switch (event.type()) {
-                case K8S_POD_ANNOTATION_ADDED:
-                    eventExecutor.execute(() -> processPodAnnotAddition(event.subject()));
+                case K8S_POD_CREATED:
+                case K8S_POD_UPDATED:
+                    eventExecutor.execute(() -> processPodDetection(event.subject()));
                     break;
                 case K8S_POD_REMOVED:
                     eventExecutor.execute(() -> processPodRemoval(event.subject()));
@@ -810,7 +844,7 @@
             }
         }
 
-        private void processPodAnnotAddition(Pod pod) {
+        private void processPodDetection(Pod pod) {
             if (!isRelevantHelper()) {
                 return;
             }
@@ -825,19 +859,44 @@
 
             setServiceRuleFromPod(pod, false);
         }
+    }
 
-        private void setServiceRuleFromPod(Pod pod, boolean install) {
-            k8sServiceService.services().forEach(s -> {
-                pod.getMetadata().getLabels().forEach((pk, pv) -> {
-                    Map<String, String> selectors = s.getSpec().getSelector();
-                    if (selectors != null && selectors.containsKey(pk)) {
-                        if (pv.equals(selectors.get(pk))) {
-                            k8sNodeService.completeNodes().forEach(n ->
-                                setGroupBuckets(n.intgBridge(), s, pod, install));
-                        }
-                    }
-                });
-            });
+    private class InternalK8sEndpointsListener implements K8sEndpointsListener {
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(K8sEndpointsEvent event) {
+            Endpoints endpoints = event.subject();
+
+            switch (event.type()) {
+                case K8S_ENDPOINTS_CREATED:
+                    eventExecutor.execute(() -> processEndpointsCreation(endpoints));
+                    break;
+                case K8S_ENDPOINTS_REMOVED:
+                    eventExecutor.execute(() -> processEndpointsRemoval(endpoints));
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        private void processEndpointsCreation(Endpoints endpoints) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            setK8sApiRules(null, endpoints, true);
+        }
+
+        private void processEndpointsRemoval(Endpoints endpoints) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            setK8sApiRules(null, endpoints, false);
         }
     }
 
@@ -867,6 +926,8 @@
             }
 
             setServiceNatRules(node.intgBridge(), true);
+
+            k8sEndpointsService.endpointses().forEach(e -> setK8sApiRules(node, e, true));
         }
     }
 }