Anti-Entropy support for link store.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
index 7bbad93..d56ac92 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
@@ -4,9 +4,11 @@
 import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.SetMultimap;
 
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.concurrent.ConcurrentUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -15,6 +17,8 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.net.AnnotationsUtil;
 import org.onlab.onos.net.ConnectPoint;
 import org.onlab.onos.net.DefaultAnnotations;
@@ -47,18 +51,24 @@
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
 import static org.onlab.onos.net.DefaultAnnotations.union;
 import static org.onlab.onos.net.DefaultAnnotations.merge;
 import static org.onlab.onos.net.Link.Type.DIRECT;
 import static org.onlab.onos.net.Link.Type.INDIRECT;
 import static org.onlab.onos.net.link.LinkEvent.Type.*;
+import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
 import static com.google.common.base.Predicates.notNull;
@@ -110,13 +120,30 @@
         }
     };
 
+    private ScheduledExecutorService executor;
+
     @Activate
     public void activate() {
 
         clusterCommunicator.addSubscriber(
-                GossipLinkStoreMessageSubjects.LINK_UPDATE, new InternalLinkEventListener());
+                GossipLinkStoreMessageSubjects.LINK_UPDATE,
+                new InternalLinkEventListener());
         clusterCommunicator.addSubscriber(
-                GossipLinkStoreMessageSubjects.LINK_REMOVED, new InternalLinkRemovedEventListener());
+                GossipLinkStoreMessageSubjects.LINK_REMOVED,
+                new InternalLinkRemovedEventListener());
+        clusterCommunicator.addSubscriber(
+                GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
+                new InternalLinkAntiEntropyAdvertisementListener());
+
+        executor =
+                newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
+
+        // TODO: Make these configurable
+        long initialDelaySec = 5;
+        long periodSec = 5;
+        // start anti-entropy thread
+        executor.scheduleAtFixedRate(new SendAdvertisementTask(),
+                    initialDelaySec, periodSec, TimeUnit.SECONDS);
 
         log.info("Started");
     }
@@ -408,6 +435,10 @@
                 NewConcurrentHashMap.<ProviderId, Timestamped<LinkDescription>>ifNeeded());
     }
 
+    private Timestamped<LinkDescription> getLinkDescription(LinkKey key, ProviderId providerId) {
+        return getLinkDescriptions(key).get(providerId);
+    }
+
     private final Function<LinkKey, Link> lookupLink = new LookupLink();
     private Function<LinkKey, Link> lookupLink() {
         return lookupLink;
@@ -448,6 +479,19 @@
         clusterCommunicator.broadcast(message);
     }
 
+    // TODO: should we be throwing exception?
+    private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) {
+        try {
+            ClusterMessage message = new ClusterMessage(
+                    clusterService.getLocalNode().id(),
+                    subject,
+                    SERIALIZER.encode(event));
+            clusterCommunicator.unicast(message, recipient);
+        } catch (IOException e) {
+            log.error("Failed to send a {} message to {}", subject.value(), recipient);
+        }
+    }
+
     private void notifyPeers(InternalLinkEvent event) throws IOException {
         broadcastMessage(GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
     }
@@ -456,6 +500,125 @@
         broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
     }
 
+    private void notifyPeer(NodeId peer, InternalLinkEvent event) {
+        unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
+    }
+
+    private void notifyPeer(NodeId peer, InternalLinkRemovedEvent event) {
+        unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
+    }
+
+    private final class SendAdvertisementTask implements Runnable {
+
+        @Override
+        public void run() {
+            if (Thread.currentThread().isInterrupted()) {
+                log.info("Interrupted, quitting");
+                return;
+            }
+
+            try {
+                final NodeId self = clusterService.getLocalNode().id();
+                Set<ControllerNode> nodes = clusterService.getNodes();
+
+                ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
+                        .transform(toNodeId())
+                        .toList();
+
+                if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
+                    log.info("No other peers in the cluster.");
+                    return;
+                }
+
+                NodeId peer;
+                do {
+                    int idx = RandomUtils.nextInt(0, nodeIds.size());
+                    peer = nodeIds.get(idx);
+                } while (peer.equals(self));
+
+                LinkAntiEntropyAdvertisement ad = createAdvertisement();
+
+                if (Thread.currentThread().isInterrupted()) {
+                    log.info("Interrupted, quitting");
+                    return;
+                }
+
+                try {
+                    unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
+                } catch (Exception e) {
+                    log.error("Failed to send anti-entropy advertisement", e);
+                    return;
+                }
+            } catch (Exception e) {
+                // catch all Exception to avoid Scheduled task being suppressed.
+                log.error("Exception thrown while sending advertisement", e);
+            }
+        }
+    }
+
+    private LinkAntiEntropyAdvertisement createAdvertisement() {
+        final NodeId self = clusterService.getLocalNode().id();
+
+        Map<LinkFragmentId, Timestamp> linkTimestamps = new HashMap<>(linkDescs.size());
+        Map<LinkKey, Timestamp> linkTombstones = new HashMap<>(removedLinks.size());
+
+        for (Entry<LinkKey, ConcurrentMap<ProviderId, Timestamped<LinkDescription>>>
+            provs : linkDescs.entrySet()) {
+
+            final LinkKey linkKey = provs.getKey();
+            final ConcurrentMap<ProviderId, Timestamped<LinkDescription>> linkDesc = provs.getValue();
+            synchronized (linkDesc) {
+                for (Map.Entry<ProviderId, Timestamped<LinkDescription>> e : linkDesc.entrySet()) {
+                    linkTimestamps.put(new LinkFragmentId(linkKey, e.getKey()), e.getValue().timestamp());
+                }
+            }
+        }
+
+        linkTombstones.putAll(removedLinks);
+
+        return new LinkAntiEntropyAdvertisement(self, linkTimestamps, linkTombstones);
+    }
+
+    private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement advertisement) {
+
+        NodeId peer = advertisement.sender();
+
+        Map<LinkFragmentId, Timestamp> linkTimestamps = advertisement.linkTimestamps();
+        Map<LinkKey, Timestamp> linkTombstones = advertisement.linkTombstones();
+        for (Map.Entry<LinkFragmentId, Timestamp> entry : linkTimestamps.entrySet()) {
+            LinkFragmentId linkFragmentId = entry.getKey();
+            Timestamp peerTimestamp = entry.getValue();
+
+            LinkKey key = linkFragmentId.linkKey();
+            ProviderId providerId = linkFragmentId.providerId();
+
+            Timestamped<LinkDescription> linkDescription = getLinkDescription(key, providerId);
+            if (linkDescription.isNewer(peerTimestamp)) {
+                // I have more recent link description. update peer.
+                notifyPeer(peer, new InternalLinkEvent(providerId, linkDescription));
+            }
+            // else TODO: Peer has more recent link description. request it.
+
+            Timestamp linkRemovedTimestamp = removedLinks.get(key);
+            if (linkRemovedTimestamp != null && linkRemovedTimestamp.compareTo(peerTimestamp) > 0) {
+                // peer has a zombie link. update peer.
+                notifyPeer(peer, new InternalLinkRemovedEvent(key, linkRemovedTimestamp));
+            }
+        }
+
+        for (Map.Entry<LinkKey, Timestamp> entry : linkTombstones.entrySet()) {
+            LinkKey key = entry.getKey();
+            Timestamp peerTimestamp = entry.getValue();
+
+            ProviderId primaryProviderId = pickPrimaryProviderId(getLinkDescriptions(key));
+            if (primaryProviderId != null) {
+                if (!getLinkDescription(key, primaryProviderId).isNewer(peerTimestamp)) {
+                    notifyDelegateIfNotNull(removeLinkInternal(key, peerTimestamp));
+                }
+            }
+        }
+    }
+
     private class InternalLinkEventListener implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
@@ -483,4 +646,14 @@
             notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
         }
     }
+
+    private final class InternalLinkAntiEntropyAdvertisementListener implements ClusterMessageHandler {
+
+        @Override
+        public void handle(ClusterMessage message) {
+            log.info("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
+            LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
+            handleAntiEntropyAdvertisement(advertisement);
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStoreMessageSubjects.java
index 46e7186..817efe4 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStoreMessageSubjects.java
@@ -9,6 +9,10 @@
 
     private GossipLinkStoreMessageSubjects() {}
 
-    public static final MessageSubject LINK_UPDATE = new MessageSubject("peer-link-update");
-    public static final MessageSubject LINK_REMOVED = new MessageSubject("peer-link-removed");
+    public static final MessageSubject LINK_UPDATE =
+            new MessageSubject("peer-link-update");
+    public static final MessageSubject LINK_REMOVED =
+            new MessageSubject("peer-link-removed");
+    public static final MessageSubject LINK_ANTI_ENTROPY_ADVERTISEMENT =
+            new MessageSubject("link-enti-entropy-advertisement");
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/LinkAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/LinkAntiEntropyAdvertisement.java
new file mode 100644
index 0000000..a41f9cd
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/LinkAntiEntropyAdvertisement.java
@@ -0,0 +1,48 @@
+package org.onlab.onos.store.link.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.LinkKey;
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Link AE Advertisement message.
+ */
+public class LinkAntiEntropyAdvertisement {
+
+    private final NodeId sender;
+    private final Map<LinkFragmentId, Timestamp> linkTimestamps;
+    private final Map<LinkKey, Timestamp> linkTombstones;
+
+
+    public LinkAntiEntropyAdvertisement(NodeId sender,
+                Map<LinkFragmentId, Timestamp> linkTimestamps,
+                Map<LinkKey, Timestamp> linkTombstones) {
+        this.sender = checkNotNull(sender);
+        this.linkTimestamps = checkNotNull(linkTimestamps);
+        this.linkTombstones = checkNotNull(linkTombstones);
+    }
+
+    public NodeId sender() {
+        return sender;
+    }
+
+    public Map<LinkFragmentId, Timestamp> linkTimestamps() {
+        return linkTimestamps;
+    }
+
+    public Map<LinkKey, Timestamp> linkTombstones() {
+        return linkTombstones;
+    }
+
+    // For serializer
+    @SuppressWarnings("unused")
+    private LinkAntiEntropyAdvertisement() {
+        this.sender = null;
+        this.linkTimestamps = null;
+        this.linkTombstones = null;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/LinkFragmentId.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/LinkFragmentId.java
new file mode 100644
index 0000000..f97bef6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/LinkFragmentId.java
@@ -0,0 +1,62 @@
+package org.onlab.onos.store.link.impl;
+
+import java.util.Objects;
+
+import org.onlab.onos.net.LinkKey;
+import org.onlab.onos.net.provider.ProviderId;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Identifier for LinkDescription from a Provider.
+ */
+public final class LinkFragmentId {
+    public final ProviderId providerId;
+    public final LinkKey linkKey;
+
+    public LinkFragmentId(LinkKey linkKey, ProviderId providerId) {
+        this.providerId = providerId;
+        this.linkKey = linkKey;
+    }
+
+    public LinkKey linkKey() {
+        return linkKey;
+    }
+
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(providerId, linkKey);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof LinkFragmentId)) {
+            return false;
+        }
+        LinkFragmentId that = (LinkFragmentId) obj;
+        return Objects.equals(this.linkKey, that.linkKey) &&
+               Objects.equals(this.providerId, that.providerId);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("providerId", providerId)
+                .add("linkKey", linkKey)
+                .toString();
+    }
+
+    // for serializer
+    @SuppressWarnings("unused")
+    private LinkFragmentId() {
+        this.providerId = null;
+        this.linkKey = null;
+    }
+}