GossipLinkStore AE bugfix + cleanup

Change-Id: If4cbaa65f980f10713488e6bf1be5d01c3131780
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
index a6c1660..21e52b2 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
@@ -1,7 +1,6 @@
 package org.onlab.onos.store.link.impl;
 
 import com.google.common.base.Function;
-import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
@@ -27,7 +26,6 @@
 import org.onlab.onos.net.SparseAnnotations;
 import org.onlab.onos.net.Link.Type;
 import org.onlab.onos.net.LinkKey;
-import org.onlab.onos.net.Provided;
 import org.onlab.onos.net.device.DeviceClockService;
 import org.onlab.onos.net.link.DefaultLinkDescription;
 import org.onlab.onos.net.link.LinkDescription;
@@ -70,7 +68,9 @@
 import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Predicates.notNull;
+import static org.onlab.onos.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
 
 /**
  * Manages inventory of infrastructure links in distributed data store
@@ -239,9 +239,9 @@
         LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
         final LinkEvent event;
         final Timestamped<LinkDescription> mergedDesc;
-        synchronized (getLinkDescriptions(key)) {
+        synchronized (getOrCreateLinkDescriptions(key)) {
             event = createOrUpdateLinkInternal(providerId, deltaDesc);
-            mergedDesc = getLinkDescriptions(key).get(providerId);
+            mergedDesc = getOrCreateLinkDescriptions(key).get(providerId);
         }
 
         if (event != null) {
@@ -265,7 +265,7 @@
 
         LinkKey key = linkKey(linkDescription.value().src(),
                               linkDescription.value().dst());
-        Map<ProviderId, Timestamped<LinkDescription>> descs = getLinkDescriptions(key);
+        Map<ProviderId, Timestamped<LinkDescription>> descs = getOrCreateLinkDescriptions(key);
 
         synchronized (descs) {
             // if the link was previously removed, we should proceed if and
@@ -296,7 +296,7 @@
             ProviderId providerId,
             Timestamped<LinkDescription> linkDescription) {
 
-        // merge existing attributes and merge
+        // merge existing annotations
         Timestamped<LinkDescription> existingLinkDescription = descs.get(providerId);
         if (existingLinkDescription != null && existingLinkDescription.isNewer(linkDescription)) {
             return null;
@@ -377,14 +377,54 @@
         return event;
     }
 
-    private LinkEvent removeLinkInternal(LinkKey key, Timestamp timestamp) {
-        Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions =
-                getLinkDescriptions(key);
+    private static Timestamped<LinkDescription> getPrimaryDescription(
+                Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
+
         synchronized (linkDescriptions) {
+            for (Entry<ProviderId, Timestamped<LinkDescription>>
+                    e : linkDescriptions.entrySet()) {
+
+                if (!e.getKey().isAncillary()) {
+                    return e.getValue();
+                }
+            }
+        }
+        return null;
+    }
+
+
+    // TODO: consider slicing out as Timestamp utils
+    /**
+     * Checks is timestamp is more recent than timestamped object.
+     *
+     * @param timestamp to check if this is more recent then other
+     * @param timestamped object to be tested against
+     * @return true if {@code timestamp} is more recent than {@code timestamped}
+     *         or {@code timestamped is null}
+     */
+    private static boolean isMoreRecent(Timestamp timestamp, Timestamped<?> timestamped) {
+        checkNotNull(timestamp);
+        if (timestamped == null) {
+            return true;
+        }
+        return timestamp.compareTo(timestamped.timestamp()) > 0;
+    }
+
+    private LinkEvent removeLinkInternal(LinkKey key, Timestamp timestamp) {
+        Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions
+            = getOrCreateLinkDescriptions(key);
+
+        synchronized (linkDescriptions) {
+            if (linkDescriptions.isEmpty()) {
+                // never seen such link before. keeping timestamp for record
+                removedLinks.put(key, timestamp);
+                return null;
+            }
             // accept removal request if given timestamp is newer than
             // the latest Timestamp from Primary provider
-            ProviderId primaryProviderId = pickPrimaryProviderId(linkDescriptions);
-            if (linkDescriptions.get(primaryProviderId).isNewer(timestamp)) {
+            Timestamped<LinkDescription> prim = getPrimaryDescription(linkDescriptions);
+            if (!isMoreRecent(timestamp, prim)) {
+                // outdated remove request, ignore
                 return null;
             }
             removedLinks.put(key, timestamp);
@@ -406,12 +446,13 @@
     /**
      * @return primary ProviderID, or randomly chosen one if none exists
      */
-    private ProviderId pickPrimaryProviderId(
+    private static ProviderId pickBaseProviderId(
             Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
 
         ProviderId fallBackPrimary = null;
         for (Entry<ProviderId, Timestamped<LinkDescription>> e : linkDescriptions.entrySet()) {
             if (!e.getKey().isAncillary()) {
+                // found primary
                 return e.getKey();
             } else if (fallBackPrimary == null) {
                 // pick randomly as a fallback in case there is no primary
@@ -421,9 +462,10 @@
         return fallBackPrimary;
     }
 
+    // Guarded by linkDescs value (=locking each Link)
     private Link composeLink(Map<ProviderId, Timestamped<LinkDescription>> descs) {
-        ProviderId primaryProviderId = pickPrimaryProviderId(descs);
-        Timestamped<LinkDescription> base = descs.get(primaryProviderId);
+        ProviderId baseProviderId = pickBaseProviderId(descs);
+        Timestamped<LinkDescription> base = descs.get(baseProviderId);
 
         ConnectPoint src = base.value().src();
         ConnectPoint dst = base.value().dst();
@@ -432,7 +474,7 @@
         annotations = merge(annotations, base.value().annotations());
 
         for (Entry<ProviderId, Timestamped<LinkDescription>> e : descs.entrySet()) {
-            if (primaryProviderId.equals(e.getKey())) {
+            if (baseProviderId.equals(e.getKey())) {
                 continue;
             }
 
@@ -445,10 +487,10 @@
             annotations = merge(annotations, e.getValue().value().annotations());
         }
 
-        return new DefaultLink(primaryProviderId , src, dst, type, annotations);
+        return new DefaultLink(baseProviderId, src, dst, type, annotations);
     }
 
-    private Map<ProviderId, Timestamped<LinkDescription>> getLinkDescriptions(LinkKey key) {
+    private Map<ProviderId, Timestamped<LinkDescription>> getOrCreateLinkDescriptions(LinkKey key) {
         Map<ProviderId, Timestamped<LinkDescription>> r;
         r = linkDescs.get(key);
         if (r != null) {
@@ -464,11 +506,11 @@
         }
     }
 
-    private Timestamped<LinkDescription> getLinkDescription(LinkKey key, ProviderId providerId) {
-        return getLinkDescriptions(key).get(providerId);
-    }
-
     private final Function<LinkKey, Link> lookupLink = new LookupLink();
+    /**
+     * Returns a Function to lookup Link instance using LinkKey from cache.
+     * @return
+     */
     private Function<LinkKey, Link> lookupLink() {
         return lookupLink;
     }
@@ -480,26 +522,12 @@
         }
     }
 
-    private static final class IsPrimary implements Predicate<Provided> {
-
-        private static final Predicate<Provided> IS_PRIMARY = new IsPrimary();
-        public static final Predicate<Provided> isPrimary() {
-            return IS_PRIMARY;
-        }
-
-        @Override
-        public boolean apply(Provided input) {
-            return !input.providerId().isAncillary();
-        }
-    }
-
     private void notifyDelegateIfNotNull(LinkEvent event) {
         if (event != null) {
             notifyDelegate(event);
         }
     }
 
-    // TODO: should we be throwing exception?
     private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
@@ -508,17 +536,12 @@
         clusterCommunicator.broadcast(message);
     }
 
-    // TODO: should we be throwing exception?
-    private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) {
-        try {
-            ClusterMessage message = new ClusterMessage(
-                    clusterService.getLocalNode().id(),
-                    subject,
-                    SERIALIZER.encode(event));
-            clusterCommunicator.unicast(message, recipient);
-        } catch (IOException e) {
-            log.error("Failed to send a {} message to {}", subject.value(), recipient);
-        }
+    private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                SERIALIZER.encode(event));
+        clusterCommunicator.unicast(message, recipient);
     }
 
     private void notifyPeers(InternalLinkEvent event) throws IOException {
@@ -529,12 +552,22 @@
         broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
     }
 
+    // notify peer, silently ignoring error
     private void notifyPeer(NodeId peer, InternalLinkEvent event) {
-        unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
+        try {
+            unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
+        } catch (IOException e) {
+            log.debug("Failed to notify peer {} with message {}", peer, event);
+        }
     }
 
+    // notify peer, silently ignoring error
     private void notifyPeer(NodeId peer, InternalLinkRemovedEvent event) {
-        unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
+        try {
+            unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
+        } catch (IOException e) {
+            log.debug("Failed to notify peer {} with message {}", peer, event);
+        }
     }
 
     private final class SendAdvertisementTask implements Runnable {
@@ -573,9 +606,9 @@
                 }
 
                 try {
-                    unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
-                } catch (Exception e) {
-                    log.error("Failed to send anti-entropy advertisement", e);
+                    unicastMessage(peer, LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
+                } catch (IOException e) {
+                    log.debug("Failed to send anti-entropy advertisement to {}", peer);
                     return;
                 }
             } catch (Exception e) {
@@ -608,42 +641,75 @@
         return new LinkAntiEntropyAdvertisement(self, linkTimestamps, linkTombstones);
     }
 
-    private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement advertisement) {
+    private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement ad) {
 
-        NodeId peer = advertisement.sender();
+        final NodeId sender = ad.sender();
+        boolean localOutdated = false;
 
-        Map<LinkFragmentId, Timestamp> linkTimestamps = advertisement.linkTimestamps();
-        Map<LinkKey, Timestamp> linkTombstones = advertisement.linkTombstones();
-        for (Map.Entry<LinkFragmentId, Timestamp> entry : linkTimestamps.entrySet()) {
-            LinkFragmentId linkFragmentId = entry.getKey();
-            Timestamp peerTimestamp = entry.getValue();
+        for (Entry<LinkKey, Map<ProviderId, Timestamped<LinkDescription>>>
+                l : linkDescs.entrySet()) {
 
-            LinkKey key = linkFragmentId.linkKey();
-            ProviderId providerId = linkFragmentId.providerId();
+            final LinkKey key = l.getKey();
+            final Map<ProviderId, Timestamped<LinkDescription>> link = l.getValue();
+            synchronized (link) {
+                Timestamp localLatest = removedLinks.get(key);
 
-            Timestamped<LinkDescription> linkDescription = getLinkDescription(key, providerId);
-            if (linkDescription.isNewer(peerTimestamp)) {
-                // I have more recent link description. update peer.
-                notifyPeer(peer, new InternalLinkEvent(providerId, linkDescription));
-            }
-            // else TODO: Peer has more recent link description. request it.
+                for (Entry<ProviderId, Timestamped<LinkDescription>> p : link.entrySet()) {
+                    final ProviderId providerId = p.getKey();
+                    final Timestamped<LinkDescription> pDesc = p.getValue();
 
-            Timestamp linkRemovedTimestamp = removedLinks.get(key);
-            if (linkRemovedTimestamp != null && linkRemovedTimestamp.compareTo(peerTimestamp) > 0) {
-                // peer has a zombie link. update peer.
-                notifyPeer(peer, new InternalLinkRemovedEvent(key, linkRemovedTimestamp));
+                    final LinkFragmentId fragId = new LinkFragmentId(key, providerId);
+                    // remote
+                    Timestamp remoteTimestamp = ad.linkTimestamps().get(fragId);
+                    if (remoteTimestamp == null) {
+                        remoteTimestamp = ad.linkTombstones().get(key);
+                    }
+                    if (remoteTimestamp == null ||
+                        pDesc.isNewer(remoteTimestamp)) {
+                        // I have more recent link description. update peer.
+                        notifyPeer(sender, new InternalLinkEvent(providerId, pDesc));
+                    } else {
+                        final Timestamp remoteLive = ad.linkTimestamps().get(fragId);
+                        if (remoteLive != null &&
+                            remoteLive.compareTo(pDesc.timestamp()) > 0) {
+                            // I have something outdated
+                            localOutdated = true;
+                        }
+                    }
+
+                    // search local latest along the way
+                    if (localLatest == null ||
+                        pDesc.isNewer(localLatest)) {
+                        localLatest = pDesc.timestamp();
+                    }
+                }
+                // Tests if remote remove is more recent then local latest.
+                final Timestamp remoteRemove = ad.linkTombstones().get(key);
+                if (remoteRemove != null) {
+                    if (localLatest != null &&
+                        localLatest.compareTo(remoteRemove) < 0) {
+                        // remote remove is more recent
+                        notifyDelegateIfNotNull(removeLinkInternal(key, remoteRemove));
+                    }
+                }
             }
         }
 
-        for (Map.Entry<LinkKey, Timestamp> entry : linkTombstones.entrySet()) {
-            LinkKey key = entry.getKey();
-            Timestamp peerTimestamp = entry.getValue();
+        // populate remove info if not known locally
+        for (Entry<LinkKey, Timestamp> remoteRm : ad.linkTombstones().entrySet()) {
+            final LinkKey key = remoteRm.getKey();
+            final Timestamp remoteRemove = remoteRm.getValue();
+            // relying on removeLinkInternal to ignore stale info
+            notifyDelegateIfNotNull(removeLinkInternal(key, remoteRemove));
+        }
 
-            ProviderId primaryProviderId = pickPrimaryProviderId(getLinkDescriptions(key));
-            if (primaryProviderId != null) {
-                if (!getLinkDescription(key, primaryProviderId).isNewer(peerTimestamp)) {
-                    notifyDelegateIfNotNull(removeLinkInternal(key, peerTimestamp));
-                }
+        if (localOutdated) {
+            // send back advertisement to speed up convergence
+            try {
+                unicastMessage(sender, LINK_ANTI_ENTROPY_ADVERTISEMENT,
+                                createAdvertisement());
+            } catch (IOException e) {
+                log.debug("Failed to send back active advertisement");
             }
         }
     }