Simplified ECMap implementation by merging items and tombstones maps

Change-Id: If4253722d91c35a7e57dec3c2fceb216d14a7314
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 28be8dc..57943ad 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -16,8 +16,8 @@
 package org.onosproject.store.ecmap;
 
 import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.MoreExecutors;
 
 import org.junit.After;
@@ -32,7 +32,6 @@
 import org.onosproject.event.AbstractEvent;
 import org.onosproject.store.Timestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.impl.LogicalTimestamp;
@@ -44,11 +43,13 @@
 import org.onosproject.store.service.EventuallyConsistentMapListener;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -89,8 +90,8 @@
     private final ControllerNode self =
             new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
 
-    private ClusterMessageHandler updateHandler;
-    private ClusterMessageHandler antiEntropyHandler;
+    private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
+    private Consumer<AntiEntropyAdvertisement<String>> antiEntropyHandler;
 
     /*
      * Serialization is a bit tricky here. We need to serialize in the tests
@@ -109,11 +110,10 @@
                     // Below is the classes that the map internally registers
                     .register(LogicalTimestamp.class)
                     .register(WallClockTimestamp.class)
-                    .register(PutEntry.class)
-                    .register(RemoveEntry.class)
                     .register(ArrayList.class)
                     .register(AntiEntropyAdvertisement.class)
                     .register(HashMap.class)
+                    .register(Optional.class)
                     .build();
         }
     };
@@ -131,9 +131,9 @@
         // delegate to our ClusterCommunicationService implementation. This
         // allows us to get a reference to the map's internal cluster message
         // handlers so we can induce events coming in from a peer.
-        clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
-                anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
-        expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
+        clusterCommunicator.<String>addSubscriber(anyObject(MessageSubject.class),
+                anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
+        expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(2);
 
         replay(clusterCommunicator);
 
@@ -237,15 +237,15 @@
         assertEquals(VALUE1, ecMap.get(KEY1));
 
         // Remote put
-        ClusterMessage message
-                = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2));
+        List<UpdateEntry<String, String>> message
+                = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
 
         // Create a latch so we know when the put operation has finished
         latch = new CountDownLatch(1);
         ecMap.addListener(new TestListener(latch));
 
         assertNull(ecMap.get(KEY2));
-        updateHandler.handle(message);
+        updateHandler.accept(message);
         assertTrue("External listener never got notified of internal event",
                    latch.await(100, TimeUnit.MILLISECONDS));
         assertEquals(VALUE2, ecMap.get(KEY2));
@@ -255,14 +255,13 @@
         assertNull(ecMap.get(KEY2));
 
         // Remote remove
-        ClusterMessage removeMessage
-                = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1));
+        message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
 
         // Create a latch so we know when the remove operation has finished
         latch = new CountDownLatch(1);
         ecMap.addListener(new TestListener(latch));
 
-        updateHandler.handle(removeMessage);
+        updateHandler.accept(message);
         assertTrue("External listener never got notified of internal event",
                    latch.await(100, TimeUnit.MILLISECONDS));
         assertNull(ecMap.get(KEY1));
@@ -601,49 +600,35 @@
         }
     }
 
-    private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
-        PutEntry<String, String> event = new PutEntry<>(key, value, timestamp);
-
-        return new ClusterMessage(
-                clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
-                SERIALIZER.encode(Lists.newArrayList(event)));
+    private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
+        return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
     }
 
-    private List<PutEntry<String, String>> generatePutMessage(
+    private List<UpdateEntry<String, String>> generatePutMessage(
             String key1, String value1, String key2, String value2) {
-        ArrayList<PutEntry<String, String>> list = new ArrayList<>();
+        List<UpdateEntry<String, String>> list = new ArrayList<>();
 
         Timestamp timestamp1 = clockService.peek(1);
         Timestamp timestamp2 = clockService.peek(2);
 
-        PutEntry<String, String> pe1 = new PutEntry<>(key1, value1, timestamp1);
-        PutEntry<String, String> pe2 = new PutEntry<>(key2, value2, timestamp2);
-
-        list.add(pe1);
-        list.add(pe2);
+        list.add(generatePutMessage(key1, value1, timestamp1));
+        list.add(generatePutMessage(key2, value2, timestamp2));
 
         return list;
     }
 
-    private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
-        RemoveEntry<String, String> event = new RemoveEntry<>(key, timestamp);
-
-        return new ClusterMessage(
-                clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
-                SERIALIZER.encode(Lists.newArrayList(event)));
+    private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
+        return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
     }
 
-    private List<RemoveEntry<String, String>> generateRemoveMessage(String key1, String key2) {
-        ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
+    private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
+        List<UpdateEntry<String, String>> list = new ArrayList<>();
 
         Timestamp timestamp1 = clockService.peek(1);
         Timestamp timestamp2 = clockService.peek(2);
 
-        RemoveEntry<String, String> re1 = new RemoveEntry<>(key1, timestamp1);
-        RemoveEntry<String, String> re2 = new RemoveEntry<>(key2, timestamp2);
-
-        list.add(re1);
-        list.add(re2);
+        list.add(generateRemoveMessage(key1, timestamp1));
+        list.add(generateRemoveMessage(key2, timestamp2));
 
         return list;
     }
@@ -737,13 +722,6 @@
         public void addSubscriber(MessageSubject subject,
                                   ClusterMessageHandler subscriber,
                                   ExecutorService executor) {
-            if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
-                updateHandler = subscriber;
-            } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
-                antiEntropyHandler = subscriber;
-            } else {
-                throw new RuntimeException("Unexpected message subject " + subject.toString());
-            }
         }
 
         @Override
@@ -793,6 +771,13 @@
         public <M> void addSubscriber(MessageSubject subject,
                 Function<byte[], M> decoder, Consumer<M> handler,
                 Executor executor) {
+            if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
+                updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
+            } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
+                antiEntropyHandler = (Consumer<AntiEntropyAdvertisement<String>>) handler;
+            } else {
+                throw new RuntimeException("Unexpected message subject " + subject.toString());
+            }
         }
     }