Device Anti-Entropy

- create Advertisement
- handler for Advertisement
- register handler, background thread to send advertisement

Change-Id: I99e8a7d68747970c34b3c25c6d0489769d251446
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 2221ea0..5319749 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -5,8 +5,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import org.apache.commons.lang3.concurrent.ConcurrentException;
-import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -14,6 +13,8 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.net.AnnotationsUtil;
 import org.onlab.onos.net.DefaultAnnotations;
 import org.onlab.onos.net.DefaultDevice;
@@ -23,9 +24,6 @@
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.Port;
 import org.onlab.onos.net.PortNumber;
-import org.onlab.onos.net.SparseAnnotations;
-import org.onlab.onos.net.device.DefaultDeviceDescription;
-import org.onlab.onos.net.device.DefaultPortDescription;
 import org.onlab.onos.net.device.DeviceDescription;
 import org.onlab.onos.net.device.DeviceEvent;
 import org.onlab.onos.net.device.DeviceStore;
@@ -38,18 +36,22 @@
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.common.impl.Timestamped;
-import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.device.impl.peermsg.DeviceAntiEntropyAdvertisement;
+import org.onlab.onos.store.device.impl.peermsg.DeviceFragmentId;
+import org.onlab.onos.store.device.impl.peermsg.PortFragmentId;
 import org.onlab.onos.store.serializers.KryoSerializer;
-import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
+import org.onlab.onos.store.serializers.DistributedStoreSerializers;
 import org.onlab.util.KryoPool;
 import org.onlab.util.NewConcurrentHashMap;
 import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -57,19 +59,21 @@
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Predicates.notNull;
 import static org.onlab.onos.net.device.DeviceEvent.Type.*;
 import static org.slf4j.LoggerFactory.getLogger;
 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
 import static org.onlab.onos.net.DefaultAnnotations.merge;
-import static org.onlab.onos.net.DefaultAnnotations.union;
 import static com.google.common.base.Verify.verify;
+import static org.onlab.util.Tools.namedThreads;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
+import static org.onlab.onos.store.common.impl.ControllerNodeToNodeId.toNodeId;
 
 // TODO: give me a better name
 /**
@@ -86,8 +90,9 @@
 
     public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
 
-    // TODO: Check if inner Map can be replaced with plain Map
+    // TODO: Check if inner Map can be replaced with plain Map.
     // innerMap is used to lock a Device, thus instance should never be replaced.
+
     // collection of Description given from various providers
     private final ConcurrentMap<DeviceId,
                             ConcurrentMap<ProviderId, DeviceDescriptions>>
@@ -117,21 +122,23 @@
         @Override
         protected void setupKryoPool() {
             serializerPool = KryoPool.newBuilder()
-                    .register(KryoPoolUtil.API)
+                    .register(DistributedStoreSerializers.COMMON)
+
                     .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
                     .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
                     .register(InternalDeviceRemovedEvent.class)
                     .register(InternalPortEvent.class, new InternalPortEventSerializer())
                     .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
-                    .register(Timestamp.class)
-                    .register(Timestamped.class)
-                    .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
+                    .register(DeviceAntiEntropyAdvertisement.class)
+                    .register(DeviceFragmentId.class)
+                    .register(PortFragmentId.class)
                     .build()
                     .populate(1);
         }
-
     };
 
+    private ScheduledExecutorService executor;
+
     @Activate
     public void activate() {
         clusterCommunicator.addSubscriber(
@@ -144,11 +151,35 @@
                 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
         clusterCommunicator.addSubscriber(
                 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
+
+        executor =
+                newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
+
+        // TODO: Make these configurable
+        long initialDelaySec = 5;
+        long periodSec = 5;
+        // start anti-entropy thread
+        executor.scheduleAtFixedRate(new SendAdvertisementTask(),
+                    initialDelaySec, periodSec, TimeUnit.SECONDS);
+
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+
+        executor.shutdownNow();
+        try {
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { // false means the wait timed out
+                log.error("Timeout during executor shutdown");
+            }
+        } catch (InterruptedException e) {
+            log.error("Error during executor shutdown", e);
+            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+        }
+
         deviceDescs.clear();
         devices.clear();
         devicePorts.clear();
@@ -543,14 +574,14 @@
 
             final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
             if (existingPortDesc == null ||
-                deltaDesc == existingPortDesc ||
                 deltaDesc.isNewer(existingPortDesc)) {
                 // on new port or valid update
                 // update description
                 descs.putPortDesc(deltaDesc);
                 newPort = composePort(device, number, descsMap);
             } else {
-                // outdated event, ignored.
+                // same or outdated event, ignored.
+                log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
                 return null;
             }
 
@@ -627,6 +658,14 @@
         }
     }
 
+    /**
+     * Checks if given timestamp is superseded by removal request
+     * with more recent timestamp.
+     *
+     * @param deviceId identifier of a device
+     * @param timestampToCheck timestamp of an event to check
+     * @return true if device is already removed
+     */
     private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
         Timestamp removalTimestamp = removalRequest.get(deviceId);
         if (removalTimestamp != null &&
@@ -667,7 +706,7 @@
                 continue;
             }
             // TODO: should keep track of Description timestamp
-            // and only merge conflicting keys when timestamp is newer
+            // and only merge conflicting keys when timestamp is newer.
             // Currently assuming there will never be a key conflict between
             // providers
 
@@ -708,7 +747,7 @@
                 continue;
             }
             // TODO: should keep track of Description timestamp
-            // and only merge conflicting keys when timestamp is newer
+            // and only merge conflicting keys when timestamp is newer.
             // Currently assuming there will never be a key conflict between
             // providers
 
@@ -745,129 +784,258 @@
         return providerDescs.get(pid);
     }
 
-    public static final class InitDeviceDescs
-        implements ConcurrentInitializer<DeviceDescriptions> {
-
-        private final Timestamped<DeviceDescription> deviceDesc;
-
-        public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
-            this.deviceDesc = checkNotNull(deviceDesc);
-        }
-        @Override
-        public DeviceDescriptions get() throws ConcurrentException {
-            return new DeviceDescriptions(deviceDesc);
-        }
+    // Encodes event and unicasts it to recipient. TODO: should we be throwing exception?
+    private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                SERIALIZER.encode(event));
+        clusterCommunicator.unicast(message, recipient);
     }
 
-
-    /**
-     * Collection of Description of a Device and it's Ports given from a Provider.
-     */
-    public static class DeviceDescriptions {
-
-        private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
-        private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
-
-        public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
-            this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
-            this.portDescs = new ConcurrentHashMap<>();
-        }
-
-        Timestamp getLatestTimestamp() {
-            Timestamp latest = deviceDesc.get().timestamp();
-            for (Timestamped<PortDescription> desc : portDescs.values()) {
-                if (desc.timestamp().compareTo(latest) > 0) {
-                    latest = desc.timestamp();
-                }
-            }
-            return latest;
-        }
-
-        public Timestamped<DeviceDescription> getDeviceDesc() {
-            return deviceDesc.get();
-        }
-
-        public Timestamped<PortDescription> getPortDesc(PortNumber number) {
-            return portDescs.get(number);
-        }
-
-        /**
-         * Puts DeviceDescription, merging annotations as necessary.
-         *
-         * @param newDesc new DeviceDescription
-         * @return previous DeviceDescription
-         */
-        public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
-            Timestamped<DeviceDescription> oldOne = deviceDesc.get();
-            Timestamped<DeviceDescription> newOne = newDesc;
-            if (oldOne != null) {
-                SparseAnnotations merged = union(oldOne.value().annotations(),
-                                                 newDesc.value().annotations());
-                newOne = new Timestamped<DeviceDescription>(
-                        new DefaultDeviceDescription(newDesc.value(), merged),
-                        newDesc.timestamp());
-            }
-            return deviceDesc.getAndSet(newOne);
-        }
-
-        /**
-         * Puts PortDescription, merging annotations as necessary.
-         *
-         * @param newDesc new PortDescription
-         * @return previous PortDescription
-         */
-        public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
-            Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
-            Timestamped<PortDescription> newOne = newDesc;
-            if (oldOne != null) {
-                SparseAnnotations merged = union(oldOne.value().annotations(),
-                                                 newDesc.value().annotations());
-                newOne = new Timestamped<PortDescription>(
-                        new DefaultPortDescription(newDesc.value(), merged),
-                        newDesc.timestamp());
-            }
-            return portDescs.put(newOne.value().portNumber(), newOne);
-        }
+    // Encodes event and hands it to clusterCommunicator.broadcast(). TODO: should we be throwing exception?
+    private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                SERIALIZER.encode(event));
+        clusterCommunicator.broadcast(message);
     }
 
     private void notifyPeers(InternalDeviceEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
     }
 
     private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
     }
 
     private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                GossipDeviceStoreMessageSubjects.DEVICE_REMOVED,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
     }
 
     private void notifyPeers(InternalPortEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                GossipDeviceStoreMessageSubjects.PORT_UPDATE,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
     }
 
     private void notifyPeers(InternalPortStatusEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
+    }
+
+    private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
+        try {
+            unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
+        } catch (IOException e) { // best effort: a failed unicast is logged, not propagated
+            log.error("Failed to send " + event + " to " + recipient, e);
+        }
+    }
+
+    private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
+        try {
+            unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
+        } catch (IOException e) { // best effort: a failed unicast is logged, not propagated
+            log.error("Failed to send " + event + " to " + recipient, e);
+        }
+    }
+
+    private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
+        try {
+            unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
+        } catch (IOException e) { // best effort: a failed unicast is logged, not propagated
+            log.error("Failed to send " + event + " to " + recipient, e);
+        }
+    }
+
+    private void notifyPeer(NodeId recipient, InternalPortEvent event) {
+        try {
+            unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
+        } catch (IOException e) { // best effort: a failed unicast is logged, not propagated
+            log.error("Failed to send " + event + " to " + recipient, e);
+        }
+    }
+
+    private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
+        try {
+            unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
+        } catch (IOException e) { // best effort: a failed unicast is logged, not propagated
+            log.error("Failed to send " + event + " to " + recipient, e);
+        }
+    }
+
+    /**
+     * Builds an advertisement listing the timestamp of every device and port
+     * description held in deviceDescs, plus the local offline timestamp per device.
+     *
+     * @return advertisement created with the local node's id
+     */
+    private DeviceAntiEntropyAdvertisement createAdvertisement() {
+        final NodeId self = clusterService.getLocalNode().id();
+
+        final int deviceCount = deviceDescs.size();
+        final int portsPerDevice = 8; // random guess to minimize reallocation
+        Map<DeviceFragmentId, Timestamp> devices = new HashMap<>(deviceCount);
+        Map<PortFragmentId, Timestamp> ports = new HashMap<>(deviceCount * portsPerDevice);
+        Map<DeviceId, Timestamp> offline = new HashMap<>(deviceCount);
+
+        for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>> provs : deviceDescs.entrySet()) {
+            final DeviceId deviceId = provs.getKey();
+            final ConcurrentMap<ProviderId, DeviceDescriptions> devDescs = provs.getValue();
+            synchronized (devDescs) {
+                // the value may be null when no offline timestamp is recorded for the device
+                offline.put(deviceId, this.offline.get(deviceId));
+
+                for (Entry<ProviderId, DeviceDescriptions> prov : devDescs.entrySet()) {
+                    final ProviderId provId = prov.getKey();
+                    final DeviceDescriptions descs = prov.getValue();
+                    devices.put(new DeviceFragmentId(deviceId, provId),
+                            descs.getDeviceDesc().timestamp());
+
+                    for (Entry<PortNumber, Timestamped<PortDescription>>
+                            portDesc : descs.getPortDescs().entrySet()) {
+                        ports.put(new PortFragmentId(deviceId, provId, portDesc.getKey()),
+                                portDesc.getValue().timestamp());
+                    }
+                }
+            }
+        }
+
+        return new DeviceAntiEntropyAdvertisement(self, devices, ports, offline);
+    }
+
+    /**
+     * Responds to an anti-entropy advertisement message.
+     * <P>
+     * Sends the sender every local fragment it lacks or holds an older copy of,
+     * using the regular replication messages, and replies with a local
+     * advertisement if the sender advertised fragments that are newer or unknown here.
+     * @param advertisement advertisement to respond to
+     */
+    private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
+
+        final NodeId sender = advertisement.sender();
+        // working copies; entries matched against local state are removed below
+        Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
+        Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
+        Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
+
+        // Fragments to request (currently only checked for emptiness and logged)
+        Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
+        Collection<PortFragmentId> reqPorts = new ArrayList<>();
+
+        for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
+            final DeviceId deviceId = de.getKey();
+            final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
+
+            synchronized (lDevice) {
+                // latestTimestamp across provider
+                // Note: can be null initially
+                Timestamp localLatest = offline.get(deviceId);
+
+                // handle device Ads
+                for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
+                    final ProviderId provId = prov.getKey();
+                    final DeviceDescriptions lDeviceDescs = prov.getValue();
+
+                    final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
+
+                    // compare the local device description with the advertised timestamp
+                    Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
+                    Timestamp advDevTimestamp = devAds.get(devFragId);
+
+                    if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
+                        // remote does not have it or outdated, suggest
+                        notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
+                    } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
+                        // local is outdated, request
+                        reqDevices.add(devFragId);
+                    }
+
+                    // handle port Ads
+                    for (Entry<PortNumber, Timestamped<PortDescription>>
+                            pe : lDeviceDescs.getPortDescs().entrySet()) {
+
+                        final PortNumber num = pe.getKey();
+                        final Timestamped<PortDescription> lPort = pe.getValue();
+
+                        final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
+
+                        Timestamp advPortTimestamp = portAds.get(portFragId);
+                        if ( advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
+                            // remote does not have it or outdated, suggest
+                            notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
+                        } else if (!lPort.timestamp().equals(advPortTimestamp)) {
+                            // local is outdated, request
+                            log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
+                            reqPorts.add(portFragId);
+                        }
+
+                        // remove port Ad already processed
+                        portAds.remove(portFragId);
+                    } // end local port loop
+
+                    // remove device Ad already processed
+                    devAds.remove(devFragId);
+
+                    // find latest and update
+                    final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
+                    if (localLatest == null ||
+                        providerLatest.compareTo(localLatest) > 0) {
+                        localLatest = providerLatest;
+                    }
+                } // end local provider loop
+                // checking if remote offline timestamp is more recent than anything local.
+                // NOTE(review): localLatest may still be null here; confirm compareTo(null) is safe.
+                Timestamp rOffline = offlineAds.get(deviceId);
+                if (rOffline != null &&
+                    rOffline.compareTo(localLatest) > 0) {
+                    // remote offline timestamp suggests that the
+                    // device is off-line
+                    markOfflineInternal(deviceId, rOffline);
+                }
+
+                Timestamp lOffline = offline.get(deviceId);
+                if (lOffline != null && rOffline == null) {
+                    // locally offline, but remote is online, suggest offline
+                    notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
+                }
+
+                // remove device offline Ad already processed
+                offlineAds.remove(deviceId);
+            } // device lock
+        } // end local device loop
+
+        // Ads left over were not matched by any local fragment: request them
+        log.trace("Ads left {}, {}", devAds, portAds);
+        reqDevices.addAll(devAds.keySet());
+        reqPorts.addAll(portAds.keySet());
+
+        if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
+            log.trace("Nothing to request to remote peer {}", sender);
+            return;
+        }
+
+        log.info("Need to sync {} {}", reqDevices, reqPorts);
+
+        // 2-way Anti-Entropy for now: reply with a full local advertisement
+        try {
+            unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
+        } catch (IOException e) {
+            log.error("Failed to send response advertisement to " + sender, e);
+        }
+
+// Sketch of 3-way Anti-Entropy
+//        DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
+//        ClusterMessage message = new ClusterMessage(
+//                clusterService.getLocalNode().id(),
+//                GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
+//                SERIALIZER.encode(request));
+//
+//        try {
+//            clusterCommunicator.unicast(message, advertisement.sender());
+//        } catch (IOException e) {
+//            log.error("Failed to send advertisement reply to "
+//                      + advertisement.sender(), e);
+//        }
     }
 
     private void notifyDelegateIfNotNull(DeviceEvent event) {
@@ -876,6 +1044,54 @@
         }
     }
 
+    /**
+     * Task that unicasts this node's anti-entropy advertisement
+     * to one randomly chosen peer other than the local node.
+     */
+    private final class SendAdvertisementTask implements Runnable {
+        @Override
+        public void run() {
+            if (Thread.currentThread().isInterrupted()) {
+                log.info("Interrupted, quitting");
+                return;
+            }
+
+            try {
+                final NodeId self = clusterService.getLocalNode().id();
+                Set<ControllerNode> nodes = clusterService.getNodes();
+                ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
+                        .transform(toNodeId())
+                        .toList();
+
+                // Drop the local node up front so that an empty or self-only node
+                // set ends here instead of indexing into an empty list.
+                List<NodeId> peers = new ArrayList<>(nodeIds);
+                peers.removeAll(Collections.singleton(self));
+                if (peers.isEmpty()) {
+                    log.trace("No other peers in the cluster.");
+                    return;
+                }
+                NodeId peer = peers.get(RandomUtils.nextInt(0, peers.size()));
+
+                DeviceAntiEntropyAdvertisement ad = createAdvertisement();
+
+                if (Thread.currentThread().isInterrupted()) {
+                    log.info("Interrupted, quitting");
+                    return;
+                }
+
+                try {
+                    unicastMessage(peer, DEVICE_ADVERTISE, ad);
+                } catch (IOException e) {
+                    log.error("Failed to send anti-entropy advertisement", e);
+                }
+            } catch (Exception e) {
+                // catch all Exception to avoid Scheduled task being suppressed.
+                log.error("Exception thrown while sending advertisement", e);
+            }
+        }
+    }
+
     private class InternalDeviceEventListener implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
@@ -940,6 +1156,7 @@
 
             log.info("Received port status update event from peer: {}", message.sender());
             InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
+            log.info("{}", event);
 
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
@@ -948,4 +1165,15 @@
             notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
         }
     }
+
+    /** Cluster message handler that decodes a device advertisement and passes it to handleAdvertisement. */
+    private final class InternalDeviceAdvertisementListener
+        implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.info("Received Device advertisement from peer: {}", message.sender());
+            final DeviceAntiEntropyAdvertisement received = SERIALIZER.decode(message.payload());
+            handleAdvertisement(received);
+        }
+    }
 }