Fix for ONOS-5035. Cherry picked from commit f1621f163d1b48d7056ef676cdcece3822bde0f3

Change-Id: I99cce9565f479a03e9e472713efc6d4ddd749108
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
index bfb60c7..437474f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
@@ -48,18 +48,25 @@
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.DistributedPrimitive.Status;
 import org.slf4j.Logger;
 
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.DefaultAnnotations.merge;
 import static org.onosproject.net.host.HostEvent.Type.*;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -80,10 +87,15 @@
 
     private ConsistentMap<HostId, DefaultHost> hostsConsistentMap;
     private Map<HostId, DefaultHost> hosts;
+    private Map<IpAddress, Set<Host>> hostsByIp;
 
     private MapEventListener<HostId, DefaultHost> hostLocationTracker =
             new HostLocationTracker();
 
+    private ScheduledExecutorService executor;
+
+    private Consumer<Status> statusChangeListener;
+
     @Activate
     public void activate() {
         KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
@@ -100,6 +112,14 @@
 
         hostsConsistentMap.addListener(hostLocationTracker);
 
+        executor = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "store", log));
+        statusChangeListener = status -> {
+            if (status == Status.ACTIVE) {
+                executor.execute(this::loadHostsByIp);
+            }
+        };
+        hostsConsistentMap.addStatusChangeListener(statusChangeListener);
+        loadHostsByIp();
         log.info("Started");
     }
 
@@ -110,6 +130,20 @@
         log.info("Stopped");
     }
 
+    /**
+     * Rebuilds the IP-address-to-hosts index from the current contents of
+     * the hosts consistent map.
+     */
+    private void loadHostsByIp() {
+        hostsByIp = new ConcurrentHashMap<>();
+        // Populate through updateHostsByIp so every entry is created or
+        // extended inside one atomic compute(). The former get()-then-put()
+        // could overwrite a set installed by another thread in between and
+        // silently drop the hosts it held.
+        hostsConsistentMap.asJavaMap().values()
+                .forEach(this::updateHostsByIp);
+    }
+
     private boolean shouldUpdate(DefaultHost existingHost,
                                  ProviderId providerId,
                                  HostId hostId,
@@ -145,8 +179,8 @@
         // check to see if any of the annotations provided by hostDescription
         // differ from those in the existing host
         return hostDescription.annotations().keys().stream()
-                    .anyMatch(k -> !Objects.equals(hostDescription.annotations().value(k),
-                                                   existingHost.annotations().value(k)));
+                   .anyMatch(k -> !Objects.equals(hostDescription.annotations().value(k),
+                                                  existingHost.annotations().value(k)));
 
 
     }
@@ -215,6 +249,7 @@
                 if (addresses != null && addresses.contains(ipAddress)) {
                     addresses = new HashSet<>(existingHost.ipAddresses());
                     addresses.remove(ipAddress);
+                    removeIpFromHostsByIp(existingHost, ipAddress);
                     return new DefaultHost(existingHost.providerId(),
                             hostId,
                             existingHost.mac(),
@@ -258,7 +293,8 @@
 
     @Override
     public Set<Host> getHosts(IpAddress ip) {
-        return filter(hosts.values(), host -> host.ipAddresses().contains(ip));
+        Set<Host> hosts = hostsByIp.get(ip);
+        return hosts != null ? ImmutableSet.copyOf(hosts) : ImmutableSet.of();
     }
 
     @Override
@@ -283,18 +319,70 @@
         return collection.stream().filter(predicate).collect(Collectors.toSet());
     }
 
+    // Builds a new concurrent set whose only element is the given host.
+    private Set<Host> addHosts(Host host) {
+        Set<Host> single = ImmutableSet.of(host);
+        return Sets.newConcurrentHashSet(single);
+    }
+
+    /**
+     * Replaces any element that shares the given host's id with that host,
+     * modifying the set in place.
+     *
+     * @return the same set instance that was passed in
+     */
+    private Set<Host> updateHosts(Set<Host> existingHosts, Host host) {
+        existingHosts.removeIf(stale -> stale.id().equals(host.id()));
+        existingHosts.add(host);
+        return existingHosts;
+    }
+
+    /**
+     * Drops every element whose id equals the given host's id.
+     *
+     * @param existingHosts set to prune in place; may be null
+     * @param host host whose id selects the elements to drop
+     * @return the pruned set, or null when it is null or has become empty
+     */
+    private Set<Host> removeHosts(Set<Host> existingHosts, Host host) {
+        if (existingHosts == null) {
+            // The previous guard covered only the removal loop; the isEmpty()
+            // call after it still dereferenced the null set and threw an NPE.
+            return null;
+        }
+        existingHosts.removeIf(candidate -> candidate.id().equals(host.id()));
+        return existingHosts.isEmpty() ? null : existingHosts;
+    }
+
+    // Indexes the host under each of its IPs, replacing same-id entries.
+    private void updateHostsByIp(DefaultHost host) {
+        for (IpAddress ip : host.ipAddresses()) {
+            hostsByIp.compute(ip, (k, set) -> set == null ? addHosts(host) : updateHosts(set, host));
+        }
+    }
+
+    // Unindexes the host from every IP address it carries by applying the
+    // single-address removal to each one.
+    private void removeHostsByIp(DefaultHost host) {
+        host.ipAddresses().forEach(ip -> removeIpFromHostsByIp(host, ip));
+    }
+
+    private void removeIpFromHostsByIp(DefaultHost host, IpAddress ip) {
+        hostsByIp.computeIfPresent(ip, (addr, indexed) -> removeHosts(indexed, host)); // null result unmaps ip
+    }
+
     private class HostLocationTracker implements MapEventListener<HostId, DefaultHost> {
         @Override
         public void event(MapEvent<HostId, DefaultHost> event) {
-            Host host;
+            DefaultHost host = checkNotNull(event.value().value());
             switch (event.type()) {
                 case INSERT:
-                    host = checkNotNull(event.newValue().value());
+                    updateHostsByIp(host);
                     notifyDelegate(new HostEvent(HOST_ADDED, host));
                     break;
                 case UPDATE:
-                    host = checkNotNull(event.newValue().value());
-                    Host prevHost = checkNotNull(event.oldValue().value());
+                    updateHostsByIp(host);
+                    DefaultHost prevHost = checkNotNull(event.oldValue().value());
                     if (!Objects.equals(prevHost.location(), host.location())) {
                         notifyDelegate(new HostEvent(HOST_MOVED, host, prevHost));
                     } else if (!Objects.equals(prevHost, host)) {
@@ -302,7 +390,7 @@
                     }
                     break;
                 case REMOVE:
-                    host = checkNotNull(event.oldValue().value());
+                    updateHostsByIp(host);
                     notifyDelegate(new HostEvent(HOST_REMOVED, host));
                     break;
                 default: