Implement anti-entropy for the EventuallyConsistentMap.

ONOS-857. 

Change-Id: Ife2070142d3c165c2a0035c3011c05b426c8baa4
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
index 0acef1a..00b2004 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
@@ -16,8 +16,10 @@
 package org.onosproject.store.impl;
 
 import com.google.common.base.MoreObjects;
+import org.apache.commons.lang3.RandomUtils;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.store.Timestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
@@ -32,6 +34,8 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -40,6 +44,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -69,8 +74,9 @@
 
     private final MessageSubject updateMessageSubject;
     private final MessageSubject removeMessageSubject;
+    private final MessageSubject antiEntropyAdvertisementSubject;
 
-    private final Set<EventuallyConsistentMapListener> listeners
+    private final Set<EventuallyConsistentMapListener<K, V>> listeners
             = new CopyOnWriteArraySet<>();
 
     private final ExecutorService executor;
@@ -138,12 +144,20 @@
                 newSingleThreadScheduledExecutor(minPriority(
                         namedThreads("onos-ecm-" + mapName + "-bg-%d")));
 
+        // start anti-entropy thread
+        backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
+                                               initialDelaySec, periodSec,
+                                               TimeUnit.SECONDS);
+
         updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
         clusterCommunicator.addSubscriber(updateMessageSubject,
                                           new InternalPutEventListener());
         removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
         clusterCommunicator.addSubscriber(removeMessageSubject,
                                           new InternalRemoveEventListener());
+        antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
+        clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
+                                          new InternalAntiEntropyListener());
     }
 
     private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
@@ -158,9 +172,9 @@
                         .register(ArrayList.class)
                         .register(InternalPutEvent.class)
                         .register(InternalRemoveEvent.class)
+                        .register(AntiEntropyAdvertisement.class)
+                        .register(HashMap.class)
                         .build();
-
-                // TODO anti-entropy classes
             }
         };
     }
@@ -360,8 +374,8 @@
         clusterCommunicator.removeSubscriber(removeMessageSubject);
     }
 
-    private void notifyListeners(EventuallyConsistentMapEvent event) {
-        for (EventuallyConsistentMapListener listener : listeners) {
+    private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
+        for (EventuallyConsistentMapListener<K, V> listener : listeners) {
             listener.event(event);
         }
     }
@@ -418,6 +432,245 @@
         }
     }
 
+    /**
+     * Periodic task that advertises local map state to one randomly chosen peer.
+     */
+    private final class SendAdvertisementTask implements Runnable {
+        @Override
+        public void run() {
+            if (Thread.currentThread().isInterrupted()) {
+                log.info("Interrupted, quitting");
+                return;
+            }
+
+            try {
+                final NodeId self = clusterService.getLocalNode().id();
+                // Candidates are every cluster member except this node, so an
+                // empty or self-only membership simply yields no candidates.
+                List<NodeId> peers = clusterService.getNodes().stream()
+                        .map(ControllerNode::id)
+                        .filter(id -> !id.equals(self))
+                        .collect(Collectors.toList());
+
+                if (peers.isEmpty()) {
+                    log.trace("No other peers in the cluster.");
+                    return;
+                }
+
+                NodeId peer = peers.get(RandomUtils.nextInt(0, peers.size()));
+
+                if (Thread.currentThread().isInterrupted()) {
+                    log.info("Interrupted, quitting");
+                    return;
+                }
+
+                AntiEntropyAdvertisement<K> ad = createAdvertisement();
+
+                try {
+                    unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
+                } catch (IOException e) {
+                    log.debug("Failed to send anti-entropy advertisement to {}", peer);
+                }
+            } catch (Exception e) {
+                // Catch all exceptions to avoid scheduled task being suppressed.
+                log.error("Exception thrown while sending advertisement", e);
+            }
+        }
+    }
+
+    /**
+     * Snapshots local live-item timestamps and tombstones into an advertisement.
+     * @return advertisement whose sender is the local node
+     */
+    private AntiEntropyAdvertisement<K> createAdvertisement() {
+        final NodeId localId = clusterService.getLocalNode().id();
+        Map<K, Timestamp> liveTimestamps = new HashMap<>(items.size());
+        items.forEach((k, stamped) -> liveTimestamps.put(k, stamped.timestamp()));
+        Map<K, Timestamp> deadTimestamps = new HashMap<>(removedItems);
+        return new AntiEntropyAdvertisement<>(localId, liveTimestamps, deadTimestamps);
+    }
+
+    /**
+     * Reconciles local state with a peer's advertisement, replying with our
+     * own advertisement when the peer lists a live key we do not hold.
+     *
+     * @param ad anti-entropy advertisement received from a peer
+     */
+    private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
+        List<EventuallyConsistentMapEvent<K, V>> externalEvents;
+
+        synchronized (this) {
+            final NodeId sender = ad.sender();
+            externalEvents = antiEntropyCheckLocalItems(ad);
+            antiEntropyCheckLocalRemoved(ad);
+            externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
+
+            // If the remote ad has something unknown, actively sync. Reply at
+            // most once: a failed send must not be retried for every key.
+            boolean peerHasUnknownKey = ad.timestamps().keySet().stream()
+                    .anyMatch(key -> !items.containsKey(key));
+            if (peerHasUnknownKey) {
+                try {
+                    unicastMessage(sender, antiEntropyAdvertisementSubject,
+                                   createAdvertisement());
+                } catch (IOException e) {
+                    log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
+                }
+            }
+        } // synchronized (this)
+
+        externalEvents.forEach(this::notifyListeners);
+    }
+
+    /**
+     * Compares every local live item with what the remote advertised for the
+     * same key, using the remote's live timestamp or, failing that, its
+     * tombstone. Items the remote lacks, or holds an older version of, are
+     * batched and pushed to the remote in one update message. Items for which
+     * the remote advertises a newer tombstone are removed from local state.
+     *
+     * @param ad remote anti-entropy advertisement
+     * @return list of external events relating to local operations performed
+     */
+    // Guarded by synchronized (this)
+    private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
+            AntiEntropyAdvertisement<K> ad) {
+        final NodeId peer = ad.sender();
+        final List<EventuallyConsistentMapEvent<K, V>> events
+                = new LinkedList<>();
+        final List<PutEntry<K, V>> pendingPuts = new ArrayList<>();
+
+        for (Map.Entry<K, Timestamped<V>> entry : items.entrySet()) {
+            final K key = entry.getKey();
+            final Timestamped<V> local = entry.getValue();
+            final Timestamp remoteLive = ad.timestamps().get(key);
+            final Timestamp remoteDead = ad.tombstones().get(key);
+
+            // What the remote advertised for this key: live entry first,
+            // tombstone otherwise, null when the ad lists it in neither map.
+            final Timestamp remoteKnown = (remoteLive != null) ? remoteLive : remoteDead;
+
+            boolean remoteIsBehind = remoteKnown == null || local.isNewer(remoteKnown);
+            if (remoteIsBehind) {
+                // local value is more recent, push to sender
+                pendingPuts.add(new PutEntry<>(key, local.value(),
+                                               local.timestamp()));
+            }
+
+            boolean remoteRemovedLater = remoteDead != null
+                    && remoteDead.compareTo(local.timestamp()) > 0;
+            if (remoteRemovedLater && removeInternal(key, remoteDead)) {
+                // sender has a more recent remove, which was applied locally
+                events.add(new EventuallyConsistentMapEvent<>(
+                        EventuallyConsistentMapEvent.Type.REMOVE, key, null));
+            }
+        }
+
+        if (pendingPuts.isEmpty()) {
+            return events;
+        }
+
+        // Send all updates to the peer at once
+        try {
+            unicastMessage(peer, updateMessageSubject, new InternalPutEvent<>(pendingPuts));
+        } catch (IOException e) {
+            log.warn("Failed to send advertisement response", e);
+        }
+
+        return events;
+    }
+
+    /**
+     * Looks for zombies on the remote: keys the remote advertises as live
+     * although our tombstone for the same key carries a later timestamp.
+     * Every such tombstone is pushed to the remote in one remove message.
+     *
+     * @param ad remote anti-entropy advertisement
+     */
+    // Guarded by synchronized (this)
+    private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
+        final NodeId peer = ad.sender();
+        final List<RemoveEntry<K>> pendingRemoves = new ArrayList<>();
+
+        for (Map.Entry<K, Timestamp> tombstone : removedItems.entrySet()) {
+            final Timestamp localDead = tombstone.getValue();
+            final Timestamp remoteLive = ad.timestamps().get(tombstone.getKey());
+            if (remoteLive == null || localDead.compareTo(remoteLive) <= 0) {
+                // remote does not list the key as live, or its copy is not older
+                continue;
+            }
+            // sender has zombie, push remove
+            pendingRemoves.add(new RemoveEntry<>(tombstone.getKey(), localDead));
+        }
+
+        if (pendingRemoves.isEmpty()) {
+            return;
+        }
+
+        // Send all removes to the peer at once
+        try {
+            unicastMessage(peer, removeMessageSubject,
+                           new InternalRemoveEvent<>(pendingRemoves));
+        } catch (IOException e) {
+            log.warn("Failed to send advertisement response", e);
+        }
+    }
+
+    /**
+     * Checks if any of the local live items are out of date according to the
+     * remote's tombstone advertisements. If we find a local item is out of date,
+     * we'll apply the remove operation to the local state.
+     *
+     * @param ad remote anti-entropy advertisement
+     * @return list of external events relating to local operations performed
+     */
+    // Guarded by synchronized (this)
+    private List<EventuallyConsistentMapEvent<K, V>>
+            antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
+        final List<EventuallyConsistentMapEvent<K, V>> externalEvents
+                = new LinkedList<>();
+
+        for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
+            K key = remoteDead.getKey();
+            Timestamp remoteDeadTimestamp = remoteDead.getValue();
+
+            Timestamped<V> local = items.get(key);
+            Timestamp localDead = removedItems.get(key);
+            if (local != null
+                    && remoteDeadTimestamp.compareTo(local.timestamp()) > 0) {
+                // remove our version
+                if (removeInternal(key, remoteDeadTimestamp)) {
+                    externalEvents.add(new EventuallyConsistentMapEvent<>(
+                            EventuallyConsistentMapEvent.Type.REMOVE, key, null));
+                }
+            } else if (localDead != null &&
+                    remoteDeadTimestamp.compareTo(localDead) > 0) {
+                // If we both had the item as removed, but their timestamp is
+                // newer, update ours to the newer value
+                removedItems.put(key, remoteDeadTimestamp);
+            }
+        }
+
+        return externalEvents;
+    }
+
+    private final class InternalAntiEntropyListener
+            implements ClusterMessageHandler {
+
+        @Override
+        public void handle(ClusterMessage message) {
+            log.trace("Received anti-entropy advertisement from peer: {}", message.sender());
+            AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
+            backgroundExecutor.submit(() -> {
+                try {
+                    handleAntiEntropyAdvertisement(advertisement);
+                } catch (Exception e) {
+                    log.warn("Exception thrown handling advertisements", e);
+                }
+            });
+        }
+    }
+
     private final class InternalPutEventListener implements
             ClusterMessageHandler {
         @Override
@@ -433,7 +686,7 @@
                         Timestamp timestamp = entry.timestamp();
 
                         if (putInternal(key, value, timestamp)) {
-                            EventuallyConsistentMapEvent externalEvent =
+                            EventuallyConsistentMapEvent<K, V> externalEvent =
                                     new EventuallyConsistentMapEvent<>(
                                     EventuallyConsistentMapEvent.Type.PUT, key,
                                     value);
@@ -461,7 +714,8 @@
                         Timestamp timestamp = entry.timestamp();
 
                         if (removeInternal(key, timestamp)) {
-                            EventuallyConsistentMapEvent externalEvent = new EventuallyConsistentMapEvent<K, V>(
+                            EventuallyConsistentMapEvent<K, V> externalEvent
+                                    = new EventuallyConsistentMapEvent<>(
                                     EventuallyConsistentMapEvent.Type.REMOVE,
                                     key, null);
                             notifyListeners(externalEvent);