[ONOS-3591] Anti-Entropy speed up via push/pull interaction

Adds an UpdateRequest message. This contains a set of keys that a node
is missing updates for. The receiver will then send an UpdateEntry for
each missing key to the requester.

Change-Id: I2115f4a05833b51ae14d1191f09f083b5251f8ec
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index b4174b6..2e1e29a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -25,6 +25,7 @@
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -90,6 +91,7 @@
 
     private final MessageSubject updateMessageSubject;
     private final MessageSubject antiEntropyAdvertisementSubject;
+    private final MessageSubject updateRequestSubject;
 
     private final Set<EventuallyConsistentMapListener<K, V>> listeners
             = Sets.newCopyOnWriteArraySet();
@@ -244,6 +246,12 @@
                                           serializer::encode,
                                           this.backgroundExecutor);
 
+        updateRequestSubject = new MessageSubject("ecm-" + mapName + "-update-request");
+        clusterCommunicator.addSubscriber(updateRequestSubject,
+                                          serializer::decode,
+                                          this::handleUpdateRequests,
+                                          this.backgroundExecutor);
+
         if (!tombstonesDisabled) {
             previousTombstonePurgeTime = 0;
             this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
@@ -513,6 +521,7 @@
         listeners.clear();
 
         clusterCommunicator.removeSubscriber(updateMessageSubject);
+        clusterCommunicator.removeSubscriber(updateRequestSubject);
         clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
         return CompletableFuture.completedFuture(null);
     }
@@ -579,6 +588,19 @@
                 });
     }
 
+    private void sendUpdateRequestToPeer(NodeId peer, Set<K> keys) {
+        UpdateRequest<K> request = new UpdateRequest<>(localNodeId, keys);
+        clusterCommunicator.unicast(request,
+                updateRequestSubject,
+                serializer::encode,
+                peer)
+                .whenComplete((result, error) -> {
+                    if (error != null) {
+                        log.debug("Failed to send update request to {}", peer, error);
+                    }
+                });
+    }
+
     private AntiEntropyAdvertisement<K> createAdvertisement() {
         return new AntiEntropyAdvertisement<>(localNodeId,
                 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
@@ -591,18 +613,9 @@
         try {
             if (log.isTraceEnabled()) {
                 log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
-                        mapName, ad.sender(), ad.digest().size());
+                        ad.sender(), mapName, ad.digest().size());
             }
             antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
-
-            if (!lightweightAntiEntropy) {
-                // if remote ad has any entries that the local copy is missing, actively sync
-                // TODO: Missing keys is not the way local copy can be behind.
-                if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
-                    // TODO: Send ad for missing keys and for entries that are stale
-                    sendAdvertisementToPeer(ad.sender());
-                }
-            }
         } catch (Exception e) {
             log.warn("Error handling anti-entropy advertisement", e);
             return AntiEntropyResponse.FAILED;
@@ -620,15 +633,20 @@
             AntiEntropyAdvertisement<K> ad) {
         final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
         final NodeId sender = ad.sender();
+        final List<NodeId> peers = ImmutableList.of(sender);
+        Set<K> staleOrMissing = new HashSet<>();
+        Set<K> locallyUnknown = new HashSet<>(ad.digest().keySet());
+
         items.forEach((key, localValue) -> {
+            locallyUnknown.remove(key);
             MapValue.Digest remoteValueDigest = ad.digest().get(key);
             if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
                 // local value is more recent, push to sender
-                queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
-            }
-            if (remoteValueDigest != null
+                queueUpdate(new UpdateEntry<>(key, localValue), peers);
+            } else if (remoteValueDigest != null
                     && remoteValueDigest.isNewerThan(localValue.digest())
                     && remoteValueDigest.isTombstone()) {
+                // remote value is more recent and a tombstone: update local value
                 MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
                 MapValue<V> previousValue = removeInternal(key,
                                                            Optional.empty(),
@@ -636,14 +654,31 @@
                 if (previousValue != null && previousValue.isAlive()) {
                     externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
                 }
+            } else if (remoteValueDigest.isNewerThan(localValue.digest())) {
+                // Not a tombstone and remote is newer
+                staleOrMissing.add(key);
             }
         });
+        // Keys missing in local map
+        staleOrMissing.addAll(locallyUnknown);
+        // Request updates that we missed out on
+        sendUpdateRequestToPeer(sender, staleOrMissing);
         return externalEvents;
     }
 
+    private void handleUpdateRequests(UpdateRequest<K> request) {
+        final Set<K> keys = request.keys();
+        final NodeId sender = request.sender();
+        final List<NodeId> peers = ImmutableList.of(sender);
+
+        keys.forEach(key ->
+            queueUpdate(new UpdateEntry<>(key, items.get(key)), peers)
+        );
+    }
+
     private void purgeTombstones() {
         /*
-         * In order to mitigate the resource exhausation that can ensue due to an ever-growing set
+         * In order to mitigate the resource exhaustion that can ensue due to an ever-growing set
          * of tombstones we employ the following heuristic to purge old tombstones periodically.
          * First, we keep track of the time (local system time) when we were able to have a successful
          * AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/UpdateRequest.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/UpdateRequest.java
new file mode 100644
index 0000000..e6b62d7
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/UpdateRequest.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
+import org.onosproject.cluster.NodeId;
+
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Describes a request for update events in an EventuallyConsistentMap.
+ */
+final class UpdateRequest<K> {
+
+    private final NodeId sender;
+    private final Set<K> keys;
+
+    /**
+     * Creates a new update request.
+     *
+     * @param sender the sender's node ID
+     * @param keys keys requested
+     */
+    public UpdateRequest(NodeId sender, Set<K> keys) {
+        this.sender = checkNotNull(sender);
+        this.keys = ImmutableSet.copyOf(keys);
+    }
+
+    /**
+     * Returns the sender's node ID.
+     *
+     * @return the sender's node ID
+     */
+    public NodeId sender() {
+        return sender;
+    }
+
+    /**
+     * Returns the keys.
+     *
+     * @return the keys
+     */
+    public Set<K> keys() {
+        return keys;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("sender", sender)
+                .add("keys", keys())
+                .toString();
+    }
+
+    @SuppressWarnings("unused")
+    private UpdateRequest() {
+        this.sender = null;
+        this.keys = null;
+    }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
index 0012d68..8d41fd2 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
@@ -88,6 +88,8 @@
             = new MessageSubject("ecm-" + MAP_NAME + "-update");
     private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
             = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
+    private static final MessageSubject UPDATE_REQUEST_SUBJECT
+            = new MessageSubject("ecm-" + MAP_NAME + "-update-request");
 
     private static final String KEY1 = "one";
     private static final String KEY2 = "two";
@@ -98,6 +100,7 @@
             new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
 
     private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
+    private Consumer<Collection<UpdateRequest<String>>> requestHandler;
     private Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse> antiEntropyHandler;
 
     @Before
@@ -123,6 +126,9 @@
                                                           anyObject(Function.class),
                                                           anyObject(Executor.class));
         expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
+        clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
+                anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
+        expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
 
         replay(clusterCommunicator);
 
@@ -627,6 +633,7 @@
     @Test
     public void testDestroy() throws Exception {
         clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
+        clusterCommunicator.removeSubscriber(UPDATE_REQUEST_SUBJECT);
         clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
 
         replay(clusterCommunicator);
@@ -774,6 +781,8 @@
                 Executor executor) {
             if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
                 updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
+            } else if (subject.equals(UPDATE_REQUEST_SUBJECT)) {
+                requestHandler = (Consumer<Collection<UpdateRequest<String>>>) handler;
             } else {
                 throw new RuntimeException("Unexpected message subject " + subject.toString());
             }