Fix for ONOS-5035.Cherry picked from commit f1621f163d1b48d7056ef676cdcece3822bde0f3
Change-Id: I99cce9565f479a03e9e472713efc6d4ddd749108
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
index bfb60c7..437474f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
@@ -48,18 +48,25 @@
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.DistributedPrimitive.Status;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.DefaultAnnotations.merge;
import static org.onosproject.net.host.HostEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
@@ -80,10 +87,15 @@
private ConsistentMap<HostId, DefaultHost> hostsConsistentMap;
private Map<HostId, DefaultHost> hosts;
+ private Map<IpAddress, Set<Host>> hostsByIp;
private MapEventListener<HostId, DefaultHost> hostLocationTracker =
new HostLocationTracker();
+ private ScheduledExecutorService executor;
+
+ private Consumer<Status> statusChangeListener;
+
@Activate
public void activate() {
KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
@@ -100,6 +112,14 @@
hostsConsistentMap.addListener(hostLocationTracker);
+ executor = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "store", log));
+ statusChangeListener = status -> {
+ if (status == Status.ACTIVE) {
+ executor.execute(this::loadHostsByIp);
+ }
+ };
+ hostsConsistentMap.addStatusChangeListener(statusChangeListener);
+ loadHostsByIp();
log.info("Started");
}
@@ -110,6 +130,20 @@
log.info("Stopped");
}
+ private void loadHostsByIp() {
+ hostsByIp = new ConcurrentHashMap<IpAddress, Set<Host>>();
+ hostsConsistentMap.asJavaMap().values().forEach(host -> {
+ host.ipAddresses().forEach(ip -> {
+ Set<Host> existingHosts = hostsByIp.get(ip);
+ if (existingHosts == null) {
+ hostsByIp.put(ip, addHosts(host));
+ } else {
+ existingHosts.add(host);
+ }
+ });
+ });
+ }
+
private boolean shouldUpdate(DefaultHost existingHost,
ProviderId providerId,
HostId hostId,
@@ -145,8 +179,8 @@
// check to see if any of the annotations provided by hostDescription
// differ from those in the existing host
return hostDescription.annotations().keys().stream()
- .anyMatch(k -> !Objects.equals(hostDescription.annotations().value(k),
- existingHost.annotations().value(k)));
+ .anyMatch(k -> !Objects.equals(hostDescription.annotations().value(k),
+ existingHost.annotations().value(k)));
}
@@ -215,6 +249,7 @@
if (addresses != null && addresses.contains(ipAddress)) {
addresses = new HashSet<>(existingHost.ipAddresses());
addresses.remove(ipAddress);
+ removeIpFromHostsByIp(existingHost, ipAddress);
return new DefaultHost(existingHost.providerId(),
hostId,
existingHost.mac(),
@@ -258,7 +293,8 @@
@Override
public Set<Host> getHosts(IpAddress ip) {
- return filter(hosts.values(), host -> host.ipAddresses().contains(ip));
+ Set<Host> hosts = hostsByIp.get(ip);
+ return hosts != null ? ImmutableSet.copyOf(hosts) : ImmutableSet.of();
}
@Override
@@ -283,18 +319,70 @@
return collection.stream().filter(predicate).collect(Collectors.toSet());
}
+ private Set<Host> addHosts(Host host) {
+ Set<Host> hosts = Sets.newConcurrentHashSet();
+ hosts.add(host);
+ return hosts;
+ }
+
+ private Set<Host> updateHosts(Set<Host> existingHosts, Host host) {
+ Iterator<Host> iterator = existingHosts.iterator();
+ while (iterator.hasNext()) {
+ Host existingHost = iterator.next();
+ if (existingHost.id().equals(host.id())) {
+ iterator.remove();
+ }
+ }
+ existingHosts.add(host);
+ return existingHosts;
+ }
+
+ private Set<Host> removeHosts(Set<Host> existingHosts, Host host) {
+ if (existingHosts != null) {
+ Iterator<Host> iterator = existingHosts.iterator();
+ while (iterator.hasNext()) {
+ Host existingHost = iterator.next();
+ if (existingHost.id().equals(host.id())) {
+ iterator.remove();
+ }
+ }
+ }
+
+ if (existingHosts.isEmpty()) {
+ return null;
+ }
+ return existingHosts;
+ }
+
+ private void updateHostsByIp(DefaultHost host) {
+ host.ipAddresses().forEach(ip -> {
+ hostsByIp.compute(ip, (k, v) -> v == null ? addHosts(host)
+ : updateHosts(v, host));
+ });
+ }
+
+ private void removeHostsByIp(DefaultHost host) {
+ host.ipAddresses().forEach(ip -> {
+ hostsByIp.computeIfPresent(ip, (k, v) -> removeHosts(v, host));
+ });
+ }
+
+ private void removeIpFromHostsByIp(DefaultHost host, IpAddress ip) {
+ hostsByIp.computeIfPresent(ip, (k, v) -> removeHosts(v, host));
+ }
+
private class HostLocationTracker implements MapEventListener<HostId, DefaultHost> {
@Override
public void event(MapEvent<HostId, DefaultHost> event) {
- Host host;
+ DefaultHost host = checkNotNull(event.value().value());
switch (event.type()) {
case INSERT:
- host = checkNotNull(event.newValue().value());
+ updateHostsByIp(host);
notifyDelegate(new HostEvent(HOST_ADDED, host));
break;
case UPDATE:
- host = checkNotNull(event.newValue().value());
- Host prevHost = checkNotNull(event.oldValue().value());
+ updateHostsByIp(host);
+ DefaultHost prevHost = checkNotNull(event.oldValue().value());
if (!Objects.equals(prevHost.location(), host.location())) {
notifyDelegate(new HostEvent(HOST_MOVED, host, prevHost));
} else if (!Objects.equals(prevHost, host)) {
@@ -302,7 +390,7 @@
}
break;
case REMOVE:
- host = checkNotNull(event.oldValue().value());
+ updateHostsByIp(host);
notifyDelegate(new HostEvent(HOST_REMOVED, host));
break;
default: