Simplified ECMap implmentation by merging items and tombstones maps

Change-Id: If4253722d91c35a7e57dec3c2fceb216d14a7314
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/AbstractEntry.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/AbstractEntry.java
deleted file mode 100644
index 68d51d4..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/AbstractEntry.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.ecmap;
-
-import java.util.Objects;
-
-import org.onosproject.store.Timestamp;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Base class for events in an EventuallyConsistentMap.
- */
-public abstract class AbstractEntry<K, V> implements Comparable<AbstractEntry<K, V>> {
-    private final K key;
-    private final Timestamp timestamp;
-
-    /**
-     * Creates a new put entry.
-     *
-     * @param key key of the entry
-     * @param timestamp timestamp of the put event
-     */
-    public AbstractEntry(K key, Timestamp timestamp) {
-        this.key = checkNotNull(key);
-        this.timestamp = checkNotNull(timestamp);
-    }
-
-    // Needed for serialization.
-    @SuppressWarnings("unused")
-    protected AbstractEntry() {
-        this.key = null;
-        this.timestamp = null;
-    }
-
-    /**
-     * Returns the key of the entry.
-     *
-     * @return the key
-     */
-    public K key() {
-        return key;
-    }
-
-    /**
-     * Returns the timestamp of the event.
-     *
-     * @return the timestamp
-     */
-    public Timestamp timestamp() {
-        return timestamp;
-    }
-
-    @Override
-    public int compareTo(AbstractEntry<K, V> o) {
-        return this.timestamp.compareTo(o.timestamp);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(timestamp);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o instanceof AbstractEntry) {
-            final AbstractEntry that = (AbstractEntry) o;
-            return this.timestamp.equals(that.timestamp);
-        }
-        return false;
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/AntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/AntiEntropyAdvertisement.java
index 23b2dfc..d783fe2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/AntiEntropyAdvertisement.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/AntiEntropyAdvertisement.java
@@ -16,11 +16,11 @@
 package org.onosproject.store.ecmap;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+
 import org.onosproject.cluster.NodeId;
-import org.onosproject.store.Timestamp;
 
 import java.util.Map;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
@@ -29,22 +29,18 @@
 public class AntiEntropyAdvertisement<K> {
 
     private final NodeId sender;
-    private final Map<K, Timestamp> timestamps;
-    private final Map<K, Timestamp> tombstones;
+    private final Map<K, MapValue.Digest> digest;
 
     /**
      * Creates a new anti entropy advertisement message.
      *
      * @param sender the sender's node ID
-     * @param timestamps map of item key to timestamp for current items
-     * @param tombstones map of item key to timestamp for removed items
+     * @param digest for map entries
      */
     public AntiEntropyAdvertisement(NodeId sender,
-                                    Map<K, Timestamp> timestamps,
-                                    Map<K, Timestamp> tombstones) {
+                                    Map<K, MapValue.Digest> digest) {
         this.sender = checkNotNull(sender);
-        this.timestamps = checkNotNull(timestamps);
-        this.tombstones = checkNotNull(tombstones);
+        this.digest = ImmutableMap.copyOf(checkNotNull(digest));
     }
 
     /**
@@ -57,36 +53,19 @@
     }
 
     /**
-     * Returns the map of current item timestamps.
+     * Returns the digest for map entries.
      *
-     * @return current item timestamps
+     * @return mapping from key to associated digest
      */
-    public Map<K, Timestamp> timestamps() {
-        return timestamps;
-    }
-
-    /**
-     * Returns the map of removed item timestamps.
-     *
-     * @return removed item timestamps
-     */
-    public Map<K, Timestamp> tombstones() {
-        return tombstones;
-    }
-
-    // For serializer
-    @SuppressWarnings("unused")
-    private AntiEntropyAdvertisement() {
-        this.sender = null;
-        this.timestamps = null;
-        this.tombstones = null;
+    public Map<K, MapValue.Digest> digest() {
+        return digest;
     }
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
-                .add("timestampsSize", timestamps.size())
-                .add("tombstonesSize", tombstones.size())
+                .add("sender", sender)
+                .add("totalEntries", digest.size())
                 .toString();
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 118ef78..1fd27d3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -18,9 +18,8 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.tuple.Pair;
 import org.onlab.util.AbstractAccumulator;
 import org.onlab.util.KryoNamespace;
@@ -30,12 +29,10 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.store.Timestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.impl.LogicalTimestamp;
-import org.onosproject.store.impl.Timestamped;
 import org.onosproject.store.service.WallClockTimestamp;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMapEvent;
@@ -43,30 +40,27 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
+
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.Timer;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.isNull;
-import static java.util.Objects.nonNull;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
@@ -80,12 +74,12 @@
 
     private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
 
-    private final ConcurrentMap<K, Timestamped<V>> items;
-    private final ConcurrentMap<K, Timestamp> removedItems;
+    private final Map<K, MapValue<V>> items;
 
     private final ClusterService clusterService;
     private final ClusterCommunicationService clusterCommunicator;
     private final KryoSerializer serializer;
+    private final NodeId localNodeId;
 
     private final BiFunction<K, V, Timestamp> timestampProvider;
 
@@ -93,7 +87,7 @@
     private final MessageSubject antiEntropyAdvertisementSubject;
 
     private final Set<EventuallyConsistentMapListener<K, V>> listeners
-            = new CopyOnWriteArraySet<>();
+            = Sets.newCopyOnWriteArraySet();
 
     private final ExecutorService executor;
     private final ScheduledExecutorService backgroundExecutor;
@@ -162,13 +156,13 @@
                                 TimeUnit antiEntropyTimeUnit,
                                 boolean convergeFaster,
                                 boolean persistent) {
-        items = new ConcurrentHashMap<>();
-        removedItems = new ConcurrentHashMap<>();
+        items = Maps.newConcurrentMap();
         senderPending = Maps.newConcurrentMap();
         destroyedMessage = mapName + ERROR_DESTROYED;
 
         this.clusterService = clusterService;
         this.clusterCommunicator = clusterCommunicator;
+        this.localNodeId = clusterService.getLocalNode().id();
 
         this.serializer = createSerializer(serializerBuilder);
 
@@ -179,7 +173,7 @@
         } else {
             this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
                     .map(ControllerNode::id)
-                    .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
+                    .filter(nodeId -> !nodeId.equals(localNodeId))
                     .collect(Collectors.toList());
         }
 
@@ -210,7 +204,7 @@
                     newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
 
             persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
-            persistentStore.readInto(items, removedItems);
+            persistentStore.readInto(items);
         } else {
             this.persistentStore = null;
         }
@@ -223,17 +217,21 @@
         }
 
         // start anti-entropy thread
-        this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
+        this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
                                                     initialDelaySec, antiEntropyPeriod,
                                                     antiEntropyTimeUnit);
 
         updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
         clusterCommunicator.addSubscriber(updateMessageSubject,
-                                          new InternalEventListener(), this.executor);
+                                          serializer::decode,
+                                          this::processUpdates,
+                                          this.executor);
 
         antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
         clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
-                                          new InternalAntiEntropyListener(), this.backgroundExecutor);
+                                          serializer::decode,
+                                          this::handleAntiEntropyAdvertisement,
+                                          this.backgroundExecutor);
 
         this.tombstonesDisabled = tombstonesDisabled;
         this.lightweightAntiEntropy = !convergeFaster;
@@ -245,14 +243,13 @@
             protected void setupKryoPool() {
                 // Add the map's internal helper classes to the user-supplied serializer
                 serializerPool = builder
+                        .register(KryoNamespaces.BASIC)
                         .register(LogicalTimestamp.class)
                         .register(WallClockTimestamp.class)
-                        .register(PutEntry.class)
-                        .register(RemoveEntry.class)
-                        .register(ArrayList.class)
                         .register(AntiEntropyAdvertisement.class)
-                        .register(HashMap.class)
-                        .register(Timestamped.class)
+                        .register(UpdateEntry.class)
+                        .register(MapValue.class)
+                        .register(MapValue.Digest.class)
                         .build();
             }
         };
@@ -261,29 +258,31 @@
     @Override
     public int size() {
         checkState(!destroyed, destroyedMessage);
-        return items.size();
+        // TODO: Maintain a separate counter for tracking live elements in map.
+        return Maps.filterValues(items, MapValue::isAlive).size();
     }
 
     @Override
     public boolean isEmpty() {
         checkState(!destroyed, destroyedMessage);
-        return items.isEmpty();
+        return size() == 0;
     }
 
     @Override
     public boolean containsKey(K key) {
         checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
-        return items.containsKey(key);
+        return get(key) != null;
     }
 
     @Override
     public boolean containsValue(V value) {
         checkState(!destroyed, destroyedMessage);
         checkNotNull(value, ERROR_NULL_VALUE);
-
-        return items.values().stream()
-                .anyMatch(timestamped -> timestamped.value().equals(value));
+        return items.values()
+                    .stream()
+                    .filter(MapValue::isAlive)
+                    .anyMatch(v -> v.get().equals(value));
     }
 
     @Override
@@ -291,11 +290,8 @@
         checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
 
-        Timestamped<V> value = items.get(key);
-        if (value != null) {
-            return value.value();
-        }
-        return null;
+        MapValue<V> value = items.get(key);
+        return (value == null || value.isTombstone()) ? null : value.get();
     }
 
     @Override
@@ -304,123 +300,18 @@
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
 
-        Timestamp timestamp = timestampProvider.apply(key, value);
-
-        if (putInternal(key, value, timestamp)) {
-            notifyPeers(new PutEntry<>(key, value, timestamp),
-                        peerUpdateFunction.apply(key, value));
-            notifyListeners(new EventuallyConsistentMapEvent<>(
-                    EventuallyConsistentMapEvent.Type.PUT, key, value));
+        MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
+        if (updateInternal(key, newValue)) {
+            notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
+            notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value));
         }
     }
 
-    private boolean putInternal(K key, V value, Timestamp timestamp) {
-        counter.incrementCount();
-        Timestamp removed = removedItems.get(key);
-        if (removed != null && removed.isNewerThan(timestamp)) {
-            log.debug("ecmap - removed was newer {}", value);
-            return false;
-        }
-
-        final MutableBoolean updated = new MutableBoolean(false);
-
-        items.compute(key, (k, existing) -> {
-            if (existing != null && existing.isNewerThan(timestamp)) {
-                updated.setFalse();
-                return existing;
-            } else {
-                updated.setTrue();
-                return new Timestamped<>(value, timestamp);
-            }
-            });
-
-        boolean success = updated.booleanValue();
-        if (!success) {
-            log.debug("ecmap - existing was newer {}", value);
-        }
-
-        if (success && removed != null) {
-            removedItems.remove(key, removed);
-        }
-
-        if (success && persistent) {
-            persistentStore.put(key, value, timestamp);
-        }
-
-        return success;
-    }
-
     @Override
     public V remove(K key) {
         checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
-
-        // TODO prevent calls here if value is important for timestamp
-        Timestamp timestamp = timestampProvider.apply(key, null);
-
-        Optional<V> removedValue = removeInternal(key, timestamp);
-        if (removedValue == null) {
-            return null;
-        }
-        notifyPeers(new RemoveEntry<>(key, timestamp),
-                peerUpdateFunction.apply(key, null));
-        notifyListeners(new EventuallyConsistentMapEvent<>(
-                EventuallyConsistentMapEvent.Type.REMOVE, key, removedValue.orElse(null)));
-
-        return removedValue.orElse(null);
-    }
-
-    /**
-     * Returns null if the timestamp is for a outdated request i.e.
-     * the value is the map is more recent or a tombstone exists with a
-     * more recent timestamp.
-     * Returns non-empty optional if a value was indeed removed from the map.
-     * Returns empty optional if map did not contain a value for the key but the existing
-     * tombstone is older than this timestamp.
-     * @param key key
-     * @param timestamp timestamp for remove request
-     * @return Optional value.
-     */
-    private Optional<V> removeInternal(K key, Timestamp timestamp) {
-        if (timestamp == null) {
-            return null;
-        }
-
-        counter.incrementCount();
-        final AtomicReference<Optional<V>> removedValue = new AtomicReference<>(null);
-        items.compute(key, (k, existing) -> {
-            if (existing != null && existing.isNewerThan(timestamp)) {
-                return existing;
-            } else {
-                removedValue.set(existing == null ? Optional.empty() : Optional.of(existing.value()));
-                return null;
-            }
-        });
-
-        if (isNull(removedValue.get())) {
-            return null;
-        }
-
-        boolean updatedTombstone = false;
-
-        if (!tombstonesDisabled) {
-            Timestamp removedTimestamp = removedItems.get(key);
-            if (removedTimestamp == null) {
-                //Timestamp removed = removedItems.putIfAbsent(key, timestamp);
-                updatedTombstone = (removedItems.putIfAbsent(key, timestamp) == null);
-            } else if (timestamp.isNewerThan(removedTimestamp)) {
-                updatedTombstone = removedItems.replace(key, removedTimestamp, timestamp);
-            }
-        }
-
-        if (persistent) {
-            persistentStore.remove(key, timestamp);
-        }
-
-        if (tombstonesDisabled || updatedTombstone) {
-            return removedValue.get();
-        }
-        return null;
+        return removeInternal(key, Optional.empty());
     }
 
     @Override
@@ -428,15 +319,34 @@
         checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
+        removeInternal(key, Optional.of(value));
+    }
 
-        Timestamp timestamp = timestampProvider.apply(key, value);
+    private V removeInternal(K key, Optional<V> value) {
+        checkState(!destroyed, destroyedMessage);
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(value, ERROR_NULL_VALUE);
 
-        if (nonNull(removeInternal(key, timestamp))) {
-            notifyPeers(new RemoveEntry<>(key, timestamp),
-                        peerUpdateFunction.apply(key, value));
-            notifyListeners(new EventuallyConsistentMapEvent<>(
-                    EventuallyConsistentMapEvent.Type.REMOVE, key, value));
+        MapValue<V> newValue = new MapValue<>(null, timestampProvider.apply(key, value.orElse(null)));
+        AtomicBoolean updated = new AtomicBoolean(false);
+        AtomicReference<V> previousValue = new AtomicReference<>();
+        items.compute(key, (k, existing) -> {
+            if (existing != null && existing.isAlive()) {
+                updated.set(!value.isPresent() ||  value.get().equals(existing.get()));
+                previousValue.set(existing.get());
+            }
+            updated.set(existing == null || newValue.isNewerThan(existing));
+            return updated.get() ? newValue : existing;
+        });
+        if (updated.get()) {
+            notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, previousValue.get()));
+            notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
+            if (persistent) {
+                persistentStore.update(key, newValue);
+            }
+            return previousValue.get();
         }
+        return null;
     }
 
     @Override
@@ -448,30 +358,59 @@
     @Override
     public void clear() {
         checkState(!destroyed, destroyedMessage);
-        items.forEach((key, value) -> remove(key));
+        Maps.filterValues(items, MapValue::isAlive)
+            .forEach((k, v) -> remove(k));
     }
 
     @Override
     public Set<K> keySet() {
         checkState(!destroyed, destroyedMessage);
-        return items.keySet();
+        return Maps.filterValues(items, MapValue::isAlive)
+                   .keySet();
     }
 
     @Override
     public Collection<V> values() {
         checkState(!destroyed, destroyedMessage);
-        return items.values().stream()
-                .map(Timestamped::value)
-                .collect(Collectors.toList());
+        return Maps.filterValues(items, MapValue::isAlive)
+                   .values()
+                   .stream()
+                   .map(MapValue::get)
+                   .collect(Collectors.toList());
     }
 
     @Override
     public Set<Map.Entry<K, V>> entrySet() {
         checkState(!destroyed, destroyedMessage);
+        return Maps.filterValues(items, MapValue::isAlive)
+                   .entrySet()
+                   .stream()
+                   .map(e -> Pair.of(e.getKey(), e.getValue().get()))
+                   .collect(Collectors.toSet());
+    }
 
-        return items.entrySet().stream()
-                .map(e -> Pair.of(e.getKey(), e.getValue().value()))
-                .collect(Collectors.toSet());
+    /**
+     * Returns true if newValue was accepted i.e. map is updated.
+     * @param key key
+     * @param newValue proposed new value
+     * @return true if update happened; false if map already contains a more recent value for the key
+     */
+    private boolean updateInternal(K key, MapValue<V> newValue) {
+        AtomicBoolean updated = new AtomicBoolean(false);
+        items.compute(key, (k, existing) -> {
+            if (existing == null || newValue.isNewerThan(existing)) {
+                updated.set(true);
+                if (newValue.isTombstone()) {
+                    return tombstonesDisabled ? null : newValue;
+                }
+                return newValue;
+            }
+            return existing;
+        });
+        if (updated.get() && persistent) {
+            persistentStore.update(key, newValue);
+        }
+        return updated.get();
     }
 
     @Override
@@ -503,26 +442,20 @@
     }
 
     private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
-        for (EventuallyConsistentMapListener<K, V> listener : listeners) {
-            listener.event(event);
-        }
+        listeners.forEach(listener -> listener.event(event));
     }
 
-    private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
+    private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
         queueUpdate(event, peers);
     }
 
-    private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
-        queueUpdate(event, peers);
-    }
-
-    private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
+    private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
         if (peers == null) {
             // we have no friends :(
             return;
         }
         peers.forEach(node ->
-              senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
+            senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
         );
     }
 
@@ -530,276 +463,107 @@
         return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
     }
 
-    private final class SendAdvertisementTask implements Runnable {
-        @Override
-        public void run() {
-            if (Thread.currentThread().isInterrupted()) {
-                log.info("Interrupted, quitting");
-                return;
-            }
-
+    private void sendAdvertisement() {
+        try {
             if (underHighLoad() || destroyed) {
                 return;
             }
-
-            try {
-                final NodeId self = clusterService.getLocalNode().id();
-                Set<ControllerNode> nodes = clusterService.getNodes();
-
-                List<NodeId> nodeIds = nodes.stream()
-                        .map(ControllerNode::id)
-                        .collect(Collectors.toList());
-
-                if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
-                    log.trace("No other peers in the cluster.");
-                    return;
-                }
-
-                NodeId peer;
-                do {
-                    int idx = RandomUtils.nextInt(0, nodeIds.size());
-                    peer = nodeIds.get(idx);
-                } while (peer.equals(self));
-
-                if (Thread.currentThread().isInterrupted()) {
-                    log.info("Interrupted, quitting");
-                    return;
-                }
-
-                AntiEntropyAdvertisement<K> ad = createAdvertisement();
-                NodeId destination = peer;
-                clusterCommunicator.unicast(ad, antiEntropyAdvertisementSubject, serializer::encode, peer)
-                                   .whenComplete((result, error) -> {
-                                       if (error != null) {
-                                           log.debug("Failed to send anti-entropy advertisement to {}", destination);
-                                       }
-                                   });
-
-            } catch (Exception e) {
-                // Catch all exceptions to avoid scheduled task being suppressed.
-                log.error("Exception thrown while sending advertisement", e);
-            }
+            pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
+        } catch (Exception e) {
+            // Catch all exceptions to avoid scheduled task being suppressed.
+            log.error("Exception thrown while sending advertisement", e);
         }
     }
 
+    private Optional<NodeId> pickRandomActivePeer() {
+        List<NodeId> activePeers = clusterService.getNodes()
+                .stream()
+                .filter(node -> !localNodeId.equals(node))
+                 .map(ControllerNode::id)
+                .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
+                .collect(Collectors.toList());
+        Collections.shuffle(activePeers);
+        return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
+    }
+
+    private void sendAdvertisementToPeer(NodeId peer) {
+        clusterCommunicator.unicast(createAdvertisement(),
+                antiEntropyAdvertisementSubject,
+                serializer::encode,
+                peer)
+                .whenComplete((result, error) -> {
+                    if (error != null) {
+                        log.warn("Failed to send anti-entropy advertisement to {}", peer);
+                    }
+                });
+    }
+
+
     private AntiEntropyAdvertisement<K> createAdvertisement() {
-        final NodeId self = clusterService.getLocalNode().id();
-
-        Map<K, Timestamp> timestamps = new HashMap<>(items.size());
-
-        items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
-
-        Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
-
-        return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
+        return new AntiEntropyAdvertisement<K>(localNodeId, Maps.transformValues(items, MapValue::digest));
     }
 
     private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
-        List<EventuallyConsistentMapEvent<K, V>> externalEvents;
+        if (destroyed || underHighLoad()) {
+            return;
+        }
+        try {
+            antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
 
-        externalEvents = antiEntropyCheckLocalItems(ad);
-
-        antiEntropyCheckLocalRemoved(ad);
-
-        if (!lightweightAntiEntropy) {
-            externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
-
-            // if remote ad has something unknown, actively sync
-            for (K key : ad.timestamps().keySet()) {
-                if (!items.containsKey(key)) {
+            if (!lightweightAntiEntropy) {
+                Set<K> missingKeys = Sets.difference(items.keySet(), ad.digest().keySet());
+                // if remote ad has something unknown, actively sync
+                if (missingKeys.size() > 0) {
                     // Send the advertisement back if this peer is out-of-sync
-                    final NodeId sender = ad.sender();
-                    AntiEntropyAdvertisement<K> myAd = createAdvertisement();
-
-                    clusterCommunicator.unicast(myAd, antiEntropyAdvertisementSubject, serializer::encode, sender)
-                                       .whenComplete((result, error) -> {
-                                           if (error != null) {
-                                               log.debug("Failed to send reactive "
-                                                       + "anti-entropy advertisement to {}", sender);
-                                           }
-                                       });
-                    break;
+                    // TODO: Send ad for missing keys and for entries that are stale
+                    sendAdvertisementToPeer(ad.sender());
                 }
             }
+        } catch (Exception e) {
+            log.warn("Error handling anti-entropy advertisement", e);
         }
-        externalEvents.forEach(this::notifyListeners);
     }
 
     /**
-     * Checks if any of the remote's live items or tombstones are out of date
-     * according to our local live item list, or if our live items are out of
-     * date according to the remote's tombstone list.
-     * If the local copy is more recent, it will be pushed to the remote. If the
-     * remote has a more recent remove, we apply that to the local state.
-     *
-     * @param ad remote anti-entropy advertisement
-     * @return list of external events relating to local operations performed
+     * Processes anti-entropy ad from peer by taking following actions:
+     * 1. If peer has an old entry, updates peer.
+     * 2. If peer indicates an entry is removed and has a more recent
+     * timestamp than the local entry, update local state.
      */
     private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
             AntiEntropyAdvertisement<K> ad) {
-        final List<EventuallyConsistentMapEvent<K, V>> externalEvents
-                = new LinkedList<>();
+        final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
         final NodeId sender = ad.sender();
-
-        for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
-            K key = item.getKey();
-            Timestamped<V> localValue = item.getValue();
-
-            Timestamp remoteTimestamp = ad.timestamps().get(key);
-            if (remoteTimestamp == null) {
-                remoteTimestamp = ad.tombstones().get(key);
-            }
-            if (remoteTimestamp == null || localValue
-                    .isNewerThan(remoteTimestamp)) {
+        items.forEach((key, localValue) -> {
+            MapValue.Digest remoteValueDigest = ad.digest().get(key);
+            if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
                 // local value is more recent, push to sender
-                queueUpdate(new PutEntry<>(key, localValue.value(),
-                                            localValue.timestamp()), ImmutableList.of(sender));
-            }
-
-            Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
-            if (remoteDeadTimestamp != null &&
-                    remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
-                // sender has a more recent remove
-                if (nonNull(removeInternal(key, remoteDeadTimestamp))) {
-                    externalEvents.add(new EventuallyConsistentMapEvent<>(
-                            EventuallyConsistentMapEvent.Type.REMOVE, key, null));
+                queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
+            } else {
+                if (remoteValueDigest.isTombstone()
+                        && remoteValueDigest.timestamp().isNewerThan(localValue.timestamp())) {
+                    if (updateInternal(key, new MapValue<>(null, remoteValueDigest.timestamp()))) {
+                        externalEvents.add(new EventuallyConsistentMapEvent<>(REMOVE, key, null));
+                    }
                 }
             }
-        }
-
+        });
         return externalEvents;
     }
 
-    /**
-     * Checks if any items in the remote live list are out of date according
-     * to our tombstone list. If we find we have a more up to date tombstone,
-     * we'll send it to the remote.
-     *
-     * @param ad remote anti-entropy advertisement
-     */
-    private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
-        final NodeId sender = ad.sender();
-
-        for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
-            K key = dead.getKey();
-            Timestamp localDeadTimestamp = dead.getValue();
-
-            Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
-            if (remoteLiveTimestamp != null
-                    && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
-                // sender has zombie, push remove
-                queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
-            }
+    private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
+        if (destroyed) {
+            return;
         }
-    }
+        updates.forEach(update -> {
+            final K key = update.key();
+            final MapValue<V> value = update.value();
 
-    /**
-     * Checks if any of the local live items are out of date according to the
-     * remote's tombstone advertisements. If we find a local item is out of date,
-     * we'll apply the remove operation to the local state.
-     *
-     * @param ad remote anti-entropy advertisement
-     * @return list of external events relating to local operations performed
-     */
-    private List<EventuallyConsistentMapEvent<K, V>>
-    antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
-        final List<EventuallyConsistentMapEvent<K, V>> externalEvents
-                = new LinkedList<>();
-
-        for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
-            K key = remoteDead.getKey();
-            Timestamp remoteDeadTimestamp = remoteDead.getValue();
-
-            Timestamped<V> local = items.get(key);
-            Timestamp localDead = removedItems.get(key);
-            if (local != null && remoteDeadTimestamp.isNewerThan(
-                    local.timestamp())) {
-                // If the remote has a more recent tombstone than either our local
-                // value, then do a remove with their timestamp
-                if (nonNull(removeInternal(key, remoteDeadTimestamp))) {
-                    externalEvents.add(new EventuallyConsistentMapEvent<>(
-                            EventuallyConsistentMapEvent.Type.REMOVE, key, null));
-                }
-            } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
-                    localDead)) {
-                // If the remote has a more recent tombstone than us, update ours
-                // to their timestamp
-                removeInternal(key, remoteDeadTimestamp);
+            if (updateInternal(key, value)) {
+                final EventuallyConsistentMapEvent.Type type = value.isTombstone() ? REMOVE : PUT;
+                notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value.get()));
             }
-        }
-
-        return externalEvents;
-    }
-
-    private final class InternalAntiEntropyListener
-            implements ClusterMessageHandler {
-
-        @Override
-        public void handle(ClusterMessage message) {
-            log.trace("Received anti-entropy advertisement from peer: {}",
-                      message.sender());
-            AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
-            try {
-                if (!underHighLoad()) {
-                    handleAntiEntropyAdvertisement(advertisement);
-                }
-            } catch (Exception e) {
-                log.warn("Exception thrown handling advertisements", e);
-            }
-        }
-    }
-
-    private final class InternalEventListener implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            if (destroyed) {
-                return;
-            }
-
-            log.debug("Received update event from peer: {}", message.sender());
-            Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
-
-            try {
-                // TODO clean this for loop up
-                for (AbstractEntry<K, V> entry : events) {
-                    final K key = entry.key();
-                    V value;
-                    final Timestamp timestamp = entry.timestamp();
-                    final EventuallyConsistentMapEvent.Type type;
-                    if (entry instanceof PutEntry) {
-                        PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
-                        value = putEntry.value();
-                        type = EventuallyConsistentMapEvent.Type.PUT;
-                    } else if (entry instanceof RemoveEntry) {
-                        type = EventuallyConsistentMapEvent.Type.REMOVE;
-                        value = null;
-                    } else {
-                        throw new IllegalStateException("Unknown entry type " + entry.getClass());
-                    }
-
-                    boolean success;
-                    switch (type) {
-                        case PUT:
-                            success = putInternal(key, value, timestamp);
-                            break;
-                        case REMOVE:
-                            Optional<V> removedValue = removeInternal(key, timestamp);
-                            success = removedValue != null;
-                            if (success) {
-                                value = removedValue.orElse(null);
-                            }
-                            break;
-                        default:
-                            success = false;
-                    }
-                    if (success) {
-                        notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
-                    }
-                }
-            } catch (Exception e) {
-                log.warn("Exception thrown handling put", e);
-            }
-        }
+        });
     }
 
     // TODO pull this into the class if this gets pulled out...
@@ -808,7 +572,7 @@
     private static final int DEFAULT_MAX_BATCH_MS = 50;
     private static final Timer TIMER = new Timer("onos-ecm-sender-events");
 
-    private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
+    private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
 
         private final NodeId peer;
 
@@ -818,23 +582,21 @@
         }
 
         @Override
-        public void processItems(List<AbstractEntry<K, V>> items) {
-            Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
-            items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
-                  oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
-                  )
-            );
+        public void processItems(List<UpdateEntry<K, V>> items) {
+            Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
+            items.forEach(item -> map.compute(item.key(), (key, existing) ->
+                    existing == null || item.compareTo(existing) > 0 ? item : existing));
             communicationExecutor.submit(() -> {
-                clusterCommunicator.unicast(Lists.newArrayList(map.values()),
+                clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
                                             updateMessageSubject,
                                             serializer::encode,
                                             peer)
                                    .whenComplete((result, error) -> {
                                        if (error != null) {
-                                           log.debug("Failed to send to {}", peer);
+                                           log.debug("Failed to send to {}", peer, error);
                                        }
                                    });
             });
         }
     }
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java
index f803bb8..d1ada8f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java
@@ -16,13 +16,10 @@
 
 package org.onosproject.store.ecmap;
 
-import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.mapdb.DB;
 import org.mapdb.DBMaker;
 import org.mapdb.Hasher;
 import org.mapdb.Serializer;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.impl.Timestamped;
 import org.onosproject.store.serializers.KryoSerializer;
 
 import java.io.File;
@@ -42,7 +39,6 @@
     private final DB database;
 
     private final Map<byte[], byte[]> items;
-    private final Map<byte[], byte[]> tombstones;
 
     /**
      * Creates a new MapDB based persistent store.
@@ -65,102 +61,32 @@
                 .valueSerializer(Serializer.BYTE_ARRAY)
                 .hasher(Hasher.BYTE_ARRAY)
                 .makeOrGet();
-
-        tombstones = database.createHashMap("tombstones")
-                .keySerializer(Serializer.BYTE_ARRAY)
-                .valueSerializer(Serializer.BYTE_ARRAY)
-                .hasher(Hasher.BYTE_ARRAY)
-                .makeOrGet();
     }
 
     @Override
-    public void readInto(Map<K, Timestamped<V>> items, Map<K, Timestamp> tombstones) {
+    public void readInto(Map<K, MapValue<V>> items) {
         this.items.forEach((keyBytes, valueBytes) ->
                               items.put(serializer.decode(keyBytes),
-                                               serializer.decode(valueBytes)));
-
-        this.tombstones.forEach((keyBytes, valueBytes) ->
-                                   tombstones.put(serializer.decode(keyBytes),
-                                                    serializer.decode(valueBytes)));
+                                        serializer.decode(valueBytes)));
     }
 
     @Override
-    public void put(K key, V value, Timestamp timestamp) {
-        executor.submit(() -> putInternal(key, value, timestamp));
+    public void update(K key, MapValue<V> value) {
+        executor.submit(() -> updateInternal(key, value));
     }
 
-    private void putInternal(K key, V value, Timestamp timestamp) {
+    private void updateInternal(K key, MapValue<V> newValue) {
         byte[] keyBytes = serializer.encode(key);
-        byte[] removedBytes = tombstones.get(keyBytes);
-
-        Timestamp removed = removedBytes == null ? null :
-                            serializer.decode(removedBytes);
-        if (removed != null && removed.isNewerThan(timestamp)) {
-            return;
-        }
-
-        final MutableBoolean updated = new MutableBoolean(false);
 
         items.compute(keyBytes, (k, existingBytes) -> {
-            Timestamped<V> existing = existingBytes == null ? null :
+            MapValue<V> existing = existingBytes == null ? null :
                                       serializer.decode(existingBytes);
-            if (existing != null && existing.isNewerThan(timestamp)) {
-                updated.setFalse();
-                return existingBytes;
+            if (existing == null || newValue.isNewerThan(existing)) {
+                return serializer.encode(newValue);
             } else {
-                updated.setTrue();
-                return serializer.encode(new Timestamped<>(value, timestamp));
+                return existingBytes;
             }
         });
-
-        boolean success = updated.booleanValue();
-
-        if (success && removed != null) {
-            tombstones.remove(keyBytes, removedBytes);
-        }
-
         database.commit();
     }
-
-    @Override
-    public void remove(K key, Timestamp timestamp) {
-        executor.submit(() -> removeInternal(key, timestamp));
-    }
-
-    private void removeInternal(K key, Timestamp timestamp) {
-        byte[] keyBytes = serializer.encode(key);
-
-        final MutableBoolean updated = new MutableBoolean(false);
-
-        items.compute(keyBytes, (k, existingBytes) -> {
-            Timestamp existing = existingBytes == null ? null :
-                                 serializer.decode(existingBytes);
-            if (existing != null && existing.isNewerThan(timestamp)) {
-                updated.setFalse();
-                return existingBytes;
-            } else {
-                updated.setTrue();
-                // remove from items map
-                return null;
-            }
-        });
-
-        if (!updated.booleanValue()) {
-            return;
-        }
-
-        byte[] timestampBytes = serializer.encode(timestamp);
-        byte[] removedBytes = tombstones.get(keyBytes);
-
-        Timestamp removedTimestamp = removedBytes == null ? null :
-                                     serializer.decode(removedBytes);
-        if (removedTimestamp == null) {
-            tombstones.putIfAbsent(keyBytes, timestampBytes);
-        } else if (timestamp.isNewerThan(removedTimestamp)) {
-            tombstones.replace(keyBytes, removedBytes, timestampBytes);
-        }
-
-        database.commit();
-    }
-
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapValue.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapValue.java
new file mode 100644
index 0000000..9225561
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapValue.java
@@ -0,0 +1,99 @@
+package org.onosproject.store.ecmap;
+
+import org.onosproject.store.Timestamp;
+import com.google.common.base.MoreObjects;
+
+/**
+ * Representation of a value in EventuallyConsistentMap.
+ *
+ * @param <V> value type
+ */
+public class MapValue<V> implements Comparable<MapValue<V>> {
+    private final Timestamp timestamp;
+    private final V value;
+
+    public MapValue(V value, Timestamp timestamp) {
+        this.value = value;
+        this.timestamp = timestamp;
+    }
+
+    public boolean isTombstone() {
+        return value == null;
+    }
+
+    public boolean isAlive() {
+        return value != null;
+    }
+
+    public Timestamp timestamp() {
+        return timestamp;
+    }
+
+    public V get() {
+        return value;
+    }
+
+    @Override
+    public int compareTo(MapValue<V> o) {
+        return this.timestamp.compareTo(o.timestamp);
+    }
+
+    public boolean isNewerThan(MapValue<V> other) {
+        return timestamp.isNewerThan(other.timestamp);
+    }
+
+    public boolean isNewerThan(Timestamp timestamp) {
+        return timestamp.isNewerThan(timestamp);
+    }
+
+    public Digest digest() {
+        return new Digest(timestamp, isTombstone());
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("timestamp", timestamp)
+                .add("value", value)
+                .toString();
+    }
+
+    @SuppressWarnings("unused")
+    private MapValue() {
+        this.timestamp = null;
+        this.value = null;
+    }
+
+    /**
+     * Digest or summary of a MapValue for use during Anti-Entropy exchanges.
+     */
+    public static class Digest {
+        private final Timestamp timestamp;
+        private final boolean isTombstone;
+
+        public Digest(Timestamp timestamp, boolean isTombstone) {
+            this.timestamp = timestamp;
+            this.isTombstone = isTombstone;
+        }
+
+        public Timestamp timestamp() {
+            return timestamp;
+        }
+
+        public boolean isTombstone() {
+            return isTombstone;
+        }
+
+        public boolean isNewerThan(Digest other) {
+            return timestamp.isNewerThan(other.timestamp);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("timestamp", timestamp)
+                    .add("isTombstone", isTombstone)
+                    .toString();
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java
index b945f93..302f7f8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java
@@ -16,9 +16,6 @@
 
 package org.onosproject.store.ecmap;
 
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.impl.Timestamped;
-
 import java.util.Map;
 
 /**
@@ -30,24 +27,14 @@
      * Read the contents of the disk into the given maps.
      *
      * @param items items map
-     * @param tombstones tombstones map
      */
-    void readInto(Map<K, Timestamped<V>> items, Map<K, Timestamp> tombstones);
+    void readInto(Map<K, MapValue<V>> items);
 
     /**
-     * Puts a new key,value pair into the map on disk.
+     * Updates a key,value pair in the persistent store.
      *
      * @param key the key
      * @param value the value
-     * @param timestamp the timestamp of the update
      */
-    void put(K key, V value, Timestamp timestamp);
-
-    /**
-     * Removes a key from the map on disk.
-     *
-     * @param key the key
-     * @param timestamp the timestamp of the update
-     */
-    void remove(K key, Timestamp timestamp);
+    void update(K key, MapValue<V> value);
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/RemoveEntry.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/RemoveEntry.java
deleted file mode 100644
index 18b0986..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/RemoveEntry.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.ecmap;
-
-import com.google.common.base.MoreObjects;
-import org.onosproject.store.Timestamp;
-
-/**
- * Describes a single remove event in an EventuallyConsistentMap.
- */
-final class RemoveEntry<K, V> extends AbstractEntry<K, V> {
-    /**
-     * Creates a new remove entry.
-     *
-     * @param key key of the entry
-     * @param timestamp timestamp of the remove event
-     */
-    public RemoveEntry(K key, Timestamp timestamp) {
-        super(key, timestamp);
-    }
-
-    // Needed for serialization.
-    @SuppressWarnings("unused")
-    private RemoveEntry() {
-        super();
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(getClass())
-                .add("key", key())
-                .add("timestamp", timestamp())
-                .toString();
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/PutEntry.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/UpdateEntry.java
similarity index 66%
rename from core/store/dist/src/main/java/org/onosproject/store/ecmap/PutEntry.java
rename to core/store/dist/src/main/java/org/onosproject/store/ecmap/UpdateEntry.java
index ddb4ae9..41eb3a2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/PutEntry.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/UpdateEntry.java
@@ -15,34 +15,35 @@
  */
 package org.onosproject.store.ecmap;
 
-import com.google.common.base.MoreObjects;
-import org.onosproject.store.Timestamp;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.base.MoreObjects;
+
 /**
- * Describes a single put event in an EventuallyConsistentMap.
+ * Describes a single update event in an EventuallyConsistentMap.
  */
-final class PutEntry<K, V> extends AbstractEntry<K, V> {
-    private final V value;
+final class UpdateEntry<K, V> implements Comparable<UpdateEntry<K, V>> {
+    private final K key;
+    private final MapValue<V> value;
 
     /**
-     * Creates a new put entry.
+     * Creates a new update entry.
      *
      * @param key key of the entry
      * @param value value of the entry
-     * @param timestamp timestamp of the put event
      */
-    public PutEntry(K key, V value, Timestamp timestamp) {
-        super(key, timestamp);
+    public UpdateEntry(K key, MapValue<V> value) {
+        this.key = checkNotNull(key);
         this.value = checkNotNull(value);
     }
 
-    // Needed for serialization.
-    @SuppressWarnings("unused")
-    private PutEntry() {
-        super();
-        this.value = null;
+    /**
+     * Returns the key.
+     *
+     * @return the key
+     */
+    public K key() {
+        return key;
     }
 
     /**
@@ -50,16 +51,26 @@
      *
      * @return the value
      */
-    public V value() {
+    public MapValue<V> value() {
         return value;
     }
 
     @Override
+    public int compareTo(UpdateEntry<K, V> o) {
+        return this.value.timestamp().compareTo(o.value.timestamp());
+    }
+
+    @Override
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
                 .add("key", key())
                 .add("value", value)
-                .add("timestamp", timestamp())
                 .toString();
     }
+
+    @SuppressWarnings("unused")
+    private UpdateEntry() {
+        this.key = null;
+        this.value = null;
+    }
 }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 28be8dc..57943ad 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -16,8 +16,8 @@
 package org.onosproject.store.ecmap;
 
 import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.MoreExecutors;
 
 import org.junit.After;
@@ -32,7 +32,6 @@
 import org.onosproject.event.AbstractEvent;
 import org.onosproject.store.Timestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.impl.LogicalTimestamp;
@@ -44,11 +43,13 @@
 import org.onosproject.store.service.EventuallyConsistentMapListener;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -89,8 +90,8 @@
     private final ControllerNode self =
             new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
 
-    private ClusterMessageHandler updateHandler;
-    private ClusterMessageHandler antiEntropyHandler;
+    private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
+    private Consumer<AntiEntropyAdvertisement<String>> antiEntropyHandler;
 
     /*
      * Serialization is a bit tricky here. We need to serialize in the tests
@@ -109,11 +110,10 @@
                     // Below is the classes that the map internally registers
                     .register(LogicalTimestamp.class)
                     .register(WallClockTimestamp.class)
-                    .register(PutEntry.class)
-                    .register(RemoveEntry.class)
                     .register(ArrayList.class)
                     .register(AntiEntropyAdvertisement.class)
                     .register(HashMap.class)
+                    .register(Optional.class)
                     .build();
         }
     };
@@ -131,9 +131,9 @@
         // delegate to our ClusterCommunicationService implementation. This
         // allows us to get a reference to the map's internal cluster message
         // handlers so we can induce events coming in from a peer.
-        clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
-                anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
-        expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
+        clusterCommunicator.<String>addSubscriber(anyObject(MessageSubject.class),
+                anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
+        expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(2);
 
         replay(clusterCommunicator);
 
@@ -237,15 +237,15 @@
         assertEquals(VALUE1, ecMap.get(KEY1));
 
         // Remote put
-        ClusterMessage message
-                = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2));
+        List<UpdateEntry<String, String>> message
+                = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
 
         // Create a latch so we know when the put operation has finished
         latch = new CountDownLatch(1);
         ecMap.addListener(new TestListener(latch));
 
         assertNull(ecMap.get(KEY2));
-        updateHandler.handle(message);
+        updateHandler.accept(message);
         assertTrue("External listener never got notified of internal event",
                    latch.await(100, TimeUnit.MILLISECONDS));
         assertEquals(VALUE2, ecMap.get(KEY2));
@@ -255,14 +255,13 @@
         assertNull(ecMap.get(KEY2));
 
         // Remote remove
-        ClusterMessage removeMessage
-                = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1));
+        message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
 
         // Create a latch so we know when the remove operation has finished
         latch = new CountDownLatch(1);
         ecMap.addListener(new TestListener(latch));
 
-        updateHandler.handle(removeMessage);
+        updateHandler.accept(message);
         assertTrue("External listener never got notified of internal event",
                    latch.await(100, TimeUnit.MILLISECONDS));
         assertNull(ecMap.get(KEY1));
@@ -601,49 +600,35 @@
         }
     }
 
-    private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
-        PutEntry<String, String> event = new PutEntry<>(key, value, timestamp);
-
-        return new ClusterMessage(
-                clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
-                SERIALIZER.encode(Lists.newArrayList(event)));
+    private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
+        return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
     }
 
-    private List<PutEntry<String, String>> generatePutMessage(
+    private List<UpdateEntry<String, String>> generatePutMessage(
             String key1, String value1, String key2, String value2) {
-        ArrayList<PutEntry<String, String>> list = new ArrayList<>();
+        List<UpdateEntry<String, String>> list = new ArrayList<>();
 
         Timestamp timestamp1 = clockService.peek(1);
         Timestamp timestamp2 = clockService.peek(2);
 
-        PutEntry<String, String> pe1 = new PutEntry<>(key1, value1, timestamp1);
-        PutEntry<String, String> pe2 = new PutEntry<>(key2, value2, timestamp2);
-
-        list.add(pe1);
-        list.add(pe2);
+        list.add(generatePutMessage(key1, value1, timestamp1));
+        list.add(generatePutMessage(key2, value2, timestamp2));
 
         return list;
     }
 
-    private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
-        RemoveEntry<String, String> event = new RemoveEntry<>(key, timestamp);
-
-        return new ClusterMessage(
-                clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
-                SERIALIZER.encode(Lists.newArrayList(event)));
+    private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
+        return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
     }
 
-    private List<RemoveEntry<String, String>> generateRemoveMessage(String key1, String key2) {
-        ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
+    private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
+        List<UpdateEntry<String, String>> list = new ArrayList<>();
 
         Timestamp timestamp1 = clockService.peek(1);
         Timestamp timestamp2 = clockService.peek(2);
 
-        RemoveEntry<String, String> re1 = new RemoveEntry<>(key1, timestamp1);
-        RemoveEntry<String, String> re2 = new RemoveEntry<>(key2, timestamp2);
-
-        list.add(re1);
-        list.add(re2);
+        list.add(generateRemoveMessage(key1, timestamp1));
+        list.add(generateRemoveMessage(key2, timestamp2));
 
         return list;
     }
@@ -737,13 +722,6 @@
         public void addSubscriber(MessageSubject subject,
                                   ClusterMessageHandler subscriber,
                                   ExecutorService executor) {
-            if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
-                updateHandler = subscriber;
-            } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
-                antiEntropyHandler = subscriber;
-            } else {
-                throw new RuntimeException("Unexpected message subject " + subject.toString());
-            }
         }
 
         @Override
@@ -793,6 +771,13 @@
         public <M> void addSubscriber(MessageSubject subject,
                 Function<byte[], M> decoder, Consumer<M> handler,
                 Executor executor) {
+            if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
+                updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
+            } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
+                antiEntropyHandler = (Consumer<AntiEntropyAdvertisement<String>>) handler;
+            } else {
+                throw new RuntimeException("Unexpected message subject " + subject.toString());
+            }
         }
     }