Unit tests for EventuallyConsistentMapImpl.

Most functionality is tested here, except for the anti-entropy code.

ONOS-859.

Change-Id: Ib9e83518f8a91d599364106bc0f7d869e62f5133
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessage.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessage.java
index 5b60e4d..cadcfa2 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessage.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessage.java
@@ -15,12 +15,13 @@
  */
 package org.onosproject.store.cluster.messaging;
 
-import java.io.IOException;
-
-import org.onosproject.cluster.NodeId;
-import org.onlab.util.ByteArraySizeHashPrinter;
-
 import com.google.common.base.MoreObjects;
+import org.onlab.util.ByteArraySizeHashPrinter;
+import org.onosproject.cluster.NodeId;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
 
 // TODO: Should payload type be ByteBuffer?
 /**
@@ -79,7 +80,7 @@
      * @throws IOException when I/O exception of some sort has occurred
      */
     public void respond(byte[] data) throws IOException {
-        throw new IllegalStateException("One can only repond to message recived from others.");
+        throw new IllegalStateException("One can only respond to message received from others.");
     }
 
     @Override
@@ -90,4 +91,22 @@
                 .add("payload", ByteArraySizeHashPrinter.of(payload))
                 .toString();
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof ClusterMessage)) {
+            return false;
+        }
+
+        ClusterMessage that = (ClusterMessage) o;
+
+        return Objects.equals(this.sender, that.sender) &&
+                Objects.equals(this.subject, that.subject) &&
+                Arrays.equals(this.payload, that.payload);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(sender, subject, payload);
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapEvent.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapEvent.java
index fd38e64..0473abf 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapEvent.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapEvent.java
@@ -15,6 +15,10 @@
  */
 package org.onosproject.store.impl;
 
+import com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
 /**
  * Event object signalling that the map was modified.
  */
@@ -68,4 +72,30 @@
     public V value() {
         return value;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof EventuallyConsistentMapEvent)) {
+            return false;
+        }
+
+        EventuallyConsistentMapEvent that = (EventuallyConsistentMapEvent) o;
+        return Objects.equals(this.type, that.type) &&
+                Objects.equals(this.key, that.key) &&
+                Objects.equals(this.value, that.value);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(type, key, value);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("type", type)
+                .add("key", key)
+                .add("value", value)
+                .toString();
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
index 00b2004..450f1c2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.impl;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
 import org.apache.commons.lang3.RandomUtils;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
@@ -33,11 +34,11 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -290,13 +291,15 @@
             }
         }
 
-        notifyPeers(new InternalPutEvent<>(updates));
+        if (!updates.isEmpty()) {
+            notifyPeers(new InternalPutEvent<>(updates));
 
-        for (PutEntry<K, V> entry : updates) {
-            EventuallyConsistentMapEvent<K, V> externalEvent =
-                    new EventuallyConsistentMapEvent<>(
-                    EventuallyConsistentMapEvent.Type.PUT, entry.key(), entry.value());
-            notifyListeners(externalEvent);
+            for (PutEntry<K, V> entry : updates) {
+                EventuallyConsistentMapEvent<K, V> externalEvent = new EventuallyConsistentMapEvent<>(
+                        EventuallyConsistentMapEvent.Type.PUT, entry.key(),
+                        entry.value());
+                notifyListeners(externalEvent);
+            }
         }
     }
 
@@ -314,13 +317,16 @@
             }
         }
 
-        notifyPeers(new InternalRemoveEvent<>(removed));
+        if (!removed.isEmpty()) {
+            notifyPeers(new InternalRemoveEvent<>(removed));
 
-        for (RemoveEntry<K> entry : removed) {
-            EventuallyConsistentMapEvent<K, V> externalEvent =
-                    new EventuallyConsistentMapEvent<>(
-                            EventuallyConsistentMapEvent.Type.REMOVE, entry.key(), null);
-            notifyListeners(externalEvent);
+            for (RemoveEntry<K> entry : removed) {
+                EventuallyConsistentMapEvent<K, V> externalEvent
+                        = new EventuallyConsistentMapEvent<>(
+                        EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
+                        null);
+                notifyListeners(externalEvent);
+            }
         }
     }
 
@@ -370,8 +376,11 @@
         executor.shutdown();
         backgroundExecutor.shutdown();
 
+        listeners.clear();
+
         clusterCommunicator.removeSubscriber(updateMessageSubject);
         clusterCommunicator.removeSubscriber(removeMessageSubject);
+        clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
     }
 
     private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
@@ -430,6 +439,23 @@
         public V setValue(V value) {
             throw new UnsupportedOperationException();
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof Map.Entry)) {
+                return false;
+            }
+
+            Map.Entry that = (Map.Entry) o;
+
+            return Objects.equals(this.key, that.getKey()) &&
+                    Objects.equals(this.value, that.getValue());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key, value);
+        }
     }
 
     private final class SendAdvertisementTask implements Runnable {
@@ -728,12 +754,11 @@
         }
     }
 
-    private static final class InternalPutEvent<K, V> {
+    static final class InternalPutEvent<K, V> {
         private final List<PutEntry<K, V>> entries;
 
         public InternalPutEvent(K key, V value, Timestamp timestamp) {
-            entries = Collections
-                    .singletonList(new PutEntry<>(key, value, timestamp));
+            entries = ImmutableList.of(new PutEntry<>(key, value, timestamp));
         }
 
         public InternalPutEvent(List<PutEntry<K, V>> entries) {
@@ -751,7 +776,7 @@
         }
     }
 
-    private static final class PutEntry<K, V> {
+    static final class PutEntry<K, V> {
         private final K key;
         private final V value;
         private final Timestamp timestamp;
@@ -791,12 +816,11 @@
         }
     }
 
-    private static final class InternalRemoveEvent<K> {
+    static final class InternalRemoveEvent<K> {
         private final List<RemoveEntry<K>> entries;
 
         public InternalRemoveEvent(K key, Timestamp timestamp) {
-            entries = Collections.singletonList(
-                    new RemoveEntry<>(key, timestamp));
+            entries = ImmutableList.of(new RemoveEntry<>(key, timestamp));
         }
 
         public InternalRemoveEvent(List<RemoveEntry<K>> entries) {
@@ -814,7 +838,7 @@
         }
     }
 
-    private static final class RemoveEntry<K> {
+    static final class RemoveEntry<K> {
         private final K key;
         private final Timestamp timestamp;
 
diff --git a/core/store/dist/src/test/java/org/onosproject/store/impl/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/impl/EventuallyConsistentMapImplTest.java
new file mode 100644
index 0000000..5269d39
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onosproject/store/impl/EventuallyConsistentMapImplTest.java
@@ -0,0 +1,841 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.impl;
+
+import com.google.common.collect.ComparisonChain;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.packet.IpAddress;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static junit.framework.TestCase.assertFalse;
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for EventuallyConsistentMapImpl.
+ */
+public class EventuallyConsistentMapImplTest {
+
+    private EventuallyConsistentMap<String, String> ecMap;
+
+    private ClusterService clusterService;
+    private ClusterCommunicationService clusterCommunicator;
+    private SequentialClockService<String> clockService;
+
+    private static final String MAP_NAME = "test";
+    private static final MessageSubject PUT_MESSAGE_SUBJECT
+            = new MessageSubject("ecm-" + MAP_NAME + "-update");
+    private static final MessageSubject REMOVE_MESSAGE_SUBJECT
+            = new MessageSubject("ecm-" + MAP_NAME + "-remove");
+    private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
+            = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
+
+    private static final String KEY1 = "one";
+    private static final String KEY2 = "two";
+    private static final String VALUE1 = "oneValue";
+    private static final String VALUE2 = "twoValue";
+
+    private final ControllerNode self =
+            new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
+
+    private ClusterMessageHandler putHandler;
+    private ClusterMessageHandler removeHandler;
+    private ClusterMessageHandler antiEntropyHandler;
+
+    /*
+     * Serialization is a bit tricky here. We need to serialize in the tests
+     * to set the expectations, which will use this serializer here, but the
+     * EventuallyConsistentMap will use its own internal serializer. This means
+     * this serializer must be set up exactly the same as map's internal
+     * serializer.
+     */
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                    // Classes we give to the map
+                    .register(KryoNamespaces.API)
+                    .register(TestTimestamp.class)
+                    // Below is the classes that the map internally registers
+                    .register(WallClockTimestamp.class)
+                    .register(EventuallyConsistentMapImpl.PutEntry.class)
+                    .register(EventuallyConsistentMapImpl.RemoveEntry.class)
+                    .register(ArrayList.class)
+                    .register(EventuallyConsistentMapImpl.InternalPutEvent.class)
+                    .register(EventuallyConsistentMapImpl.InternalRemoveEvent.class)
+                    .register(AntiEntropyAdvertisement.class)
+                    .register(HashMap.class)
+                    .build();
+        }
+    };
+
+    @Before
+    public void setUp() throws Exception {
+        clusterService = createMock(ClusterService.class);
+        expect(clusterService.getLocalNode()).andReturn(self)
+                .anyTimes();
+        replay(clusterService);
+
+        clusterCommunicator = createMock(ClusterCommunicationService.class);
+
+        // Add expectation for adding cluster message subscribers which
+        // delegate to our ClusterCommunicationService implementation. This
+        // allows us to get a reference to the map's internal cluster message
+        // handlers so we can induce events coming in from a peer.
+        clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
+                anyObject(ClusterMessageHandler.class));
+        expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
+
+        replay(clusterCommunicator);
+
+        clockService = new SequentialClockService<>();
+
+        KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(TestTimestamp.class);
+
+        ecMap = new EventuallyConsistentMapImpl<>(MAP_NAME, clusterService,
+                                                  clusterCommunicator,
+                                                  serializer, clockService);
+
+        // Reset ready for tests to add their own expectations
+        reset(clusterCommunicator);
+    }
+
+    @After
+    public void tearDown() {
+        reset(clusterCommunicator);
+        ecMap.destroy();
+    }
+
+    @Test
+    public void testSize() throws Exception {
+        expectAnyMessage(clusterCommunicator);
+
+        assertEquals(0, ecMap.size());
+        ecMap.put(KEY1, VALUE1);
+        assertEquals(1, ecMap.size());
+        ecMap.put(KEY1, VALUE2);
+        assertEquals(1, ecMap.size());
+        ecMap.put(KEY2, VALUE2);
+        assertEquals(2, ecMap.size());
+        for (int i = 0; i < 10; i++) {
+            ecMap.put("" + i, "" + i);
+        }
+        assertEquals(12, ecMap.size());
+        ecMap.remove(KEY1);
+        assertEquals(11, ecMap.size());
+        ecMap.remove(KEY1);
+        assertEquals(11, ecMap.size());
+    }
+
+    @Test
+    public void testIsEmpty() throws Exception {
+        expectAnyMessage(clusterCommunicator);
+
+        assertTrue(ecMap.isEmpty());
+        ecMap.put(KEY1, VALUE1);
+        assertFalse(ecMap.isEmpty());
+        ecMap.remove(KEY1);
+        assertTrue(ecMap.isEmpty());
+    }
+
+    @Test
+    public void testContainsKey() throws Exception {
+        expectAnyMessage(clusterCommunicator);
+
+        assertFalse(ecMap.containsKey(KEY1));
+        ecMap.put(KEY1, VALUE1);
+        assertTrue(ecMap.containsKey(KEY1));
+        assertFalse(ecMap.containsKey(KEY2));
+        ecMap.remove(KEY1);
+        assertFalse(ecMap.containsKey(KEY1));
+    }
+
+    @Test
+    public void testContainsValue() throws Exception {
+        expectAnyMessage(clusterCommunicator);
+
+        assertFalse(ecMap.containsValue(VALUE1));
+        ecMap.put(KEY1, VALUE1);
+        assertTrue(ecMap.containsValue(VALUE1));
+        assertFalse(ecMap.containsValue(VALUE2));
+        ecMap.put(KEY1, VALUE2);
+        assertFalse(ecMap.containsValue(VALUE1));
+        assertTrue(ecMap.containsValue(VALUE2));
+        ecMap.remove(KEY1);
+        assertFalse(ecMap.containsValue(VALUE2));
+    }
+
+    @Test
+    public void testGet() throws Exception {
+        expectAnyMessage(clusterCommunicator);
+
+        CountDownLatch latch;
+
+        // Local put
+        assertNull(ecMap.get(KEY1));
+        ecMap.put(KEY1, VALUE1);
+        assertEquals(VALUE1, ecMap.get(KEY1));
+
+        // Remote put
+        ClusterMessage message
+                = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2));
+
+        // Create a latch so we know when the put operation has finished
+        latch = new CountDownLatch(1);
+        ecMap.addListener(new TestListener(latch));
+
+        assertNull(ecMap.get(KEY2));
+        putHandler.handle(message);
+        assertTrue("External listener never got notified of internal event",
+                   latch.await(100, TimeUnit.MILLISECONDS));
+        assertEquals(VALUE2, ecMap.get(KEY2));
+
+        // Local remove
+        ecMap.remove(KEY2);
+        assertNull(ecMap.get(KEY2));
+
+        // Remote remove
+        ClusterMessage removeMessage
+                = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1));
+
+        // Create a latch so we know when the remove operation has finished
+        latch = new CountDownLatch(1);
+        ecMap.addListener(new TestListener(latch));
+
+        removeHandler.handle(removeMessage);
+        assertTrue("External listener never got notified of internal event",
+                   latch.await(100, TimeUnit.MILLISECONDS));
+        assertNull(ecMap.get(KEY1));
+    }
+
+    @Test
+    public void testPut() throws Exception {
+        // Set up expectations of external events to be sent to listeners during
+        // the test. These don't use timestamps so we can set them all up at once.
+        EventuallyConsistentMapListener<String, String> listener
+                = createMock(EventuallyConsistentMapListener.class);
+        listener.event(new EventuallyConsistentMapEvent<>(
+                EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
+        listener.event(new EventuallyConsistentMapEvent<>(
+                EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
+        replay(listener);
+
+        ecMap.addListener(listener);
+
+        // Set up expected internal message to be broadcast to peers on first put
+        expectSpecificMessage(generatePutMessage(KEY1, VALUE1, clockService
+                .peekAtNextTimestamp()), clusterCommunicator);
+
+        // Put first value
+        assertNull(ecMap.get(KEY1));
+        ecMap.put(KEY1, VALUE1);
+        assertEquals(VALUE1, ecMap.get(KEY1));
+
+        verify(clusterCommunicator);
+
+        // Set up expected internal message to be broadcast to peers on second put
+        expectSpecificMessage(generatePutMessage(
+                KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
+
+        // Update same key to a new value
+        ecMap.put(KEY1, VALUE2);
+        assertEquals(VALUE2, ecMap.get(KEY1));
+
+        verify(clusterCommunicator);
+
+        // Do a put with a older timestamp than the value already there.
+        // The map data should not be changed and no notifications should be sent.
+        reset(clusterCommunicator);
+        replay(clusterCommunicator);
+
+        clockService.turnBackTime();
+        ecMap.put(KEY1, VALUE1);
+        // Value should not have changed.
+        assertEquals(VALUE2, ecMap.get(KEY1));
+
+        verify(clusterCommunicator);
+
+        // Check that our listener received the correct events during the test
+        verify(listener);
+    }
+
+    @Test
+    public void testRemove() throws Exception {
+        // Set up expectations of external events to be sent to listeners during
+        // the test. These don't use timestamps so we can set them all up at once.
+        EventuallyConsistentMapListener<String, String> listener
+                = createMock(EventuallyConsistentMapListener.class);
+        listener.event(new EventuallyConsistentMapEvent<>(
+                EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
+        expectLastCall().times(2);
+        listener.event(new EventuallyConsistentMapEvent<>(
+                EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
+        listener.event(new EventuallyConsistentMapEvent<>(
+                EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
+        replay(listener);
+
+        ecMap.addListener(listener);
+
+        // Put in an initial value
+        expectAnyMessage(clusterCommunicator);
+        ecMap.put(KEY1, VALUE1);
+        assertEquals(VALUE1, ecMap.get(KEY1));
+
+        // Remove the value and check the correct internal cluster messages
+        // are sent
+        expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
+                              clusterCommunicator);
+
+        ecMap.remove(KEY1);
+        assertNull(ecMap.get(KEY1));
+
+        verify(clusterCommunicator);
+
+        // Remove the same value again. Even though the value is no longer in
+        // the map, we expect that the tombstone is updated and another remove
+        // event is sent to the cluster and external listeners.
+        expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
+                              clusterCommunicator);
+
+        ecMap.remove(KEY1);
+        assertNull(ecMap.get(KEY1));
+
+        verify(clusterCommunicator);
+
+
+        // Put in a new value for us to try and remove
+        expectAnyMessage(clusterCommunicator);
+
+        ecMap.put(KEY2, VALUE2);
+
+        clockService.turnBackTime();
+
+        // Remove should have no effect, since it has an older timestamp than
+        // the put. Expect no notifications to be sent out
+        reset(clusterCommunicator);
+        replay(clusterCommunicator);
+
+        ecMap.remove(KEY2);
+
+        verify(clusterCommunicator);
+
+        // Check that our listener received the correct events during the test
+        verify(listener);
+    }
+
+    @Test
+    public void testPutAll() throws Exception {
+        // putAll() with an empty map is a no-op - no messages will be sent
+        reset(clusterCommunicator);
+        replay(clusterCommunicator);
+
+        ecMap.putAll(new HashMap<>());
+
+        verify(clusterCommunicator);
+
+        // Set up the listener with our expected events
+        EventuallyConsistentMapListener<String, String> listener
+                = createMock(EventuallyConsistentMapListener.class);
+        listener.event(new EventuallyConsistentMapEvent<>(
+                EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
+        listener.event(new EventuallyConsistentMapEvent<>(
+                EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
+        replay(listener);
+
+        ecMap.addListener(listener);
+
+        // Expect a multi-update inter-instance message
+        expectSpecificMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
+                              clusterCommunicator);
+
+        Map<String, String> putAllValues = new HashMap<>();
+        putAllValues.put(KEY1, VALUE1);
+        putAllValues.put(KEY2, VALUE2);
+
+        // Put the values in the map
+        ecMap.putAll(putAllValues);
+
+        // Check the correct messages and events were sent
+        verify(clusterCommunicator);
+        verify(listener);
+    }
+
+    @Test
+    public void testClear() throws Exception {
+        EventuallyConsistentMapListener<String, String> listener
+                = createMock(EventuallyConsistentMapListener.class);
+        listener.event(new EventuallyConsistentMapEvent<>(
+                EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
+        listener.event(new EventuallyConsistentMapEvent<>(
+                EventuallyConsistentMapEvent.Type.REMOVE, KEY2, null));
+        replay(listener);
+
+        // clear() on an empty map is a no-op - no messages will be sent
+        reset(clusterCommunicator);
+        replay(clusterCommunicator);
+
+        assertTrue(ecMap.isEmpty());
+        ecMap.clear();
+        verify(clusterCommunicator);
+
+        // Put some items in the map
+        expectAnyMessage(clusterCommunicator);
+        ecMap.put(KEY1, VALUE1);
+        ecMap.put(KEY2, VALUE2);
+
+        ecMap.addListener(listener);
+        expectSpecificMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
+
+        ecMap.clear();
+
+        verify(clusterCommunicator);
+        verify(listener);
+    }
+
+    @Test
+    public void testKeySet() throws Exception {
+        expectAnyMessage(clusterCommunicator);
+
+        assertTrue(ecMap.keySet().isEmpty());
+
+        // Generate some keys
+        Set<String> keys = new HashSet<>();
+        for (int i = 1; i <= 10; i++) {
+            keys.add("" + i);
+        }
+
+        // Put each key in the map
+        keys.forEach(k -> ecMap.put(k, "value" + k));
+
+        // Check keySet() returns the correct value
+        assertEquals(keys, ecMap.keySet());
+
+        // Update the value for one of the keys
+        ecMap.put(keys.iterator().next(), "new-value");
+
+        // Check the key set is still the same
+        assertEquals(keys, ecMap.keySet());
+
+        // Remove a key
+        String removeKey = keys.iterator().next();
+        keys.remove(removeKey);
+        ecMap.remove(removeKey);
+
+        // Check the key set is still correct
+        assertEquals(keys, ecMap.keySet());
+    }
+
+    @Test
+    public void testValues() throws Exception {
+        expectAnyMessage(clusterCommunicator);
+
+        assertTrue(ecMap.values().isEmpty());
+
+        // Generate some values
+        Map<String, String> expectedValues = new HashMap<>();
+        for (int i = 1; i <= 10; i++) {
+            expectedValues.put("" + i, "value" + i);
+        }
+
+        // Add them into the map
+        expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
+
+        // Check the values collection is correct
+        assertEquals(expectedValues.values().size(), ecMap.values().size());
+        expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
+
+        // Update the value for one of the keys
+        Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
+        expectedValues.put(first.getKey(), "new-value");
+        ecMap.put(first.getKey(), "new-value");
+
+        // Check the values collection is still correct
+        assertEquals(expectedValues.values().size(), ecMap.values().size());
+        expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
+
+        // Remove a key
+        String removeKey = expectedValues.keySet().iterator().next();
+        expectedValues.remove(removeKey);
+        ecMap.remove(removeKey);
+
+        // Check the values collection is still correct
+        assertEquals(expectedValues.values().size(), ecMap.values().size());
+        expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
+    }
+
+    @Test
+    public void testEntrySet() throws Exception {
+        expectAnyMessage(clusterCommunicator);
+
+        assertTrue(ecMap.entrySet().isEmpty());
+
+        // Generate some values
+        Map<String, String> expectedValues = new HashMap<>();
+        for (int i = 1; i <= 10; i++) {
+            expectedValues.put("" + i, "value" + i);
+        }
+
+        // Add them into the map
+        expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
+
+        // Check the entry set is correct
+        assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
+
+        // Update the value for one of the keys
+        Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
+        expectedValues.put(first.getKey(), "new-value");
+        ecMap.put(first.getKey(), "new-value");
+
+        // Check the entry set is still correct
+        assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
+
+        // Remove a key
+        String removeKey = expectedValues.keySet().iterator().next();
+        expectedValues.remove(removeKey);
+        ecMap.remove(removeKey);
+
+        // Check the entry set is still correct
+        assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
+    }
+
+    private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
+        if (expectedMap.entrySet().size() != actual.size()) {
+            return false;
+        }
+
+        for (Map.Entry<String, String> e : actual) {
+            if (!expectedMap.containsKey(e.getKey())) {
+                return false;
+            }
+            if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Test
+    public void testDestroy() throws Exception {
+        clusterCommunicator.removeSubscriber(PUT_MESSAGE_SUBJECT);
+        clusterCommunicator.removeSubscriber(REMOVE_MESSAGE_SUBJECT);
+        clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
+
+        replay(clusterCommunicator);
+
+        ecMap.destroy();
+
+        verify(clusterCommunicator);
+
+        try {
+            ecMap.get(KEY1);
+            fail("get after destroy should throw exception");
+        } catch (IllegalStateException e) {
+            assertTrue(true);
+        }
+
+        try {
+            ecMap.put(KEY1, VALUE1);
+            fail("put after destroy should throw exception");
+        } catch (IllegalStateException e) {
+            assertTrue(true);
+        }
+    }
+
+    private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
+        EventuallyConsistentMapImpl.InternalPutEvent<String, String> event =
+                new EventuallyConsistentMapImpl.InternalPutEvent<>(
+                        key, value, timestamp);
+
+        return new ClusterMessage(
+                clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
+                SERIALIZER.encode(event));
+    }
+
+    private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
+        ArrayList<EventuallyConsistentMapImpl.PutEntry<String, String>> list = new ArrayList<>();
+
+        Timestamp timestamp1 = clockService.peek(1);
+        Timestamp timestamp2 = clockService.peek(2);
+
+        EventuallyConsistentMapImpl.PutEntry<String, String> pe1
+                = new EventuallyConsistentMapImpl.PutEntry<>(key1, value1, timestamp1);
+        EventuallyConsistentMapImpl.PutEntry<String, String> pe2
+                = new EventuallyConsistentMapImpl.PutEntry<>(key2, value2, timestamp2);
+
+        list.add(pe1);
+        list.add(pe2);
+
+        EventuallyConsistentMapImpl.InternalPutEvent<String, String> event
+                = new EventuallyConsistentMapImpl.InternalPutEvent<>(list);
+
+        return new ClusterMessage(
+                clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
+                SERIALIZER.encode(event));
+    }
+
+    private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
+        EventuallyConsistentMapImpl.InternalRemoveEvent<String> event =
+                new EventuallyConsistentMapImpl.InternalRemoveEvent<>(
+                        key, timestamp);
+
+        return new ClusterMessage(
+                clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
+                SERIALIZER.encode(event));
+    }
+
+    private ClusterMessage generateRemoveMessage(String key1, String key2) {
+        ArrayList<EventuallyConsistentMapImpl.RemoveEntry<String>> list = new ArrayList<>();
+
+        Timestamp timestamp1 = clockService.peek(1);
+        Timestamp timestamp2 = clockService.peek(2);
+
+        EventuallyConsistentMapImpl.RemoveEntry<String> re1
+                = new EventuallyConsistentMapImpl.RemoveEntry<>(key1, timestamp1);
+        EventuallyConsistentMapImpl.RemoveEntry<String> re2
+                = new EventuallyConsistentMapImpl.RemoveEntry<>(key2, timestamp2);
+
+        list.add(re1);
+        list.add(re2);
+
+        EventuallyConsistentMapImpl.InternalRemoveEvent<String> event
+                = new EventuallyConsistentMapImpl.InternalRemoveEvent<>(list);
+
+        return new ClusterMessage(
+                clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
+                SERIALIZER.encode(event));
+    }
+
+    /**
+     * Sets up a mock ClusterCommunicationService to expect a specific cluster
+     * message to be broadcast to the cluster.
+     *
+     * @param m message we expect to be sent
+     * @param clusterCommunicator a mock ClusterCommunicationService to set up
+     */
+    private static void expectSpecificMessage(ClusterMessage m,
+            ClusterCommunicationService clusterCommunicator) {
+        reset(clusterCommunicator);
+        expect(clusterCommunicator.broadcast(m)).andReturn(true);
+        replay(clusterCommunicator);
+    }
+
+    /**
+     * Sets up a mock ClusterCommunicationService to expect any cluster message
+     * that is sent to it. This is useful for unit tests where we aren't
+     * interested in testing the messaging component.
+     *
+     * @param clusterCommunicator a mock ClusterCommunicationService to set up
+     */
+    private void expectAnyMessage(ClusterCommunicationService clusterCommunicator) {
+        reset(clusterCommunicator);
+        expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
+                .andReturn(true)
+                .anyTimes();
+        replay(clusterCommunicator);
+    }
+
+    /**
+     * ClusterCommunicationService implementation that the map's addSubscriber
+     * call will delegate to. This means we can get a reference to the
+     * internal cluster message handler used by the map, so that we can simulate
+     * events coming in from other instances.
+     */
+    private final class TestClusterCommunicationService
+            implements ClusterCommunicationService {
+
+        @Override
+        public boolean broadcast(ClusterMessage message) {
+            return false;
+        }
+
+        @Override
+        public boolean broadcastIncludeSelf(ClusterMessage message) {
+            return false;
+        }
+
+        @Override
+        public boolean unicast(ClusterMessage message, NodeId toNodeId)
+                throws IOException {
+            return false;
+        }
+
+        @Override
+        public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) {
+            return false;
+        }
+
+        @Override
+        public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
+                                                       NodeId toNodeId)
+                throws IOException {
+            return null;
+        }
+
+        @Override
+        public void addSubscriber(MessageSubject subject,
+                                  ClusterMessageHandler subscriber) {
+            if (subject.equals(PUT_MESSAGE_SUBJECT)) {
+                putHandler = subscriber;
+            } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
+                removeHandler = subscriber;
+            } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
+                antiEntropyHandler = subscriber;
+            } else {
+                throw new RuntimeException("Unexpected message subject " + subject.toString());
+            }
+        }
+
+        @Override
+        public void removeSubscriber(MessageSubject subject) {}
+    }
+
+    /**
+     * ClockService implementation that gives out timestamps based on a
+     * sequential counter. This clock service enables more control over the
+     * timestamps that are given out, including being able to "turn back time"
+     * to give out timestamps from the past.
+     *
+     * @param <T> Type that the clock service will give out timestamps for
+     */
+    private class SequentialClockService<T> implements ClockService<T> {
+
+        private static final long INITIAL_VALUE = 1;
+        private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
+
+        @Override
+        public Timestamp getTimestamp(T object) {
+            return new TestTimestamp(counter.getAndIncrement());
+        }
+
+        /**
+         * Returns what the next timestamp will be without consuming the
+         * timestamp. This allows test code to set expectations correctly while
+         * still allowing the CUT to get the same timestamp.
+         *
+         * @return timestamp equal to the timestamp that will be returned by the
+         * next call to {@link #getTimestamp(T)}.
+         */
+        public Timestamp peekAtNextTimestamp() {
+            return peek(1);
+        }
+
+        /**
+         * Returns the ith timestamp to be given out in the future without
+         * consuming the timestamp. For example, i=1 returns the next timestamp,
+         * i=2 returns the timestamp after that, and so on.
+         *
+         * @param i number of the timestamp to peek at
+         * @return the ith timestamp that will be given out
+         */
+        public Timestamp peek(int i) {
+            checkArgument(i > 0, "i must be a positive integer");
+
+            return new TestTimestamp(counter.get() + i - 1);
+        }
+
+        /**
+         * Turns the clock back two ticks, so the next call to getTimestamp will
+         * return an older timestamp than the previous call to getTimestamp.
+         */
+        public void turnBackTime() {
+            // Not atomic, but should be OK for these tests.
+            counter.decrementAndGet();
+            counter.decrementAndGet();
+        }
+
+    }
+
+    /**
+     * Timestamp implementation where the value of the timestamp can be
+     * specified explicitly at creation time.
+     */
+    private class TestTimestamp implements Timestamp {
+
+        private final long timestamp;
+
+        /**
+         * Creates a new timestamp that has the specified value.
+         *
+         * @param timestamp value of the timestamp
+         */
+        public TestTimestamp(long timestamp) {
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public int compareTo(Timestamp o) {
+            checkArgument(o instanceof TestTimestamp);
+            TestTimestamp otherTimestamp = (TestTimestamp) o;
+            return ComparisonChain.start()
+                    .compare(this.timestamp, otherTimestamp.timestamp)
+                    .result();
+        }
+    }
+
+    /**
+     * EventuallyConsistentMapListener implementation which triggers a latch
+     * when it receives an event.
+     */
+    private class TestListener implements EventuallyConsistentMapListener<String, String> {
+        private CountDownLatch latch;
+
+        /**
+         * Creates a new listener that will trigger the specified latch when it
+         * receives and event.
+         *
+         * @param latch the latch to trigger on events
+         */
+        public TestListener(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        @Override
+        public void event(EventuallyConsistentMapEvent<String, String> event) {
+            latch.countDown();
+        }
+    }
+}