Fix: guarantee to reserve an IP address from IPAM pool for k8s POD

Returns port number as zero in case pod has null spec value.
Also return null group bucket when we have zero port number returned.

Change-Id: Iad6f307e950e263732db5f6349c83a60a194cb23
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sIpamHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sIpamHandler.java
index fefb952..e7d0c45 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sIpamHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sIpamHandler.java
@@ -43,7 +43,6 @@
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
-import static java.lang.Thread.sleep;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
@@ -61,8 +60,6 @@
     private static final String IP_ADDRESS = "ipAddress";
     private static final String NETWORK_ID = "networkId";
 
-    private static final int RETRY_NUM = 5;
-    private static final int RETRY_DELAY_MS = 3000;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
@@ -142,7 +139,25 @@
             }
 
             Set<IpAddress> ips = getSubnetIps(event.subject().cidr());
-            k8sIpamAdminService.initializeIpPool(event.subject().networkId(), ips);
+            String networkId = event.subject().networkId();
+            k8sIpamAdminService.initializeIpPool(networkId, ips);
+
+            k8sPodService.pods().stream()
+                    .filter(p -> p.getStatus().getPodIP() != null)
+                    .filter(p -> p.getMetadata().getAnnotations() != null)
+                    .filter(p -> networkId.equals(p.getMetadata()
+                                 .getAnnotations().get(NETWORK_ID)))
+                    .forEach(p -> {
+                        String podIp = p.getStatus().getPodIP();
+
+                        // if the POD with valid IP address has not yet been
+                        // added into IPAM IP pool, we will reserve that IP address
+                        // for the POD
+                        if (!k8sIpamAdminService.allocatedIps(networkId)
+                                .contains(IpAddress.valueOf(podIp))) {
+                            k8sIpamAdminService.reserveIp(networkId, IpAddress.valueOf(podIp));
+                        }
+                    });
         }
 
         private void processNetworkRemoval(K8sNetworkEvent event) {
@@ -201,15 +216,11 @@
 
             k8sIpamAdminService.availableIps(annotNetwork);
 
-            int cnt = 0;
-            while ((RETRY_NUM - cnt > 0) && !containIp(annotIp, annotNetwork)) {
-                try {
-                    sleep(RETRY_DELAY_MS);
-                } catch (InterruptedException e) {
-                    log.error("Exception caused during checking available IP addresses");
-                }
-
-                cnt++;
+            // if the kubernetes network has been initialized, we may have
+            // empty available IP pool, in this case, we will postpone IP reserve
+            // process until finishing kubernetes network initialization
+            if (!containIp(annotIp, annotNetwork)) {
+                return;
             }
 
             k8sIpamAdminService.reserveIp(annotNetwork, IpAddress.valueOf(podIp));
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
index 2640508..18b9a11 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sServiceHandler.java
@@ -398,6 +398,10 @@
                                 }
                             }
 
+                            if (targetPortInt == 0) {
+                                continue;
+                            }
+
                             for (EndpointPort endpointPort : endpointSubset.getPorts()) {
                                 if (targetProtocol.equals(endpointPort.getProtocol()) &&
                                         (targetPortInt.equals(endpointPort.getPort()) ||
@@ -426,12 +430,17 @@
                 List<GroupBucket> bkts = Lists.newArrayList();
 
                 for (String ip : epas) {
+                    GroupBucket bkt = buildBuckets(node.intgBridge(),
+                            nodeIpGatewayIpMap.getOrDefault(ip, ip), sp);
+
+                    if (bkt == null) {
+                        continue;
+                    }
+
                     if (install) {
-                        bkts.add(buildBuckets(node.intgBridge(),
-                                nodeIpGatewayIpMap.getOrDefault(ip, ip), sp));
+                        bkts.add(bkt);
                     } else {
-                        bkts.add(buildBuckets(node.intgBridge(),
-                                nodeIpGatewayIpMap.getOrDefault(ip, ip), sp));
+                        bkts.remove(bkt);
                     }
                 }
 
@@ -462,10 +471,12 @@
                             targetPort = sp.getTargetPort().getIntVal();
                         }
 
-                        setUnshiftDomainRules(node.intgBridge(), POD_TABLE,
-                                PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
-                                sp.getProtocol(), podIp,
-                                targetPort, install);
+                        if (targetPort != 0) {
+                            setUnshiftDomainRules(node.intgBridge(), POD_TABLE,
+                                    PRIORITY_NAT_RULE, serviceIp, sp.getPort(),
+                                    sp.getProtocol(), podIp,
+                                    targetPort, install);
+                        }
                     })
             );
         }
@@ -483,6 +494,10 @@
             targetPort = sp.getTargetPort().getIntVal();
         }
 
+        if (targetPort == 0) {
+            return null;
+        }
+
         if (TCP.equals(sp.getProtocol())) {
             tBuilder.setTcpDst(TpPort.tpPort(targetPort));
         } else if (UDP.equals(sp.getProtocol())) {
@@ -493,6 +508,7 @@
                 deviceService.getDevice(deviceId), ACL_TABLE);
         tBuilder.extension(resubmitTreatment, deviceId);
 
+        // TODO: need to adjust group bucket weight by considering POD locality
         return buildGroupBucket(tBuilder.build(), SELECT, (short) -1);
     }
 
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
index 045b300..7d20b65 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
@@ -492,6 +492,11 @@
      *         return 0 if there is no port number mapped with the given port name
      */
     public static int portNumberByName(Pod pod, String portName) {
+
+        if (pod == null || pod.getSpec() == null) {
+            return 0;
+        }
+
         for (Container container : pod.getSpec().getContainers()) {
             for (ContainerPort cp : container.getPorts()) {
                 if (cp.getName() != null && cp.getName().equals(portName)) {