GossipHostStore: allow location change + update

- Actively sync with peer on anti-entropy message
  to improve convergence speed
- Timestamp not only location
- Refresh timestamp on delta update

Might fix ONOS-436

Change-Id: I271f9af04b87d78124d055e79b93413deaf1fa3c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
index 10bf9ec..821cdc5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
@@ -15,13 +15,16 @@
  */
 package org.onosproject.store.host.impl;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
+import static com.google.common.collect.Multimaps.newSetMultimap;
+import static com.google.common.collect.Sets.newConcurrentHashSet;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
 import static org.onosproject.net.DefaultAnnotations.merge;
 import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_MOVED;
 import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_UPDATED;
+import static org.onosproject.store.host.impl.GossipHostStoreMessageSubjects.*;
 import static org.onlab.util.Tools.namedThreads;
 import static org.onlab.util.Tools.minPriority;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -50,6 +53,7 @@
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.net.Annotations;
+import org.onosproject.net.AnnotationsUtil;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.DefaultHost;
@@ -61,6 +65,7 @@
 import org.onosproject.net.host.HostClockService;
 import org.onosproject.net.host.HostDescription;
 import org.onosproject.net.host.HostEvent;
+import org.onosproject.net.host.HostEvent.Type;
 import org.onosproject.net.host.HostStore;
 import org.onosproject.net.host.HostStoreDelegate;
 import org.onosproject.net.host.PortAddresses;
@@ -109,7 +114,9 @@
     private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
 
     // Hosts tracked by their location
-    private final Multimap<ConnectPoint, Host> locations = HashMultimap.create();
+    private final Multimap<ConnectPoint, Host> locations
+        = synchronizedSetMultimap(newSetMultimap(new ConcurrentHashMap<>(),
+                                                 () -> newConcurrentHashSet()));
 
     private final SetMultimap<ConnectPoint, PortAddresses> portAddresses =
             Multimaps.synchronizedSetMultimap(
@@ -142,16 +149,20 @@
 
     private ScheduledExecutorService backgroundExecutor;
 
+    // TODO: Make these anti-entropy params configurable
+    private long initialDelaySec = 5;
+    private long periodSec = 5;
+
     @Activate
     public void activate() {
         clusterCommunicator.addSubscriber(
-                GossipHostStoreMessageSubjects.HOST_UPDATED,
+                HOST_UPDATED_MSG,
                 new InternalHostEventListener());
         clusterCommunicator.addSubscriber(
-                GossipHostStoreMessageSubjects.HOST_REMOVED,
+                HOST_REMOVED_MSG,
                 new InternalHostRemovedEventListener());
         clusterCommunicator.addSubscriber(
-                GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
+                HOST_ANTI_ENTROPY_ADVERTISEMENT,
                 new InternalHostAntiEntropyAdvertisementListener());
 
         executor = Executors.newCachedThreadPool(namedThreads("host-fg-%d"));
@@ -159,9 +170,6 @@
         backgroundExecutor =
                 newSingleThreadScheduledExecutor(minPriority(namedThreads("host-bg-%d")));
 
-        // TODO: Make these configurable
-        long initialDelaySec = 5;
-        long periodSec = 5;
         // start anti-entropy thread
         backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                     initialDelaySec, periodSec, TimeUnit.SECONDS);
@@ -209,66 +217,112 @@
 
     private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
                                         HostDescription hostDescription, Timestamp timestamp) {
+        // If this host was previously removed, first ensure
+        // this new request is "newer"
+        if (isHostRemoved(hostId, timestamp)) {
+            log.debug("Ignoring update for removed host {}@{}",
+                      hostDescription, timestamp);
+            return null;
+        }
         StoredHost host = hosts.get(hostId);
         if (host == null) {
             return createHost(providerId, hostId, hostDescription, timestamp);
         }
-        return updateHost(providerId, host, hostDescription, timestamp);
+        return updateHost(providerId, hostId, host, hostDescription, timestamp);
+    }
+
+    /**
+     * @param hostId host identifier
+     * @param timestamp timstamp to compare with
+     * @return true if given timestamp is more recent timestamp compared to
+     *         the timestamp Host was removed.
+     */
+    private boolean isHostRemoved(HostId hostId, Timestamp timestamp) {
+        Timestamped<Host> removedInfo = removedHosts.get(hostId);
+        if (removedInfo != null) {
+            if (removedInfo.isNewer(timestamp)) {
+                return true;
+            }
+            removedHosts.remove(hostId, removedInfo);
+        }
+        return false;
     }
 
     // creates a new host and sends HOST_ADDED
     private HostEvent createHost(ProviderId providerId, HostId hostId,
                                  HostDescription descr, Timestamp timestamp) {
         synchronized (this) {
-            // If this host was previously removed, first ensure
-            // this new request is "newer"
-            if (removedHosts.containsKey(hostId)) {
-                if (removedHosts.get(hostId).isNewer(timestamp)) {
-                    return null;
-                } else {
-                    removedHosts.remove(hostId);
-                }
-            }
-            StoredHost newhost = new StoredHost(providerId, hostId,
+            StoredHost newhost = new StoredHost(timestamp, providerId, hostId,
                     descr.hwAddress(),
                     descr.vlan(),
-                    new Timestamped<>(descr.location(), timestamp),
+                    descr.location(),
                     ImmutableSet.copyOf(descr.ipAddress()));
-            hosts.put(hostId, newhost);
+            StoredHost concAdd = hosts.putIfAbsent(hostId, newhost);
+            if (concAdd != null) {
+                // concurrent add detected, retry from start
+                return updateHost(providerId, hostId, concAdd, descr, timestamp);
+            }
             locations.put(descr.location(), newhost);
             return new HostEvent(HOST_ADDED, newhost);
         }
     }
 
     // checks for type of update to host, sends appropriate event
-    private HostEvent updateHost(ProviderId providerId, StoredHost host,
+    private HostEvent updateHost(ProviderId providerId, HostId hostId, StoredHost oldHost,
                                  HostDescription descr, Timestamp timestamp) {
-        HostEvent event;
-        if (!host.location.isNewer(timestamp) && !host.location().equals(descr.location())) {
-            host.setLocation(new Timestamped<>(descr.location(), timestamp));
-            return new HostEvent(HOST_MOVED, host);
-        }
 
-        if (host.ipAddresses().containsAll(descr.ipAddress()) &&
-                descr.annotations().keys().isEmpty()) {
+        if (timestamp.compareTo(oldHost.timestamp()) < 0) {
+            // new timestamp is older
+            log.debug("Ignoring outdated host update {}@{}", descr, timestamp);
             return null;
         }
 
-        Set<IpAddress> addresses = new HashSet<>(host.ipAddresses());
-        addresses.addAll(descr.ipAddress());
-        Annotations annotations = merge((DefaultAnnotations) host.annotations(),
-                                        descr.annotations());
-        StoredHost updated = new StoredHost(providerId, host.id(),
-                                            host.mac(), host.vlan(),
-                                            host.location, addresses,
-                                            annotations);
-        event = new HostEvent(HOST_UPDATED, updated);
-        synchronized (this) {
-            hosts.put(host.id(), updated);
-            locations.remove(host.location(), host);
-            locations.put(updated.location(), updated);
+        final boolean hostMoved = !oldHost.location().equals(descr.location());
+        if (hostMoved ||
+                !oldHost.ipAddresses().containsAll(descr.ipAddress()) ||
+                !descr.annotations().keys().isEmpty()) {
+
+            Set<IpAddress> addresses = new HashSet<>(oldHost.ipAddresses());
+            addresses.addAll(descr.ipAddress());
+            Annotations annotations = merge((DefaultAnnotations) oldHost.annotations(),
+                                            descr.annotations());
+
+            Timestamp newTimestamp = timestamp;
+            // if merged Set/Annotation differ from description...
+            final boolean deltaUpdate = !descr.ipAddress().equals(addresses) ||
+                    !AnnotationsUtil.isEqual(descr.annotations(), annotations);
+            if (deltaUpdate) {
+                // ..then local existing info had something description didn't
+                newTimestamp = hostClockService.getTimestamp(hostId);
+                log.debug("delta update detected on {}, substepping timestamp to {}",
+                          hostId, newTimestamp);
+            }
+
+            StoredHost updated = new StoredHost(newTimestamp,
+                                                providerId, oldHost.id(),
+                                                oldHost.mac(), oldHost.vlan(),
+                                                descr.location(),
+                                                addresses,
+                                                annotations);
+            synchronized (this) {
+                boolean replaced = hosts.replace(hostId, oldHost, updated);
+                if (!replaced) {
+                    // concurrent update, retry
+                    return createOrUpdateHostInternal(providerId, hostId, descr, timestamp);
+                }
+                locations.remove(oldHost.location(), oldHost);
+                locations.put(updated.location(), updated);
+
+                HostEvent.Type eventType;
+                if (hostMoved) {
+                    eventType = Type.HOST_MOVED;
+                } else {
+                    eventType = Type.HOST_UPDATED;
+                }
+                return new HostEvent(eventType, updated);
+            }
         }
-        return event;
+        return null;
     }
 
     @Override
@@ -397,9 +451,8 @@
         }
     }
 
-    // Auxiliary extension to allow location to mutate.
     private static final class StoredHost extends DefaultHost {
-        private Timestamped<HostLocation> location;
+        private final Timestamp timestamp;
 
         /**
          * Creates an end-station host using the supplied information.
@@ -412,33 +465,24 @@
          * @param ips         host IP addresses
          * @param annotations optional key/value annotations
          */
-        public StoredHost(ProviderId providerId, HostId id,
-                          MacAddress mac, VlanId vlan, Timestamped<HostLocation> location,
+        public StoredHost(Timestamp timestamp, ProviderId providerId, HostId id,
+                          MacAddress mac, VlanId vlan, HostLocation location,
                           Set<IpAddress> ips, Annotations... annotations) {
-            super(providerId, id, mac, vlan, location.value(), ips, annotations);
-            this.location = location;
-        }
-
-        void setLocation(Timestamped<HostLocation> location) {
-            this.location = location;
-        }
-
-        @Override
-        public HostLocation location() {
-            return location.value();
+            super(providerId, id, mac, vlan, location, ips, annotations);
+            this.timestamp = checkNotNull(timestamp);
         }
 
         public Timestamp timestamp() {
-            return location.timestamp();
+            return timestamp;
         }
     }
 
     private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
-        broadcastMessage(GossipHostStoreMessageSubjects.HOST_REMOVED, event);
+        broadcastMessage(HOST_REMOVED_MSG, event);
     }
 
     private void notifyPeers(InternalHostEvent event) throws IOException {
-        broadcastMessage(GossipHostStoreMessageSubjects.HOST_UPDATED, event);
+        broadcastMessage(HOST_UPDATED_MSG, event);
     }
 
     private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
@@ -556,7 +600,7 @@
                 }
 
                 try {
-                    unicastMessage(peer, GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
+                    unicastMessage(peer, HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
                 } catch (IOException e) {
                     log.debug("Failed to send anti-entropy advertisement to {}", peer);
                     return;
@@ -613,7 +657,7 @@
                             localHost.location(),
                             localHost.ipAddresses());
                 try {
-                    unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_UPDATED,
+                    unicastMessage(sender, HOST_UPDATED_MSG,
                             new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
                 } catch (IOException e1) {
                     log.debug("Failed to send advertisement response", e1);
@@ -642,7 +686,7 @@
                 localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
                 // sender has zombie, push
                 try {
-                    unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_REMOVED,
+                    unicastMessage(sender, HOST_REMOVED_MSG,
                             new InternalHostRemovedEvent(hostId, localDeadTimestamp));
                 } catch (IOException e1) {
                     log.debug("Failed to send advertisement response", e1);
@@ -650,7 +694,6 @@
             }
         }
 
-
         for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
             // for each remote tombstone advertisement...
             final HostId hostId = e.getKey();
@@ -665,6 +708,19 @@
                 notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
             }
         }
+
+        // if remote ad has something unknown, actively sync
+        for (HostFragmentId key : ad.timestamps().keySet()) {
+            if (!hosts.containsKey(key.hostId())) {
+                HostAntiEntropyAdvertisement myAd = createAdvertisement();
+                try {
+                    unicastMessage(sender, HOST_ANTI_ENTROPY_ADVERTISEMENT, myAd);
+                    break;
+                } catch (IOException e) {
+                    log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
+                }
+            }
+        }
     }
 
     private final class InternalHostAntiEntropyAdvertisementListener
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStoreMessageSubjects.java
index 52dc31d..82c0859 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStoreMessageSubjects.java
@@ -20,9 +20,9 @@
 public final class GossipHostStoreMessageSubjects {
     private GossipHostStoreMessageSubjects() {}
 
-    public static final MessageSubject HOST_UPDATED
+    public static final MessageSubject HOST_UPDATED_MSG
         = new MessageSubject("peer-host-updated");
-    public static final MessageSubject HOST_REMOVED
+    public static final MessageSubject HOST_REMOVED_MSG
         = new MessageSubject("peer-host-removed");
     public static final MessageSubject HOST_ANTI_ENTROPY_ADVERTISEMENT
         = new MessageSubject("host-enti-entropy-advertisement");;