Reserve pod IP addresses to avoid allocating duplicate IP addresses

Change-Id: I0fa42d0d17a35184730e797e394502dfceae7525
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sIpamHandler.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sIpamHandler.java
index 5591d2e..4bc6f7c 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sIpamHandler.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sIpamHandler.java
@@ -15,6 +15,8 @@
  */
 package org.onosproject.k8snetworking.impl;
 
+import io.fabric8.kubernetes.api.model.Pod;
+import org.apache.commons.lang.StringUtils;
 import org.onlab.packet.IpAddress;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.LeadershipService;
@@ -25,6 +27,9 @@
 import org.onosproject.k8snetworking.api.K8sNetworkEvent;
 import org.onosproject.k8snetworking.api.K8sNetworkListener;
 import org.onosproject.k8snetworking.api.K8sNetworkService;
+import org.onosproject.k8snetworking.api.K8sPodEvent;
+import org.onosproject.k8snetworking.api.K8sPodListener;
+import org.onosproject.k8snetworking.api.K8sPodService;
 import org.onosproject.mastership.MastershipService;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -33,6 +38,7 @@
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -51,6 +57,9 @@
 
     private final Logger log = getLogger(getClass());
 
+    private static final String IP_ADDRESS = "ipAddress";   // pod annotation key read as the pod's annotated IP
+    private static final String NETWORK_ID = "networkId";   // pod annotation key read as the target network ID
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
 
@@ -67,12 +76,17 @@
     protected K8sNetworkService k8sNetworkService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected K8sPodService k8sPodService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected K8sIpamAdminService k8sIpamAdminService;
 
     private final ExecutorService eventExecutor = newSingleThreadExecutor(
             groupedThreads(this.getClass().getSimpleName(), "event-handler"));
     private final InternalK8sNetworkListener k8sNetworkListener =
             new InternalK8sNetworkListener();
+    private final InternalK8sPodListener k8sPodListener =
+            new InternalK8sPodListener();
 
     private ApplicationId appId;
     private NodeId localNodeId;
@@ -83,12 +97,14 @@
         localNodeId = clusterService.getLocalNode().id();
         leadershipService.runForLeadership(appId.name());
         k8sNetworkService.addListener(k8sNetworkListener);
+        k8sPodService.addListener(k8sPodListener);
 
         log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
+        k8sPodService.removeListener(k8sPodListener);
         k8sNetworkService.removeListener(k8sNetworkListener);
         leadershipService.withdraw(appId.name());
         eventExecutor.shutdown();
@@ -133,4 +149,53 @@
             k8sIpamAdminService.purgeIpPool(event.subject().networkId());
         }
     }
+
+    /** Reserves the IP of a newly created pod when its annotation matches its status. */
+    private class InternalK8sPodListener implements K8sPodListener {
+
+        /** Tells whether the local node is the current leader of this application's topic. */
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(K8sPodEvent event) {
+            switch (event.type()) {
+                case K8S_POD_CREATED:
+                    eventExecutor.execute(() -> processPodCreation(event.subject()));
+                    break;
+                case K8S_POD_REMOVED:
+                default:
+                    break;
+            }
+        }
+
+        /**
+         * Reserves the pod IP in the annotated network; no-op on non-leader nodes or
+         * when metadata, status, annotations, network ID or a matching IP is missing.
+         * @param pod the created pod
+         */
+        private void processPodCreation(Pod pod) {
+            // a pod without metadata or status would otherwise cause an NPE below
+            if (!isRelevantHelper() || pod.getMetadata() == null || pod.getStatus() == null) {
+                return;
+            }
+
+            Map<String, String> annots = pod.getMetadata().getAnnotations();
+            if (annots == null || annots.isEmpty()) {
+                return;
+            }
+
+            String annotIp = annots.get(IP_ADDRESS);
+            String annotNetwork = annots.get(NETWORK_ID);
+            String podIp = pod.getStatus().getPodIP();
+
+            // past this guard podIp is non-null and identical to the annotated IP
+            if (podIp == null || annotNetwork == null || !StringUtils.equals(annotIp, podIp)) {
+                return;
+            }
+
+            k8sIpamAdminService.reserveIp(annotNetwork, IpAddress.valueOf(podIp));
+        }
+    }
 }
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sIpamManager.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sIpamManager.java
index 5501971..7a458c8 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sIpamManager.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/impl/K8sIpamManager.java
@@ -92,6 +92,18 @@
     }
 
     @Override
+    public void reserveIp(String networkId, IpAddress ipAddress) {
+        // nothing to do when the address is already recorded as allocated
+        if (allocatedIps(networkId).contains(ipAddress)) {
+            return;
+        }
+        String ipamId = networkId + "-" + ipAddress.toString();
+        k8sIpamStore.removeAvailableIp(ipamId);
+        k8sIpamStore.createAllocatedIp(new DefaultK8sIpam(ipamId, ipAddress, networkId));
+        log.info("Reserved the IP {}", ipAddress.toString());
+    }
+
+    @Override
     public boolean releaseIp(String networkId, IpAddress ipAddress) {
         IpAddress releasedIp = allocatedIps(networkId).stream()
                 .filter(ip -> ip.equals(ipAddress))
diff --git a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
index a263359..a9601d1 100644
--- a/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
+++ b/apps/k8s-networking/app/src/main/java/org/onosproject/k8snetworking/util/K8sNetworkingUtil.java
@@ -168,14 +168,14 @@
      */
     public static Set<IpAddress> getSubnetIps(String cidr) {
         SubnetUtils utils = new SubnetUtils(cidr);
-        utils.setInclusiveHostCount(true);
+        utils.setInclusiveHostCount(false);
         SubnetUtils.SubnetInfo info = utils.getInfo();
         Set<String> allAddresses =
                 new HashSet<>(Arrays.asList(info.getAllAddresses()));
 
         if (allAddresses.size() > 2) {
-            allAddresses.remove(info.getBroadcastAddress());
-            allAddresses.remove(info.getNetworkAddress());
+            allAddresses.remove(info.getLowAddress());
+            allAddresses.remove(info.getHighAddress());
         }
 
         return allAddresses.stream()