[ONOS-3591] Anti-Entropy speed up via push/pull interaction

Adds an UpdateRequest message containing the set of keys for which a
node is missing updates. The receiver responds by sending the requester
an UpdateEntry for each of those keys.

Change-Id: I2115f4a05833b51ae14d1191f09f083b5251f8ec
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
index 0012d68..8d41fd2 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
@@ -88,6 +88,8 @@
             = new MessageSubject("ecm-" + MAP_NAME + "-update");
     private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
             = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
+    private static final MessageSubject UPDATE_REQUEST_SUBJECT
+            = new MessageSubject("ecm-" + MAP_NAME + "-update-request");
 
     private static final String KEY1 = "one";
     private static final String KEY2 = "two";
@@ -98,6 +100,7 @@
             new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
 
     private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
+    private Consumer<Collection<UpdateRequest<String>>> requestHandler;
     private Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse> antiEntropyHandler;
 
     @Before
@@ -123,6 +126,9 @@
                                                           anyObject(Function.class),
                                                           anyObject(Executor.class));
         expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
+        clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
+                anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
+        expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
 
         replay(clusterCommunicator);
 
@@ -627,6 +633,7 @@
     @Test
     public void testDestroy() throws Exception {
         clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
+        clusterCommunicator.removeSubscriber(UPDATE_REQUEST_SUBJECT);
         clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
 
         replay(clusterCommunicator);
@@ -774,6 +781,8 @@
                 Executor executor) {
             if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
                 updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
+            } else if (subject.equals(UPDATE_REQUEST_SUBJECT)) {
+                requestHandler = (Consumer<Collection<UpdateRequest<String>>>) handler;
             } else {
                 throw new RuntimeException("Unexpected message subject " + subject.toString());
             }