Add sender-side accumulator to ecmap (EventuallyConsistentMap)

Change-Id: I63de27131c067c07b41ca311b14ef3ac85b6ae3e
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 5ed6384..f7c9323 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -17,6 +17,7 @@
 
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.After;
@@ -67,10 +68,8 @@
     private SequentialClockService<String, String> clockService;
 
     private static final String MAP_NAME = "test";
-    private static final MessageSubject PUT_MESSAGE_SUBJECT
+    private static final MessageSubject UPDATE_MESSAGE_SUBJECT
             = new MessageSubject("ecm-" + MAP_NAME + "-update");
-    private static final MessageSubject REMOVE_MESSAGE_SUBJECT
-            = new MessageSubject("ecm-" + MAP_NAME + "-remove");
     private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
             = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
 
@@ -82,8 +81,7 @@
     private final ControllerNode self =
             new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
 
-    private ClusterMessageHandler putHandler;
-    private ClusterMessageHandler removeHandler;
+    private ClusterMessageHandler updateHandler;
     private ClusterMessageHandler antiEntropyHandler;
 
     /*
@@ -105,8 +103,6 @@
                     .register(PutEntry.class)
                     .register(RemoveEntry.class)
                     .register(ArrayList.class)
-                    .register(InternalPutEvent.class)
-                    .register(InternalRemoveEvent.class)
                     .register(AntiEntropyAdvertisement.class)
                     .register(HashMap.class)
                     .build();
@@ -237,7 +233,7 @@
         ecMap.addListener(new TestListener(latch));
 
         assertNull(ecMap.get(KEY2));
-        putHandler.handle(message);
+        updateHandler.handle(message);
         assertTrue("External listener never got notified of internal event",
                    latch.await(100, TimeUnit.MILLISECONDS));
         assertEquals(VALUE2, ecMap.get(KEY2));
@@ -254,7 +250,7 @@
         latch = new CountDownLatch(1);
         ecMap.addListener(new TestListener(latch));
 
-        removeHandler.handle(removeMessage);
+        updateHandler.handle(removeMessage);
         assertTrue("External listener never got notified of internal event",
                    latch.await(100, TimeUnit.MILLISECONDS));
         assertNull(ecMap.get(KEY1));
@@ -568,8 +564,7 @@
 
     @Test
     public void testDestroy() throws Exception {
-        clusterCommunicator.removeSubscriber(PUT_MESSAGE_SUBJECT);
-        clusterCommunicator.removeSubscriber(REMOVE_MESSAGE_SUBJECT);
+        clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
         clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
 
         replay(clusterCommunicator);
@@ -594,12 +589,11 @@
     }
 
     private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
-        InternalPutEvent<String, String> event =
-                new InternalPutEvent<>(key, value, timestamp);
+        PutEntry<String, String> event = new PutEntry<>(key, value, timestamp);
 
         return new ClusterMessage(
-                clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
-                SERIALIZER.encode(event));
+                clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
+                SERIALIZER.encode(Lists.newArrayList(event)));
     }
 
     private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
@@ -614,38 +608,35 @@
         list.add(pe1);
         list.add(pe2);
 
-        InternalPutEvent<String, String> event = new InternalPutEvent<>(list);
 
         return new ClusterMessage(
-                clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
-                SERIALIZER.encode(event));
+                clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
+                SERIALIZER.encode(list));
     }
 
     private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
-        InternalRemoveEvent<String> event = new InternalRemoveEvent<>(key, timestamp);
+        RemoveEntry<String, String> event = new RemoveEntry<>(key, timestamp);
 
         return new ClusterMessage(
-                clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
-                SERIALIZER.encode(event));
+                clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
+                SERIALIZER.encode(Lists.newArrayList(event)));
     }
 
     private ClusterMessage generateRemoveMessage(String key1, String key2) {
-        ArrayList<RemoveEntry<String>> list = new ArrayList<>();
+        ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
 
         Timestamp timestamp1 = clockService.peek(1);
         Timestamp timestamp2 = clockService.peek(2);
 
-        RemoveEntry<String> re1 = new RemoveEntry<>(key1, timestamp1);
-        RemoveEntry<String> re2 = new RemoveEntry<>(key2, timestamp2);
+        RemoveEntry<String, String> re1 = new RemoveEntry<>(key1, timestamp1);
+        RemoveEntry<String, String> re2 = new RemoveEntry<>(key2, timestamp2);
 
         list.add(re1);
         list.add(re2);
 
-        InternalRemoveEvent<String> event = new InternalRemoveEvent<>(list);
-
         return new ClusterMessage(
-                clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
-                SERIALIZER.encode(event));
+                clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
+                SERIALIZER.encode(list));
     }
 
     /**
@@ -655,10 +646,14 @@
      * @param m message we expect to be sent
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
+    // FIXME: rename - the broadcast expectation was replaced by a unicast one
     private static void expectSpecificBroadcastMessage(ClusterMessage m,
                            ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
-        expect(clusterCommunicator.broadcast(m)).andReturn(true);
+//        expect(clusterCommunicator.broadcast(m)).andReturn(true);
+        expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
+                .andReturn(true)
+                .anyTimes();
         replay(clusterCommunicator);
     }
 
@@ -669,10 +664,14 @@
      * @param m message we expect to be sent
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
+    // FIXME: rename - the multicast expectation was replaced by a unicast one
     private static void expectSpecificMulticastMessage(ClusterMessage m,
                            ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
-        expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
+//        expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
+        expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
+                .andReturn(true)
+                .anyTimes();
         replay(clusterCommunicator);
     }
 
@@ -684,10 +683,13 @@
      *
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
+    // FIXME: rename - the multicast expectation was replaced by a unicast one
     private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
-        expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
-                                             anyObject(Iterable.class)))
+//        expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
+//                                             anyObject(Iterable.class)))
+        expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class),
+                                           anyObject(NodeId.class)))
                 .andReturn(true)
                 .anyTimes();
         replay(clusterCommunicator);
@@ -700,9 +702,13 @@
      *
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
+    // FIXME: rename - the broadcast expectation was replaced by a unicast one
     private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
-        expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
+//        expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
+//                .andReturn(true)
+//                .anyTimes();
+        expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class), anyObject(NodeId.class)))
                 .andReturn(true)
                 .anyTimes();
         replay(clusterCommunicator);
@@ -747,10 +753,8 @@
         @Override
         public void addSubscriber(MessageSubject subject,
                                   ClusterMessageHandler subscriber) {
-            if (subject.equals(PUT_MESSAGE_SUBJECT)) {
-                putHandler = subscriber;
-            } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
-                removeHandler = subscriber;
+            if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
+                updateHandler = subscriber;
             } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
                 antiEntropyHandler = subscriber;
             } else {
@@ -762,10 +766,8 @@
         public void addSubscriber(MessageSubject subject,
                                   ClusterMessageHandler subscriber,
                                   ExecutorService executor) {
-            if (subject.equals(PUT_MESSAGE_SUBJECT)) {
-                putHandler = subscriber;
-            } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
-                removeHandler = subscriber;
+            if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
+                updateHandler = subscriber;
             } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
                 antiEntropyHandler = subscriber;
             } else {