Device Anti-Entropy

- create Advertisement
- handler for Advertisement
- register handler, background thread to send advertisement

Change-Id: I99e8a7d68747970c34b3c25c6d0489769d251446
diff --git a/core/api/src/main/java/org/onlab/onos/net/device/DefaultPortDescription.java b/core/api/src/main/java/org/onlab/onos/net/device/DefaultPortDescription.java
index e1dcf9e..3967030 100644
--- a/core/api/src/main/java/org/onlab/onos/net/device/DefaultPortDescription.java
+++ b/core/api/src/main/java/org/onlab/onos/net/device/DefaultPortDescription.java
@@ -4,6 +4,8 @@
 import org.onlab.onos.net.PortNumber;
 import org.onlab.onos.net.SparseAnnotations;
 
+import com.google.common.base.MoreObjects;
+
 /**
  * Default implementation of immutable port description.
  */
@@ -48,6 +50,15 @@
         return isEnabled;
     }
 
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("number", number)
+                .add("isEnabled", isEnabled)
+                .add("annotations", annotations())
+                .toString();
+    }
+
     // default constructor for serialization
     private DefaultPortDescription() {
         this.number = null;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
deleted file mode 100644
index 132f27a..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package org.onlab.onos.store.common.impl;
-
-import java.util.Map;
-
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.Timestamp;
-
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Anti-Entropy advertisement message.
- * <p>
- * Message to advertise the information this node holds.
- *
- * @param <ID> ID type
- */
-public class AntiEntropyAdvertisement<ID> {
-
-    private final NodeId sender;
-    private final ImmutableMap<ID, Timestamp> advertisement;
-
-    /**
-     * Creates anti-entropy advertisement message.
-     *
-     * @param sender sender of this message
-     * @param advertisement timestamp information of the data sender holds
-     */
-    public AntiEntropyAdvertisement(NodeId sender, Map<ID, Timestamp> advertisement) {
-        this.sender = sender;
-        this.advertisement = ImmutableMap.copyOf(advertisement);
-    }
-
-    public NodeId sender() {
-        return sender;
-    }
-
-    public ImmutableMap<ID, Timestamp> advertisement() {
-        return advertisement;
-    }
-
-    // Default constructor for serializer
-    protected AntiEntropyAdvertisement() {
-        this.sender = null;
-        this.advertisement = null;
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
deleted file mode 100644
index 033a1de..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package org.onlab.onos.store.common.impl;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.device.impl.VersionedValue;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-/**
- * Anti-Entropy reply message.
- * <p>
- * Message to send in reply to advertisement or another reply.
- * Suggest to the sender about the more up-to-date data this node has,
- * and request for more recent data that the receiver has.
- */
-public class AntiEntropyReply<ID, V extends VersionedValue<?>> {
-
-    private final NodeId sender;
-    private final ImmutableMap<ID, V> suggestion;
-    private final ImmutableSet<ID> request;
-
-    /**
-     * Creates a reply to anti-entropy message.
-     *
-     * @param sender sender of this message
-     * @param suggestion collection of more recent values, sender had
-     * @param request Collection of identifiers
-     */
-    public AntiEntropyReply(NodeId sender,
-                            Map<ID, V> suggestion,
-                            Set<ID> request) {
-        this.sender = sender;
-        this.suggestion = ImmutableMap.copyOf(suggestion);
-        this.request = ImmutableSet.copyOf(request);
-    }
-
-    public NodeId sender() {
-        return sender;
-    }
-
-    /**
-     * Returns collection of values, which the recipient of this reply is likely
-     * to be missing or has outdated version.
-     *
-     * @return
-     */
-    public ImmutableMap<ID, V> suggestion() {
-        return suggestion;
-    }
-
-    /**
-     * Returns collection of identifier to request.
-     *
-     * @return collection of identifier to request
-     */
-    public ImmutableSet<ID> request() {
-        return request;
-    }
-
-    /**
-     * Checks if reply contains any suggestion or request.
-     *
-     * @return true if nothing is suggested and requested
-     */
-    public boolean isEmpty() {
-        return suggestion.isEmpty() && request.isEmpty();
-    }
-
-    // Default constructor for serializer
-    protected AntiEntropyReply() {
-        this.sender = null;
-        this.suggestion = null;
-        this.request = null;
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/ControllerNodeToNodeId.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/ControllerNodeToNodeId.java
new file mode 100644
index 0000000..73187ee
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/ControllerNodeToNodeId.java
@@ -0,0 +1,24 @@
+package org.onlab.onos.store.common.impl;
+
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.NodeId;
+
+import com.google.common.base.Function;
+
+/**
+ * Function to convert ControllerNode to NodeId.
+ */
+public final class ControllerNodeToNodeId
+    implements Function<ControllerNode, NodeId> {
+
+    private static final ControllerNodeToNodeId INSTANCE = new ControllerNodeToNodeId();
+
+    @Override
+    public NodeId apply(ControllerNode input) {
+        return input.id();
+    }
+
+    public static ControllerNodeToNodeId toNodeId() {
+        return INSTANCE;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java
index 77b0a87..e803e74 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java
@@ -30,6 +30,7 @@
 
     /**
      * Returns the value.
+     *
      * @return value
      */
     public T value() {
@@ -38,6 +39,7 @@
 
     /**
      * Returns the time stamp.
+     *
      * @return time stamp
      */
     public Timestamp timestamp() {
@@ -51,7 +53,16 @@
      * @return true if this instance is newer.
      */
     public boolean isNewer(Timestamped<T> other) {
-        return this.timestamp.compareTo(checkNotNull(other).timestamp()) > 0;
+        return isNewer(checkNotNull(other).timestamp());
+    }
+
+    /**
+     * Tests if this timestamp is newer thatn the specified timestamp.
+     * @param timestamp to compare agains
+     * @return true if this instance is newer
+     */
+    public boolean isNewer(Timestamp timestamp) {
+        return this.timestamp.compareTo(checkNotNull(timestamp)) > 0;
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
deleted file mode 100644
index d05659b..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package org.onlab.onos.store.device.impl;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.net.Device;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.common.impl.AntiEntropyAdvertisement;
-
-// TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
-// TODO: Handle Port as part of these messages, or separate messages for Ports?
-
-public class DeviceAntiEntropyAdvertisement
-    extends AntiEntropyAdvertisement<DeviceId> {
-
-
-    public DeviceAntiEntropyAdvertisement(NodeId sender,
-            Map<DeviceId, Timestamp> advertisement) {
-        super(sender, advertisement);
-    }
-
-    // May need to add ProviderID, etc.
-    public static DeviceAntiEntropyAdvertisement create(
-            NodeId self,
-            Collection<VersionedValue<Device>> localValues) {
-
-        Map<DeviceId, Timestamp> ads = new HashMap<>(localValues.size());
-        for (VersionedValue<Device> e : localValues) {
-            ads.put(e.entity().id(), e.timestamp());
-        }
-        return new DeviceAntiEntropyAdvertisement(self, ads);
-    }
-
-    // For serializer
-    protected DeviceAntiEntropyAdvertisement() {}
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
deleted file mode 100644
index e7a4d0a..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package org.onlab.onos.store.device.impl;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.net.Device;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.common.impl.AntiEntropyReply;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-public class DeviceAntiEntropyReply
-    extends AntiEntropyReply<DeviceId, VersionedValue<Device>> {
-
-
-    public DeviceAntiEntropyReply(NodeId sender,
-            Map<DeviceId, VersionedValue<Device>> suggestion,
-            Set<DeviceId> request) {
-        super(sender, suggestion, request);
-    }
-
-    /**
-     * Creates a reply to Anti-Entropy advertisement.
-     *
-     * @param advertisement to respond to
-     * @param self node identifier representing local node
-     * @param localValues local values held on this node
-     * @return reply message
-     */
-    public static DeviceAntiEntropyReply reply(
-            DeviceAntiEntropyAdvertisement advertisement,
-            NodeId self,
-            Collection<VersionedValue<Device>> localValues
-            ) {
-
-        ImmutableMap<DeviceId, Timestamp> ads = advertisement.advertisement();
-
-        ImmutableMap.Builder<DeviceId, VersionedValue<Device>>
-            sug = ImmutableMap.builder();
-
-        Set<DeviceId> req = new HashSet<>(ads.keySet());
-
-        for (VersionedValue<Device> e : localValues) {
-            final DeviceId id = e.entity().id();
-            final Timestamp local = e.timestamp();
-            final Timestamp theirs = ads.get(id);
-            if (theirs == null) {
-                // they don't have it, suggest
-                sug.put(id, e);
-                // don't need theirs
-                req.remove(id);
-            } else if (local.compareTo(theirs) < 0) {
-                // they got older one, suggest
-                sug.put(id, e);
-                // don't need theirs
-                req.remove(id);
-            } else if (local.equals(theirs)) {
-                // same, don't need theirs
-                req.remove(id);
-            }
-        }
-
-        return new DeviceAntiEntropyReply(self, sug.build(), req);
-    }
-
-    /**
-     * Creates a reply to request for values held locally.
-     *
-     * @param requests message containing the request
-     * @param self node identifier representing local node
-     * @param localValues local valeds held on this node
-     * @return reply message
-     */
-    public static DeviceAntiEntropyReply reply(
-            DeviceAntiEntropyReply requests,
-            NodeId self,
-            Map<DeviceId, VersionedValue<Device>> localValues
-            ) {
-
-        Set<DeviceId> reqs = requests.request();
-
-        Map<DeviceId, VersionedValue<Device>> requested = new HashMap<>(reqs.size());
-        for (DeviceId id : reqs) {
-            final VersionedValue<Device> value = localValues.get(id);
-            if (value != null) {
-                requested.put(id, value);
-            }
-        }
-
-        Set<DeviceId> empty = ImmutableSet.of();
-        return new DeviceAntiEntropyReply(self, requested, empty);
-    }
-
-    // For serializer
-    protected DeviceAntiEntropyReply() {}
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceDescriptions.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceDescriptions.java
new file mode 100644
index 0000000..f7fd7bc
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceDescriptions.java
@@ -0,0 +1,91 @@
+package org.onlab.onos.store.device.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.onos.net.DefaultAnnotations.union;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.SparseAnnotations;
+import org.onlab.onos.net.device.DefaultDeviceDescription;
+import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+/*
+ * Collection of Description of a Device and Ports, given from a Provider.
+ */
+class DeviceDescriptions {
+
+    private volatile Timestamped<DeviceDescription> deviceDesc;
+
+    private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
+
+    public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
+        this.deviceDesc = checkNotNull(desc);
+        this.portDescs = new ConcurrentHashMap<>();
+    }
+
+    public Timestamp getLatestTimestamp() {
+        Timestamp latest = deviceDesc.timestamp();
+        for (Timestamped<PortDescription> desc : portDescs.values()) {
+            if (desc.timestamp().compareTo(latest) > 0) {
+                latest = desc.timestamp();
+            }
+        }
+        return latest;
+    }
+
+    public Timestamped<DeviceDescription> getDeviceDesc() {
+        return deviceDesc;
+    }
+
+    public Timestamped<PortDescription> getPortDesc(PortNumber number) {
+        return portDescs.get(number);
+    }
+
+    public Map<PortNumber, Timestamped<PortDescription>> getPortDescs() {
+        return Collections.unmodifiableMap(portDescs);
+    }
+
+    /**
+     * Puts DeviceDescription, merging annotations as necessary.
+     *
+     * @param newDesc new DeviceDescription
+     */
+    public synchronized void putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
+        Timestamped<DeviceDescription> oldOne = deviceDesc;
+        Timestamped<DeviceDescription> newOne = newDesc;
+        if (oldOne != null) {
+            SparseAnnotations merged = union(oldOne.value().annotations(),
+                                             newDesc.value().annotations());
+            newOne = new Timestamped<DeviceDescription>(
+                    new DefaultDeviceDescription(newDesc.value(), merged),
+                    newDesc.timestamp());
+        }
+        deviceDesc = newOne;
+    }
+
+    /**
+     * Puts PortDescription, merging annotations as necessary.
+     *
+     * @param newDesc new PortDescription
+     */
+    public synchronized void putPortDesc(Timestamped<PortDescription> newDesc) {
+        Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
+        Timestamped<PortDescription> newOne = newDesc;
+        if (oldOne != null) {
+            SparseAnnotations merged = union(oldOne.value().annotations(),
+                                             newDesc.value().annotations());
+            newOne = new Timestamped<PortDescription>(
+                    new DefaultPortDescription(newDesc.value(), merged),
+                    newDesc.timestamp());
+        }
+        portDescs.put(newOne.value().portNumber(), newOne);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 2221ea0..5319749 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -5,8 +5,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import org.apache.commons.lang3.concurrent.ConcurrentException;
-import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -14,6 +13,8 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.net.AnnotationsUtil;
 import org.onlab.onos.net.DefaultAnnotations;
 import org.onlab.onos.net.DefaultDevice;
@@ -23,9 +24,6 @@
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.Port;
 import org.onlab.onos.net.PortNumber;
-import org.onlab.onos.net.SparseAnnotations;
-import org.onlab.onos.net.device.DefaultDeviceDescription;
-import org.onlab.onos.net.device.DefaultPortDescription;
 import org.onlab.onos.net.device.DeviceDescription;
 import org.onlab.onos.net.device.DeviceEvent;
 import org.onlab.onos.net.device.DeviceStore;
@@ -38,18 +36,22 @@
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.common.impl.Timestamped;
-import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.device.impl.peermsg.DeviceAntiEntropyAdvertisement;
+import org.onlab.onos.store.device.impl.peermsg.DeviceFragmentId;
+import org.onlab.onos.store.device.impl.peermsg.PortFragmentId;
 import org.onlab.onos.store.serializers.KryoSerializer;
-import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
+import org.onlab.onos.store.serializers.DistributedStoreSerializers;
 import org.onlab.util.KryoPool;
 import org.onlab.util.NewConcurrentHashMap;
 import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -57,19 +59,21 @@
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Predicates.notNull;
 import static org.onlab.onos.net.device.DeviceEvent.Type.*;
 import static org.slf4j.LoggerFactory.getLogger;
 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
 import static org.onlab.onos.net.DefaultAnnotations.merge;
-import static org.onlab.onos.net.DefaultAnnotations.union;
 import static com.google.common.base.Verify.verify;
+import static org.onlab.util.Tools.namedThreads;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
+import static org.onlab.onos.store.common.impl.ControllerNodeToNodeId.toNodeId;
 
 // TODO: give me a better name
 /**
@@ -86,8 +90,9 @@
 
     public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
 
-    // TODO: Check if inner Map can be replaced with plain Map
+    // TODO: Check if inner Map can be replaced with plain Map.
     // innerMap is used to lock a Device, thus instance should never be replaced.
+
     // collection of Description given from various providers
     private final ConcurrentMap<DeviceId,
                             ConcurrentMap<ProviderId, DeviceDescriptions>>
@@ -117,21 +122,23 @@
         @Override
         protected void setupKryoPool() {
             serializerPool = KryoPool.newBuilder()
-                    .register(KryoPoolUtil.API)
+                    .register(DistributedStoreSerializers.COMMON)
+
                     .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
                     .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
                     .register(InternalDeviceRemovedEvent.class)
                     .register(InternalPortEvent.class, new InternalPortEventSerializer())
                     .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
-                    .register(Timestamp.class)
-                    .register(Timestamped.class)
-                    .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
+                    .register(DeviceAntiEntropyAdvertisement.class)
+                    .register(DeviceFragmentId.class)
+                    .register(PortFragmentId.class)
                     .build()
                     .populate(1);
         }
-
     };
 
+    private ScheduledExecutorService executor;
+
     @Activate
     public void activate() {
         clusterCommunicator.addSubscriber(
@@ -144,11 +151,35 @@
                 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
         clusterCommunicator.addSubscriber(
                 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
+
+        executor =
+                newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
+
+        // TODO: Make these configurable
+        long initialDelaySec = 5;
+        long periodSec = 5;
+        // start anti-entropy thread
+        executor.scheduleAtFixedRate(new SendAdvertisementTask(),
+                    initialDelaySec, periodSec, TimeUnit.SECONDS);
+
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+
+        executor.shutdownNow();
+        try {
+            boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
+            if (timedout) {
+                log.error("Timeout during executor shutdown");
+            }
+        } catch (InterruptedException e) {
+            log.error("Error during executor shutdown", e);
+        }
+
         deviceDescs.clear();
         devices.clear();
         devicePorts.clear();
@@ -543,14 +574,14 @@
 
             final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
             if (existingPortDesc == null ||
-                deltaDesc == existingPortDesc ||
                 deltaDesc.isNewer(existingPortDesc)) {
                 // on new port or valid update
                 // update description
                 descs.putPortDesc(deltaDesc);
                 newPort = composePort(device, number, descsMap);
             } else {
-                // outdated event, ignored.
+                // same or outdated event, ignored.
+                log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
                 return null;
             }
 
@@ -627,6 +658,14 @@
         }
     }
 
+    /**
+     * Checks if given timestamp is superseded by removal request
+     * with more recent timestamp.
+     *
+     * @param deviceId identifier of a device
+     * @param timestampToCheck timestamp of an event to check
+     * @return true if device is already removed
+     */
     private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
         Timestamp removalTimestamp = removalRequest.get(deviceId);
         if (removalTimestamp != null &&
@@ -667,7 +706,7 @@
                 continue;
             }
             // TODO: should keep track of Description timestamp
-            // and only merge conflicting keys when timestamp is newer
+            // and only merge conflicting keys when timestamp is newer.
             // Currently assuming there will never be a key conflict between
             // providers
 
@@ -708,7 +747,7 @@
                 continue;
             }
             // TODO: should keep track of Description timestamp
-            // and only merge conflicting keys when timestamp is newer
+            // and only merge conflicting keys when timestamp is newer.
             // Currently assuming there will never be a key conflict between
             // providers
 
@@ -745,129 +784,258 @@
         return providerDescs.get(pid);
     }
 
-    public static final class InitDeviceDescs
-        implements ConcurrentInitializer<DeviceDescriptions> {
-
-        private final Timestamped<DeviceDescription> deviceDesc;
-
-        public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
-            this.deviceDesc = checkNotNull(deviceDesc);
-        }
-        @Override
-        public DeviceDescriptions get() throws ConcurrentException {
-            return new DeviceDescriptions(deviceDesc);
-        }
+    // TODO: should we be throwing exception?
+    private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                SERIALIZER.encode(event));
+        clusterCommunicator.unicast(message, recipient);
     }
 
-
-    /**
-     * Collection of Description of a Device and it's Ports given from a Provider.
-     */
-    public static class DeviceDescriptions {
-
-        private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
-        private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
-
-        public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
-            this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
-            this.portDescs = new ConcurrentHashMap<>();
-        }
-
-        Timestamp getLatestTimestamp() {
-            Timestamp latest = deviceDesc.get().timestamp();
-            for (Timestamped<PortDescription> desc : portDescs.values()) {
-                if (desc.timestamp().compareTo(latest) > 0) {
-                    latest = desc.timestamp();
-                }
-            }
-            return latest;
-        }
-
-        public Timestamped<DeviceDescription> getDeviceDesc() {
-            return deviceDesc.get();
-        }
-
-        public Timestamped<PortDescription> getPortDesc(PortNumber number) {
-            return portDescs.get(number);
-        }
-
-        /**
-         * Puts DeviceDescription, merging annotations as necessary.
-         *
-         * @param newDesc new DeviceDescription
-         * @return previous DeviceDescription
-         */
-        public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
-            Timestamped<DeviceDescription> oldOne = deviceDesc.get();
-            Timestamped<DeviceDescription> newOne = newDesc;
-            if (oldOne != null) {
-                SparseAnnotations merged = union(oldOne.value().annotations(),
-                                                 newDesc.value().annotations());
-                newOne = new Timestamped<DeviceDescription>(
-                        new DefaultDeviceDescription(newDesc.value(), merged),
-                        newDesc.timestamp());
-            }
-            return deviceDesc.getAndSet(newOne);
-        }
-
-        /**
-         * Puts PortDescription, merging annotations as necessary.
-         *
-         * @param newDesc new PortDescription
-         * @return previous PortDescription
-         */
-        public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
-            Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
-            Timestamped<PortDescription> newOne = newDesc;
-            if (oldOne != null) {
-                SparseAnnotations merged = union(oldOne.value().annotations(),
-                                                 newDesc.value().annotations());
-                newOne = new Timestamped<PortDescription>(
-                        new DefaultPortDescription(newDesc.value(), merged),
-                        newDesc.timestamp());
-            }
-            return portDescs.put(newOne.value().portNumber(), newOne);
-        }
+    // TODO: should we be throwing exception?
+    private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                SERIALIZER.encode(event));
+        clusterCommunicator.broadcast(message);
     }
 
     private void notifyPeers(InternalDeviceEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
     }
 
     private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
     }
 
     private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                GossipDeviceStoreMessageSubjects.DEVICE_REMOVED,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
     }
 
     private void notifyPeers(InternalPortEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                GossipDeviceStoreMessageSubjects.PORT_UPDATE,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
     }
 
     private void notifyPeers(InternalPortStatusEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
+    }
+
+    private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
+        try {
+            unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
+        } catch (IOException e) {
+            log.error("Failed to send" + event + " to " + recipient, e);
+        }
+    }
+
+    private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
+        try {
+            unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
+        } catch (IOException e) {
+            log.error("Failed to send" + event + " to " + recipient, e);
+        }
+    }
+
+    private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
+        try {
+            unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
+        } catch (IOException e) {
+            log.error("Failed to send" + event + " to " + recipient, e);
+        }
+    }
+
+    private void notifyPeer(NodeId recipient, InternalPortEvent event) {
+        try {
+            unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
+        } catch (IOException e) {
+            log.error("Failed to send" + event + " to " + recipient, e);
+        }
+    }
+
+    private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
+        try {
+            unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
+        } catch (IOException e) {
+            log.error("Failed to send" + event + " to " + recipient, e);
+        }
+    }
+
+    private DeviceAntiEntropyAdvertisement createAdvertisement() {
+        final NodeId self = clusterService.getLocalNode().id();
+
+        Map<DeviceFragmentId, Timestamp> devices = new HashMap<>(deviceDescs.size());
+        final int portsPerDevice = 8; // random guess to minimize reallocation
+        Map<PortFragmentId, Timestamp> ports = new HashMap<>(devices.size() * portsPerDevice);
+        Map<DeviceId, Timestamp> offline = new HashMap<>(devices.size());
+
+        for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>>
+            provs : deviceDescs.entrySet()) {
+
+            final DeviceId deviceId = provs.getKey();
+            final ConcurrentMap<ProviderId, DeviceDescriptions> devDescs = provs.getValue();
+            synchronized (devDescs) {
+
+                offline.put(deviceId, this.offline.get(deviceId));
+
+                for (Entry<ProviderId, DeviceDescriptions>
+                        prov : devDescs.entrySet()) {
+
+                    final ProviderId provId = prov.getKey();
+                    final DeviceDescriptions descs = prov.getValue();
+
+                    devices.put(new DeviceFragmentId(deviceId, provId),
+                            descs.getDeviceDesc().timestamp());
+
+                    for (Entry<PortNumber, Timestamped<PortDescription>>
+                            portDesc : descs.getPortDescs().entrySet()) {
+
+                        final PortNumber number = portDesc.getKey();
+                        ports.put(new PortFragmentId(deviceId, provId, number),
+                                portDesc.getValue().timestamp());
+                    }
+                }
+            }
+        }
+
+        return new DeviceAntiEntropyAdvertisement(self, devices, ports, offline);
+    }
+
+    /**
+     * Responds to anti-entropy advertisement message.
+     * <P>
+     * Notify sender about out-dated information using regular replication message.
+     * Send back advertisement to sender if not in sync.
+     *
+     * @param advertisement to respond to
+     */
+    private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
+
+        final NodeId sender = advertisement.sender();
+
+        Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
+        Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
+        Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
+
+        // Fragments to request
+        Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
+        Collection<PortFragmentId> reqPorts = new ArrayList<>();
+
+        for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
+            final DeviceId deviceId = de.getKey();
+            final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
+
+            synchronized (lDevice) {
+                // latestTimestamp across provider
+                // Note: can be null initially
+                Timestamp localLatest = offline.get(deviceId);
+
+                // handle device Ads
+                for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
+                    final ProviderId provId = prov.getKey();
+                    final DeviceDescriptions lDeviceDescs = prov.getValue();
+
+                    final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
+
+
+                    Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
+                    Timestamp advDevTimestamp = devAds.get(devFragId);
+
+                    if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
+                        // remote does not have it or outdated, suggest
+                        notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
+                    } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
+                        // local is outdated, request
+                        reqDevices.add(devFragId);
+                    }
+
+                    // handle port Ads
+                    for (Entry<PortNumber, Timestamped<PortDescription>>
+                            pe : lDeviceDescs.getPortDescs().entrySet()) {
+
+                        final PortNumber num = pe.getKey();
+                        final Timestamped<PortDescription> lPort = pe.getValue();
+
+                        final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
+
+                        Timestamp advPortTimestamp = portAds.get(portFragId);
+                        if ( advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
+                            // remote does not have it or outdated, suggest
+                            notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
+                        } else if (!lPort.timestamp().equals(advPortTimestamp)) {
+                            // local is outdated, request
+                            log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
+                            reqPorts.add(portFragId);
+                        }
+
+                        // remove port Ad already processed
+                        portAds.remove(portFragId);
+                    } // end local port loop
+
+                    // remove device Ad already processed
+                    devAds.remove(devFragId);
+
+                    // find latest and update
+                    final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
+                    if (localLatest == null ||
+                        providerLatest.compareTo(localLatest) > 0) {
+                        localLatest = providerLatest;
+                    }
+                } // end local provider loop
+
+                // checking if remote timestamp is more recent.
+                Timestamp rOffline = offlineAds.get(deviceId);
+                if (rOffline != null &&
+                    rOffline.compareTo(localLatest) > 0) {
+                    // remote offline timestamp suggests that the
+                    // device is off-line
+                    markOfflineInternal(deviceId, rOffline);
+                }
+
+                Timestamp lOffline = offline.get(deviceId);
+                if (lOffline != null && rOffline == null) {
+                    // locally offline, but remote is online, suggest offline
+                    notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
+                }
+
+                // remove device offline Ad already processed
+                offlineAds.remove(deviceId);
+            } // end local device loop
+        } // device lock
+
+        // If there is any Ads left, request them
+        log.trace("Ads left {}, {}", devAds, portAds);
+        reqDevices.addAll(devAds.keySet());
+        reqPorts.addAll(portAds.keySet());
+
+        if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
+            log.trace("Nothing to request to remote peer {}", sender);
+            return;
+        }
+
+        log.info("Need to sync {} {}", reqDevices, reqPorts);
+
+        // 2-way Anti-Entropy for now
+        try {
+            unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
+        } catch (IOException e) {
+            log.error("Failed to send response advertisement to " + sender, e);
+        }
+
+// Sketch of 3-way Anti-Entropy
+//        DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
+//        ClusterMessage message = new ClusterMessage(
+//                clusterService.getLocalNode().id(),
+//                GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
+//                SERIALIZER.encode(request));
+//
+//        try {
+//            clusterCommunicator.unicast(message, advertisement.sender());
+//        } catch (IOException e) {
+//            log.error("Failed to send advertisement reply to "
+//                      + advertisement.sender(), e);
+//        }
     }
 
     private void notifyDelegateIfNotNull(DeviceEvent event) {
@@ -876,6 +1044,54 @@
         }
     }
 
+    private final class SendAdvertisementTask implements Runnable {
+
+        @Override
+        public void run() {
+            if (Thread.currentThread().isInterrupted()) {
+                log.info("Interrupted, quitting");
+                return;
+            }
+
+            try {
+                final NodeId self = clusterService.getLocalNode().id();
+                Set<ControllerNode> nodes = clusterService.getNodes();
+
+                ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
+                        .transform(toNodeId())
+                        .toList();
+
+                if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
+                    log.info("No other peers in the cluster.");
+                    return;
+                }
+
+                NodeId peer;
+                do {
+                    int idx = RandomUtils.nextInt(0, nodeIds.size());
+                    peer = nodeIds.get(idx);
+                } while (peer.equals(self));
+
+                DeviceAntiEntropyAdvertisement ad = createAdvertisement();
+
+                if (Thread.currentThread().isInterrupted()) {
+                    log.info("Interrupted, quitting");
+                    return;
+                }
+
+                try {
+                    unicastMessage(peer, DEVICE_ADVERTISE, ad);
+                } catch (IOException e) {
+                    log.error("Failed to send anti-entropy advertisement", e);
+                    return;
+                }
+            } catch (Exception e) {
+                // catch all Exception to avoid Scheduled task being suppressed.
+                log.error("Exception thrown while sending advertisement", e);
+            }
+        }
+    }
+
     private class InternalDeviceEventListener implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
@@ -940,6 +1156,7 @@
 
             log.info("Received port status update event from peer: {}", message.sender());
             InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
+            log.info("{}", event);
 
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
@@ -948,4 +1165,15 @@
             notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
         }
     }
+
+    private final class InternalDeviceAdvertisementListener
+        implements ClusterMessageHandler {
+
+        @Override
+        public void handle(ClusterMessage message) {
+            log.info("Received Device advertisement from peer: {}", message.sender());
+            DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
+            handleAdvertisement(advertisement);
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
index 5272182..3168368 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
@@ -2,6 +2,7 @@
 
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
 
+// TODO: add prefix to assure uniqueness.
 /**
  * MessageSubjects used by GossipDeviceStore peer-peer communication.
  */
@@ -14,4 +15,8 @@
     public static final MessageSubject DEVICE_REMOVED = new MessageSubject("peer-device-removed");
     public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update");
     public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update");
+
+    public static final MessageSubject DEVICE_ADVERTISE = new MessageSubject("peer-device-advertisements");
+    // to be used with 3-way anti-entropy process
+    public static final MessageSubject DEVICE_REQUEST = new MessageSubject("peer-device-request");
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InitDeviceDescs.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InitDeviceDescs.java
new file mode 100644
index 0000000..2de2364
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InitDeviceDescs.java
@@ -0,0 +1,23 @@
+package org.onlab.onos.store.device.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+// FIXME: consider removing this class
+public final class InitDeviceDescs
+    implements ConcurrentInitializer<DeviceDescriptions> {
+
+    private final Timestamped<DeviceDescription> deviceDesc;
+
+    public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
+        this.deviceDesc = checkNotNull(deviceDesc);
+    }
+    @Override
+    public DeviceDescriptions get() throws ConcurrentException {
+        return new DeviceDescriptions(deviceDesc);
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
index 4214384..344fe73 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
@@ -5,6 +5,8 @@
 import org.onlab.onos.net.provider.ProviderId;
 import org.onlab.onos.store.common.impl.Timestamped;
 
+import com.google.common.base.MoreObjects;
+
 /**
  * Information published by GossipDeviceStore to notify peers of a device
  * change event.
@@ -36,6 +38,15 @@
         return deviceDescription;
     }
 
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("providerId", providerId)
+                .add("deviceId", deviceId)
+                .add("deviceDescription", deviceDescription)
+                .toString();
+    }
+
     // for serializer
     protected InternalDeviceEvent() {
         this.providerId = null;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
index d8942d6..4540efb 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
@@ -3,6 +3,8 @@
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.store.Timestamp;
 
+import com.google.common.base.MoreObjects;
+
 /**
  * Information published by GossipDeviceStore to notify peers of a device
  * going offline.
@@ -30,6 +32,14 @@
         return timestamp;
     }
 
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("deviceId", deviceId)
+                .add("timestamp", timestamp)
+                .toString();
+    }
+
     // for serializer
     @SuppressWarnings("unused")
     private InternalDeviceOfflineEvent() {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
index 6c8b905..42cb177 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
@@ -3,6 +3,8 @@
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.store.Timestamp;
 
+import com.google.common.base.MoreObjects;
+
 /**
  * Information published by GossipDeviceStore to notify peers of a device
  * being administratively removed.
@@ -30,6 +32,14 @@
         return timestamp;
     }
 
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("deviceId", deviceId)
+                .add("timestamp", timestamp)
+                .toString();
+    }
+
     // for serializer
     @SuppressWarnings("unused")
     private InternalDeviceRemovedEvent() {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
index 64e77ca..d1fc73a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
@@ -7,6 +7,8 @@
 import org.onlab.onos.net.provider.ProviderId;
 import org.onlab.onos.store.common.impl.Timestamped;
 
+import com.google.common.base.MoreObjects;
+
 /**
  * Information published by GossipDeviceStore to notify peers of a port
  * change event.
@@ -38,6 +40,15 @@
         return portDescriptions;
     }
 
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("providerId", providerId)
+                .add("deviceId", deviceId)
+                .add("portDescriptions", portDescriptions)
+                .toString();
+    }
+
     // for serializer
     protected InternalPortEvent() {
         this.providerId = null;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
index 7d3854b..fd154da 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
@@ -5,6 +5,8 @@
 import org.onlab.onos.net.provider.ProviderId;
 import org.onlab.onos.store.common.impl.Timestamped;
 
+import com.google.common.base.MoreObjects;
+
 /**
  * Information published by GossipDeviceStore to notify peers of a port
  * status change event.
@@ -36,6 +38,15 @@
         return portDescription;
     }
 
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("providerId", providerId)
+                .add("deviceId", deviceId)
+                .add("portDescription", portDescription)
+                .toString();
+    }
+
     // for serializer
     protected InternalPortStatusEvent() {
         this.providerId = null;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
index 6ec4122..8f0c2b0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
@@ -35,6 +35,7 @@
                                Class<InternalPortStatusEvent> type) {
         ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
         DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+        @SuppressWarnings("unchecked")
         Timestamped<PortDescription> portDescription = (Timestamped<PortDescription>) kryo.readClassAndObject(input);
 
         return new InternalPortStatusEvent(providerId, deviceId, portDescription);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceAntiEntropyAdvertisement.java
new file mode 100644
index 0000000..00873ad
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceAntiEntropyAdvertisement.java
@@ -0,0 +1,57 @@
+package org.onlab.onos.store.device.impl.peermsg;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+
+
+/**
+ * Device Advertisement message.
+ */
+public class DeviceAntiEntropyAdvertisement {
+
+    private final NodeId sender;
+    private final Map<DeviceFragmentId, Timestamp> deviceFingerPrints;
+    private final Map<PortFragmentId, Timestamp> portFingerPrints;
+    private final Map<DeviceId, Timestamp> offline;
+
+
+    public DeviceAntiEntropyAdvertisement(NodeId sender,
+                Map<DeviceFragmentId, Timestamp> devices,
+                Map<PortFragmentId, Timestamp> ports,
+                Map<DeviceId, Timestamp> offline) {
+        this.sender = checkNotNull(sender);
+        this.deviceFingerPrints = checkNotNull(devices);
+        this.portFingerPrints = checkNotNull(ports);
+        this.offline = checkNotNull(offline);
+    }
+
+    public NodeId sender() {
+        return sender;
+    }
+
+    public Map<DeviceFragmentId, Timestamp> deviceFingerPrints() {
+        return deviceFingerPrints;
+    }
+
+    public Map<PortFragmentId, Timestamp> ports() {
+        return portFingerPrints;
+    }
+
+    public Map<DeviceId, Timestamp> offline() {
+        return offline;
+    }
+
+    // For serializer
+    @SuppressWarnings("unused")
+    private DeviceAntiEntropyAdvertisement() {
+        this.sender = null;
+        this.deviceFingerPrints = null;
+        this.portFingerPrints = null;
+        this.offline = null;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceAntiEntropyRequest.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceAntiEntropyRequest.java
new file mode 100644
index 0000000..6f3096b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceAntiEntropyRequest.java
@@ -0,0 +1,46 @@
+package org.onlab.onos.store.device.impl.peermsg;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+
+import org.onlab.onos.cluster.NodeId;
+
+/**
+ * Message to request for other peers information.
+ */
+public class DeviceAntiEntropyRequest {
+
+    private final NodeId sender;
+    private final Collection<DeviceFragmentId> devices;
+    private final Collection<PortFragmentId> ports;
+
+    public DeviceAntiEntropyRequest(NodeId sender,
+                                   Collection<DeviceFragmentId> devices,
+                                   Collection<PortFragmentId> ports) {
+
+        this.sender = checkNotNull(sender);
+        this.devices = checkNotNull(devices);
+        this.ports = checkNotNull(ports);
+    }
+
+    public NodeId sender() {
+        return sender;
+    }
+
+    public Collection<DeviceFragmentId> devices() {
+        return devices;
+    }
+
+    public Collection<PortFragmentId> ports() {
+        return ports;
+    }
+
+    // For serializer
+    @SuppressWarnings("unused")
+    private DeviceAntiEntropyRequest() {
+        this.sender = null;
+        this.devices = null;
+        this.ports = null;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceFragmentId.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceFragmentId.java
new file mode 100644
index 0000000..d4fcda9
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceFragmentId.java
@@ -0,0 +1,54 @@
+package org.onlab.onos.store.device.impl.peermsg;
+
+import java.util.Objects;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.provider.ProviderId;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Identifier for DeviceDesctiption from a Provider.
+ */
+public final class DeviceFragmentId {
+    public final ProviderId providerId;
+    public final DeviceId deviceId;
+
+    public DeviceFragmentId(DeviceId deviceId, ProviderId providerId) {
+        this.providerId = providerId;
+        this.deviceId = deviceId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(providerId, deviceId);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof DeviceFragmentId)) {
+            return false;
+        }
+        DeviceFragmentId that = (DeviceFragmentId) obj;
+        return Objects.equals(this.deviceId, that.deviceId) &&
+               Objects.equals(this.providerId, that.providerId);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("providerId", providerId)
+                .add("deviceId", deviceId)
+                .toString();
+    }
+
+    // for serializer
+    @SuppressWarnings("unused")
+    private DeviceFragmentId() {
+        this.providerId = null;
+        this.deviceId = null;
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/PortFragmentId.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/PortFragmentId.java
new file mode 100644
index 0000000..8e7bac3
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/PortFragmentId.java
@@ -0,0 +1,61 @@
+package org.onlab.onos.store.device.impl.peermsg;
+
+import java.util.Objects;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.provider.ProviderId;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Identifier for PortDescription from a Provider.
+ */
+public final class PortFragmentId {
+    public final ProviderId providerId;
+    public final DeviceId deviceId;
+    public final PortNumber portNumber;
+
+    public PortFragmentId(DeviceId deviceId, ProviderId providerId,
+                          PortNumber portNumber) {
+        this.providerId = providerId;
+        this.deviceId = deviceId;
+        this.portNumber = portNumber;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(providerId, deviceId, portNumber);
+    };
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof PortFragmentId)) {
+            return false;
+        }
+        PortFragmentId that = (PortFragmentId) obj;
+        return Objects.equals(this.deviceId, that.deviceId) &&
+               Objects.equals(this.portNumber, that.portNumber) &&
+               Objects.equals(this.providerId, that.providerId);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("providerId", providerId)
+                .add("deviceId", deviceId)
+                .add("portNumber", portNumber)
+                .toString();
+    }
+
+    // for serializer
+    @SuppressWarnings("unused")
+    private PortFragmentId() {
+        this.providerId = null;
+        this.deviceId = null;
+        this.portNumber = null;
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/package-info.java
new file mode 100644
index 0000000..5d9dc4b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Structure and utilities used for inter-Node messaging.
+ */
+package org.onlab.onos.store.device.impl.peermsg;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
index a59b151..63245a3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
@@ -31,7 +31,6 @@
 import org.onlab.onos.store.AbstractStore;
 import org.onlab.onos.store.ClockService;
 import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.device.impl.VersionedValue;
 import org.slf4j.Logger;
 
 import com.google.common.collect.HashMultimap;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/VersionedValue.java
similarity index 95%
rename from core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/link/impl/VersionedValue.java
index a0f485a..391cd1d 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/VersionedValue.java
@@ -1,9 +1,10 @@
-package org.onlab.onos.store.device.impl;
+package org.onlab.onos.store.link.impl;
 
 import java.util.Objects;
 
 import org.onlab.onos.store.Timestamp;
 
+// TODO: remove once we stop using this
 /**
  * Wrapper class for a entity that is versioned
  * and can either be up or down.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
index c0cefd6..e20b4a6 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
@@ -35,4 +35,4 @@
         byte[] payload = input.readBytes(payloadSize);
         return new ClusterMessage(sender, subject, payload);
     }
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/DistributedStoreSerializers.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/DistributedStoreSerializers.java
new file mode 100644
index 0000000..e0ba906
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/DistributedStoreSerializers.java
@@ -0,0 +1,20 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
+import org.onlab.onos.store.common.impl.Timestamped;
+import org.onlab.util.KryoPool;
+
+public final class DistributedStoreSerializers {
+
+    /**
+     * KryoPool which can serialize ON.lab misc classes.
+     */
+    public static final KryoPool COMMON = KryoPool.newBuilder()
+            .register(KryoPoolUtil.API)
+            .register(Timestamped.class)
+            .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
+            .build();
+
+    // avoid instantiation
+    private DistributedStoreSerializers() {}
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
index 0c33cfe..f079874 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
@@ -25,6 +25,7 @@
 import org.onlab.onos.net.device.DefaultDeviceDescription;
 import org.onlab.onos.net.device.DefaultPortDescription;
 import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.Timestamp;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
 import org.onlab.util.KryoPool;
@@ -63,7 +64,9 @@
                     Port.class,
                     DefaultPortDescription.class,
                     Element.class,
-                    Link.Type.class
+                    Link.Type.class,
+                    Timestamp.class
+
                     )
             .register(URI.class, new URISerializer())
             .register(NodeId.class, new NodeIdSerializer())
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
index 738086e..3920dd6 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
@@ -1,9 +1,6 @@
 package org.onlab.onos.store.serializers;
 
 import org.onlab.util.KryoPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.nio.ByteBuffer;
 
 /**
@@ -11,10 +8,8 @@
  */
 public class KryoSerializer implements StoreSerializer {
 
-    private final Logger log = LoggerFactory.getLogger(getClass());
     protected KryoPool serializerPool;
 
-
     public KryoSerializer() {
         setupKryoPool();
     }