[ONOS-3591] Anti-Entropy speed up via push/pull interaction

Adds an UpdateRequest message containing the set of keys for which the
sending node's entries are missing or stale. The receiver then sends the
requester an UpdateEntry for each requested key.

Change-Id: I2115f4a05833b51ae14d1191f09f083b5251f8ec
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index b4174b6..2e1e29a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -25,6 +25,7 @@
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -90,6 +91,7 @@
 
     private final MessageSubject updateMessageSubject;
     private final MessageSubject antiEntropyAdvertisementSubject;
+    private final MessageSubject updateRequestSubject;
 
     private final Set<EventuallyConsistentMapListener<K, V>> listeners
             = Sets.newCopyOnWriteArraySet();
@@ -244,6 +246,12 @@
                                           serializer::encode,
                                           this.backgroundExecutor);
 
+        updateRequestSubject = new MessageSubject("ecm-" + mapName + "-update-request");
+        clusterCommunicator.addSubscriber(updateRequestSubject,
+                                          serializer::decode,
+                                          this::handleUpdateRequests,
+                                          this.backgroundExecutor);
+
         if (!tombstonesDisabled) {
             previousTombstonePurgeTime = 0;
             this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
@@ -513,6 +521,7 @@
         listeners.clear();
 
         clusterCommunicator.removeSubscriber(updateMessageSubject);
+        clusterCommunicator.removeSubscriber(updateRequestSubject);
         clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
         return CompletableFuture.completedFuture(null);
     }
@@ -579,6 +588,19 @@
                 });
     }
 
+    private void sendUpdateRequestToPeer(NodeId peer, Set<K> keys) {
+        if (keys.isEmpty()) {
+            return; // nothing is stale or missing, so do not send an empty request
+        }
+        UpdateRequest<K> request = new UpdateRequest<>(localNodeId, keys);
+        clusterCommunicator.unicast(request, updateRequestSubject, serializer::encode, peer)
+                .whenComplete((result, error) -> {
+                    if (error != null) { // best effort: a failed send is only logged
+                        log.debug("Failed to send update request to {}", peer, error);
+                    }
+                });
+    }
+
     private AntiEntropyAdvertisement<K> createAdvertisement() {
         return new AntiEntropyAdvertisement<>(localNodeId,
                 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
@@ -591,18 +613,9 @@
         try {
             if (log.isTraceEnabled()) {
                 log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
-                        mapName, ad.sender(), ad.digest().size());
+                        ad.sender(), mapName, ad.digest().size());
             }
             antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
-
-            if (!lightweightAntiEntropy) {
-                // if remote ad has any entries that the local copy is missing, actively sync
-                // TODO: Missing keys is not the way local copy can be behind.
-                if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
-                    // TODO: Send ad for missing keys and for entries that are stale
-                    sendAdvertisementToPeer(ad.sender());
-                }
-            }
         } catch (Exception e) {
             log.warn("Error handling anti-entropy advertisement", e);
             return AntiEntropyResponse.FAILED;
@@ -620,15 +633,20 @@
             AntiEntropyAdvertisement<K> ad) {
         final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
         final NodeId sender = ad.sender();
+        final List<NodeId> peers = ImmutableList.of(sender);
+        Set<K> staleOrMissing = new HashSet<>();
+        Set<K> locallyUnknown = new HashSet<>(ad.digest().keySet());
+
         items.forEach((key, localValue) -> {
+            locallyUnknown.remove(key);
             MapValue.Digest remoteValueDigest = ad.digest().get(key);
             if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
                 // local value is more recent, push to sender
-                queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
-            }
-            if (remoteValueDigest != null
+                queueUpdate(new UpdateEntry<>(key, localValue), peers);
+            } else if (remoteValueDigest != null
                     && remoteValueDigest.isNewerThan(localValue.digest())
                     && remoteValueDigest.isTombstone()) {
+                // remote value is more recent and a tombstone: update local value
                 MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
                 MapValue<V> previousValue = removeInternal(key,
                                                            Optional.empty(),
@@ -636,14 +654,31 @@
                 if (previousValue != null && previousValue.isAlive()) {
                     externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
                 }
+            } else if (remoteValueDigest.isNewerThan(localValue.digest())) {
+                // Not a tombstone and remote is newer
+                staleOrMissing.add(key);
             }
         });
+        // Keys missing in local map
+        staleOrMissing.addAll(locallyUnknown);
+        // Request updates that we missed out on
+        sendUpdateRequestToPeer(sender, staleOrMissing);
         return externalEvents;
     }
 
+    private void handleUpdateRequests(UpdateRequest<K> request) {
+        // Reply to the requester with the local entry of each requested key held here.
+        final List<NodeId> peers = ImmutableList.of(request.sender());
+        request.keys().forEach(key ->
+            // A key that is absent locally has no value to send, so it is skipped.
+            Optional.ofNullable(items.get(key))
+                    .ifPresent(value -> queueUpdate(new UpdateEntry<>(key, value), peers))
+        );
+    }
+
     private void purgeTombstones() {
         /*
-         * In order to mitigate the resource exhausation that can ensue due to an ever-growing set
+         * In order to mitigate the resource exhaustion that can ensue due to an ever-growing set
          * of tombstones we employ the following heuristic to purge old tombstones periodically.
          * First, we keep track of the time (local system time) when we were able to have a successful
          * AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded