CORD-1416 Implement multi-homing probing in HostLocationProvider

Also includes the following refactoring:
    - Refactor the way we generate ARP probes
    - Remove some unused code

Change-Id: I96b1c47bd5731b7b38ef4d19a941d231b5d0054c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
index bc0dd00..b5dcc76 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
@@ -15,6 +15,9 @@
  */
 package org.onosproject.store.host.impl;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
@@ -49,6 +52,7 @@
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.DistributedPrimitive.Status;
+import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
 import java.util.Collection;
@@ -58,7 +62,10 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -88,29 +95,67 @@
     private ConsistentMap<HostId, DefaultHost> hostsConsistentMap;
     private Map<HostId, DefaultHost> hosts;
     private Map<IpAddress, Set<Host>> hostsByIp;
-
     private MapEventListener<HostId, DefaultHost> hostLocationTracker =
             new HostLocationTracker();
 
+    private ConsistentMap<MacAddress, PendingHostLocation> pendingHostsConsistentMap;
+    private Map<MacAddress, PendingHostLocation> pendingHosts;
+    private MapEventListener<MacAddress, PendingHostLocation> pendingHostListener =
+            new PendingHostListener();
+
     private ScheduledExecutorService executor;
 
     private Consumer<Status> statusChangeListener;
 
+    // TODO make this configurable
+    private static final int PROBE_TIMEOUT_MS = 1000;
+
+    private Cache<MacAddress, PendingHostLocation> pendingHostsCache = CacheBuilder.newBuilder()
+            .expireAfterWrite(PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS) // entries lapse this long after being put
+            .removalListener((RemovalNotification<MacAddress, PendingHostLocation> removal) -> {
+                switch (removal.getCause()) {
+                    case EXPLICIT: // explicitly invalidated (e.g. removePendingHostLocation); nothing to do
+                        break;
+                    case EXPIRED:
+                        PendingHostLocation timedOut = removal.getValue();
+                        if (timedOut != null) {
+                            log.info("Evict timeout probe {} from pendingHostLocations", timedOut);
+                            timeoutPendingHostLocation(removal.getKey());
+                        }
+                        break;
+                    default:
+                        log.warn("Remove {} from pendingHostLocations for unexpected reason {}",
+                                removal.getKey(), removal.getCause());
+                }
+            }).build();
+
+    private ScheduledExecutorService cacheCleaner = Executors.newSingleThreadScheduledExecutor();
+
     @Activate
     public void activate() {
         KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
                 .register(KryoNamespaces.API);
-
         hostsConsistentMap = storageService.<HostId, DefaultHost>consistentMapBuilder()
                 .withName("onos-hosts")
                 .withRelaxedReadConsistency()
                 .withSerializer(Serializer.using(hostSerializer.build()))
                 .build();
-
+        hostsConsistentMap.addListener(hostLocationTracker);
         hosts = hostsConsistentMap.asJavaMap();
 
+        KryoNamespace.Builder pendingHostSerializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(PendingHostLocation.class);
+        pendingHostsConsistentMap = storageService.<MacAddress, PendingHostLocation>consistentMapBuilder()
+                .withName("onos-hosts-pending")
+                .withRelaxedReadConsistency()
+                .withSerializer(Serializer.using(pendingHostSerializer.build()))
+                .build();
+        pendingHostsConsistentMap.addListener(pendingHostListener);
+        pendingHosts = pendingHostsConsistentMap.asJavaMap();
 
-        hostsConsistentMap.addListener(hostLocationTracker);
+        cacheCleaner.scheduleAtFixedRate(pendingHostsCache::cleanUp, 0,
+                PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
 
         executor = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "store", log));
         statusChangeListener = status -> {
@@ -127,6 +172,8 @@
     public void deactivate() {
         hostsConsistentMap.removeListener(hostLocationTracker);
 
+        pendingHostsConsistentMap.removeListener(pendingHostListener); // registered in activate(); drop it too
+        cacheCleaner.shutdown(); // stops the scheduled pendingHostsCache::cleanUp runs
         log.info("Stopped");
     }
 
@@ -335,6 +382,32 @@
         return ImmutableSet.copyOf(filtered);
     }
 
+    @Override
+    public MacAddress addPendingHostLocation(HostId hostId, HostLocation hostLocation) {
+        // Probe source MAC = ONLab OUI (upper 3 bytes) + atomic counter value (lower 3 bytes).
+        long nextIndex = storageService.getAtomicCounter("onos-hosts-probe-index").getAndIncrement();
+        // Mask to 24 bits so a large counter wraps around instead of carrying into the OUI bytes.
+        MacAddress probeMac = MacAddress.valueOf(MacAddress.NONE.toLong() + (nextIndex & 0xFFFFFFL));
+        PendingHostLocation phl = new PendingHostLocation(hostId, hostLocation);
+        // The local cache times the probe out; pendingHosts is the map view of pendingHostsConsistentMap.
+        pendingHostsCache.put(probeMac, phl);
+        pendingHosts.put(probeMac, phl);
+        return probeMac;
+    }
+
+    @Override
+    public void removePendingHostLocation(MacAddress probeMac) {
+        pendingHostsCache.invalidate(probeMac); // explicit invalidation; the cache's removal listener ignores EXPLICIT
+        pendingHosts.remove(probeMac); // also drop the entry from the pendingHosts map
+    }
+
+    private void timeoutPendingHostLocation(MacAddress probeMac) {
+        pendingHosts.computeIfPresent(probeMac, (mac, pending) -> { // no-op when the entry is already gone
+            pending.setExpired(true); // flag only; presumably surfaces as the UPDATE PendingHostListener handles
+            return pending;
+        });
+    }
+
     private Set<Host> filter(Collection<DefaultHost> collection, Predicate<DefaultHost> predicate) {
         return collection.stream().filter(predicate).collect(Collectors.toSet());
     }
@@ -418,4 +491,28 @@
             }
         }
     }
+
+    private class PendingHostListener implements MapEventListener<MacAddress, PendingHostLocation> {
+        // Reused for all expiries (a per-event executor leaked one thread each). TODO: shut down in deactivate().
+        private final Executor locationRemover = Executors.newSingleThreadScheduledExecutor();
+        @Override
+        public void event(MapEvent<MacAddress, PendingHostLocation> event) {
+            Versioned<PendingHostLocation> newValue = event.newValue();
+            switch (event.type()) {
+                case UPDATE:
+                    if (newValue.value().expired()) { // entry was flagged as timed out
+                        locationRemover.execute(() -> { // run off the thread that delivered the event
+                            pendingHosts.remove(event.key());
+                            removeLocation(newValue.value().hostId(), newValue.value().location());
+                        });
+                    }
+                    break;
+                case INSERT:
+                case REMOVE:
+                    break; // nothing to do for these event types
+                default:
+                    log.warn("Unknown map event type: {}", event.type());
+            }
+        }
+    }
 }