Simplified ECMap implementation by merging items and tombstones maps

Change-Id: If4253722d91c35a7e57dec3c2fceb216d14a7314
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 118ef78..1fd27d3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -18,9 +18,8 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.tuple.Pair;
 import org.onlab.util.AbstractAccumulator;
 import org.onlab.util.KryoNamespace;
@@ -30,12 +29,10 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.store.Timestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.impl.LogicalTimestamp;
-import org.onosproject.store.impl.Timestamped;
 import org.onosproject.store.service.WallClockTimestamp;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMapEvent;
@@ -43,30 +40,27 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
+
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.Timer;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.isNull;
-import static java.util.Objects.nonNull;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
@@ -80,12 +74,12 @@
 
     private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
 
-    private final ConcurrentMap<K, Timestamped<V>> items;
-    private final ConcurrentMap<K, Timestamp> removedItems;
+    private final Map<K, MapValue<V>> items;
 
     private final ClusterService clusterService;
     private final ClusterCommunicationService clusterCommunicator;
     private final KryoSerializer serializer;
+    private final NodeId localNodeId;
 
     private final BiFunction<K, V, Timestamp> timestampProvider;
 
@@ -93,7 +87,7 @@
     private final MessageSubject antiEntropyAdvertisementSubject;
 
     private final Set<EventuallyConsistentMapListener<K, V>> listeners
-            = new CopyOnWriteArraySet<>();
+            = Sets.newCopyOnWriteArraySet();
 
     private final ExecutorService executor;
     private final ScheduledExecutorService backgroundExecutor;
@@ -162,13 +156,13 @@
                                 TimeUnit antiEntropyTimeUnit,
                                 boolean convergeFaster,
                                 boolean persistent) {
-        items = new ConcurrentHashMap<>();
-        removedItems = new ConcurrentHashMap<>();
+        items = Maps.newConcurrentMap();
         senderPending = Maps.newConcurrentMap();
         destroyedMessage = mapName + ERROR_DESTROYED;
 
         this.clusterService = clusterService;
         this.clusterCommunicator = clusterCommunicator;
+        this.localNodeId = clusterService.getLocalNode().id();
 
         this.serializer = createSerializer(serializerBuilder);
 
@@ -179,7 +173,7 @@
         } else {
             this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
                     .map(ControllerNode::id)
-                    .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
+                    .filter(nodeId -> !nodeId.equals(localNodeId))
                     .collect(Collectors.toList());
         }
 
@@ -210,7 +204,7 @@
                     newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
 
             persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
-            persistentStore.readInto(items, removedItems);
+            persistentStore.readInto(items);
         } else {
             this.persistentStore = null;
         }
@@ -223,17 +217,21 @@
         }
 
         // start anti-entropy thread
-        this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
+        this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
                                                     initialDelaySec, antiEntropyPeriod,
                                                     antiEntropyTimeUnit);
 
         updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
         clusterCommunicator.addSubscriber(updateMessageSubject,
-                                          new InternalEventListener(), this.executor);
+                                          serializer::decode,
+                                          this::processUpdates,
+                                          this.executor);
 
         antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
         clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
-                                          new InternalAntiEntropyListener(), this.backgroundExecutor);
+                                          serializer::decode,
+                                          this::handleAntiEntropyAdvertisement,
+                                          this.backgroundExecutor);
 
         this.tombstonesDisabled = tombstonesDisabled;
         this.lightweightAntiEntropy = !convergeFaster;
@@ -245,14 +243,13 @@
             protected void setupKryoPool() {
                 // Add the map's internal helper classes to the user-supplied serializer
                 serializerPool = builder
+                        .register(KryoNamespaces.BASIC)
                         .register(LogicalTimestamp.class)
                         .register(WallClockTimestamp.class)
-                        .register(PutEntry.class)
-                        .register(RemoveEntry.class)
-                        .register(ArrayList.class)
                         .register(AntiEntropyAdvertisement.class)
-                        .register(HashMap.class)
-                        .register(Timestamped.class)
+                        .register(UpdateEntry.class)
+                        .register(MapValue.class)
+                        .register(MapValue.Digest.class)
                         .build();
             }
         };
@@ -261,29 +258,31 @@
     @Override
     public int size() {
         checkState(!destroyed, destroyedMessage);
-        return items.size();
+        // TODO: Maintain a separate counter for tracking live elements in map.
+        return Maps.filterValues(items, MapValue::isAlive).size();
     }
 
     @Override
     public boolean isEmpty() {
         checkState(!destroyed, destroyedMessage);
-        return items.isEmpty();
+        return size() == 0;
     }
 
     @Override
     public boolean containsKey(K key) {
         checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
-        return items.containsKey(key);
+        return get(key) != null;
     }
 
     @Override
     public boolean containsValue(V value) {
         checkState(!destroyed, destroyedMessage);
         checkNotNull(value, ERROR_NULL_VALUE);
-
-        return items.values().stream()
-                .anyMatch(timestamped -> timestamped.value().equals(value));
+        return items.values()
+                    .stream()
+                    .filter(MapValue::isAlive)
+                    .anyMatch(v -> v.get().equals(value));
     }
 
     @Override
@@ -291,11 +290,8 @@
         checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
 
-        Timestamped<V> value = items.get(key);
-        if (value != null) {
-            return value.value();
-        }
-        return null;
+        MapValue<V> value = items.get(key);
+        return (value == null || value.isTombstone()) ? null : value.get();
     }
 
     @Override
@@ -304,123 +300,18 @@
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
 
-        Timestamp timestamp = timestampProvider.apply(key, value);
-
-        if (putInternal(key, value, timestamp)) {
-            notifyPeers(new PutEntry<>(key, value, timestamp),
-                        peerUpdateFunction.apply(key, value));
-            notifyListeners(new EventuallyConsistentMapEvent<>(
-                    EventuallyConsistentMapEvent.Type.PUT, key, value));
+        MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
+        if (updateInternal(key, newValue)) {
+            notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
+            notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value));
         }
     }
 
-    private boolean putInternal(K key, V value, Timestamp timestamp) {
-        counter.incrementCount();
-        Timestamp removed = removedItems.get(key);
-        if (removed != null && removed.isNewerThan(timestamp)) {
-            log.debug("ecmap - removed was newer {}", value);
-            return false;
-        }
-
-        final MutableBoolean updated = new MutableBoolean(false);
-
-        items.compute(key, (k, existing) -> {
-            if (existing != null && existing.isNewerThan(timestamp)) {
-                updated.setFalse();
-                return existing;
-            } else {
-                updated.setTrue();
-                return new Timestamped<>(value, timestamp);
-            }
-            });
-
-        boolean success = updated.booleanValue();
-        if (!success) {
-            log.debug("ecmap - existing was newer {}", value);
-        }
-
-        if (success && removed != null) {
-            removedItems.remove(key, removed);
-        }
-
-        if (success && persistent) {
-            persistentStore.put(key, value, timestamp);
-        }
-
-        return success;
-    }
-
     @Override
     public V remove(K key) {
         checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
-
-        // TODO prevent calls here if value is important for timestamp
-        Timestamp timestamp = timestampProvider.apply(key, null);
-
-        Optional<V> removedValue = removeInternal(key, timestamp);
-        if (removedValue == null) {
-            return null;
-        }
-        notifyPeers(new RemoveEntry<>(key, timestamp),
-                peerUpdateFunction.apply(key, null));
-        notifyListeners(new EventuallyConsistentMapEvent<>(
-                EventuallyConsistentMapEvent.Type.REMOVE, key, removedValue.orElse(null)));
-
-        return removedValue.orElse(null);
-    }
-
-    /**
-     * Returns null if the timestamp is for a outdated request i.e.
-     * the value is the map is more recent or a tombstone exists with a
-     * more recent timestamp.
-     * Returns non-empty optional if a value was indeed removed from the map.
-     * Returns empty optional if map did not contain a value for the key but the existing
-     * tombstone is older than this timestamp.
-     * @param key key
-     * @param timestamp timestamp for remove request
-     * @return Optional value.
-     */
-    private Optional<V> removeInternal(K key, Timestamp timestamp) {
-        if (timestamp == null) {
-            return null;
-        }
-
-        counter.incrementCount();
-        final AtomicReference<Optional<V>> removedValue = new AtomicReference<>(null);
-        items.compute(key, (k, existing) -> {
-            if (existing != null && existing.isNewerThan(timestamp)) {
-                return existing;
-            } else {
-                removedValue.set(existing == null ? Optional.empty() : Optional.of(existing.value()));
-                return null;
-            }
-        });
-
-        if (isNull(removedValue.get())) {
-            return null;
-        }
-
-        boolean updatedTombstone = false;
-
-        if (!tombstonesDisabled) {
-            Timestamp removedTimestamp = removedItems.get(key);
-            if (removedTimestamp == null) {
-                //Timestamp removed = removedItems.putIfAbsent(key, timestamp);
-                updatedTombstone = (removedItems.putIfAbsent(key, timestamp) == null);
-            } else if (timestamp.isNewerThan(removedTimestamp)) {
-                updatedTombstone = removedItems.replace(key, removedTimestamp, timestamp);
-            }
-        }
-
-        if (persistent) {
-            persistentStore.remove(key, timestamp);
-        }
-
-        if (tombstonesDisabled || updatedTombstone) {
-            return removedValue.get();
-        }
-        return null;
+        return removeInternal(key, Optional.empty());
     }
 
     @Override
@@ -428,15 +319,34 @@
         checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
+        removeInternal(key, Optional.of(value));
+    }
 
-        Timestamp timestamp = timestampProvider.apply(key, value);
+    // Removes key (only if it maps to value, when one is given); returns the previous live value or null.
+    private V removeInternal(K key, Optional<V> value) {
+        checkState(!destroyed, destroyedMessage);
+        checkNotNull(key, ERROR_NULL_KEY);
 
-        if (nonNull(removeInternal(key, timestamp))) {
-            notifyPeers(new RemoveEntry<>(key, timestamp),
-                        peerUpdateFunction.apply(key, value));
-            notifyListeners(new EventuallyConsistentMapEvent<>(
-                    EventuallyConsistentMapEvent.Type.REMOVE, key, value));
+        MapValue<V> newValue = new MapValue<>(null, timestampProvider.apply(key, value.orElse(null)));
+        AtomicBoolean updated = new AtomicBoolean(false);
+        AtomicReference<V> previousValue = new AtomicReference<>();
+        items.compute(key, (k, existing) -> {
+            boolean live = existing != null && existing.isAlive();
+            previousValue.set(live ? existing.get() : null);
+            // A conditional remove must leave a live entry holding a different value untouched.
+            boolean valueMatches = !live || !value.isPresent() || value.get().equals(existing.get());
+            updated.set(valueMatches && (existing == null || newValue.isNewerThan(existing)));
+            return !updated.get() ? existing : (tombstonesDisabled ? null : newValue);
+        });
+        if (updated.get()) {
+            notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, previousValue.get()));
+            notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
+            if (persistent) {
+                persistentStore.update(key, newValue);
+            }
+            return previousValue.get();
         }
+        return null;
     }
 
     @Override
@@ -448,30 +358,59 @@
     @Override
     public void clear() {
         checkState(!destroyed, destroyedMessage);
-        items.forEach((key, value) -> remove(key));
+        Maps.filterValues(items, MapValue::isAlive)
+            .forEach((k, v) -> remove(k));
     }
 
     @Override
     public Set<K> keySet() {
         checkState(!destroyed, destroyedMessage);
-        return items.keySet();
+        return Maps.filterValues(items, MapValue::isAlive)
+                   .keySet();
     }
 
     @Override
     public Collection<V> values() {
         checkState(!destroyed, destroyedMessage);
-        return items.values().stream()
-                .map(Timestamped::value)
-                .collect(Collectors.toList());
+        return Maps.filterValues(items, MapValue::isAlive)
+                   .values()
+                   .stream()
+                   .map(MapValue::get)
+                   .collect(Collectors.toList());
     }
 
     @Override
     public Set<Map.Entry<K, V>> entrySet() {
         checkState(!destroyed, destroyedMessage);
+        return Maps.filterValues(items, MapValue::isAlive)
+                   .entrySet()
+                   .stream()
+                   .map(e -> Pair.of(e.getKey(), e.getValue().get()))
+                   .collect(Collectors.toSet());
+    }
 
-        return items.entrySet().stream()
-                .map(e -> Pair.of(e.getKey(), e.getValue().value()))
-                .collect(Collectors.toSet());
+    /**
+     * Returns true if newValue was accepted i.e. map is updated.
+     * @param key key
+     * @param newValue proposed new value
+     * @return true if update happened; false if map already contains a more recent value for the key
+     */
+    private boolean updateInternal(K key, MapValue<V> newValue) {
+        AtomicBoolean updated = new AtomicBoolean(false);
+        items.compute(key, (k, existing) -> {
+            if (existing == null || newValue.isNewerThan(existing)) {
+                updated.set(true);
+                if (newValue.isTombstone()) {
+                    return tombstonesDisabled ? null : newValue;
+                }
+                return newValue;
+            }
+            return existing;
+        });
+        if (updated.get() && persistent) {
+            persistentStore.update(key, newValue);
+        }
+        return updated.get();
     }
 
     @Override
@@ -503,26 +442,20 @@
     }
 
     private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
-        for (EventuallyConsistentMapListener<K, V> listener : listeners) {
-            listener.event(event);
-        }
+        listeners.forEach(listener -> listener.event(event));
     }
 
-    private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
+    private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
         queueUpdate(event, peers);
     }
 
-    private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
-        queueUpdate(event, peers);
-    }
-
-    private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
+    private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
         if (peers == null) {
             // we have no friends :(
             return;
         }
         peers.forEach(node ->
-              senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
+            senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
         );
     }
 
@@ -530,276 +463,107 @@
         return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
     }
 
-    private final class SendAdvertisementTask implements Runnable {
-        @Override
-        public void run() {
-            if (Thread.currentThread().isInterrupted()) {
-                log.info("Interrupted, quitting");
-                return;
-            }
-
+    private void sendAdvertisement() {
+        try {
             if (underHighLoad() || destroyed) {
                 return;
             }
-
-            try {
-                final NodeId self = clusterService.getLocalNode().id();
-                Set<ControllerNode> nodes = clusterService.getNodes();
-
-                List<NodeId> nodeIds = nodes.stream()
-                        .map(ControllerNode::id)
-                        .collect(Collectors.toList());
-
-                if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
-                    log.trace("No other peers in the cluster.");
-                    return;
-                }
-
-                NodeId peer;
-                do {
-                    int idx = RandomUtils.nextInt(0, nodeIds.size());
-                    peer = nodeIds.get(idx);
-                } while (peer.equals(self));
-
-                if (Thread.currentThread().isInterrupted()) {
-                    log.info("Interrupted, quitting");
-                    return;
-                }
-
-                AntiEntropyAdvertisement<K> ad = createAdvertisement();
-                NodeId destination = peer;
-                clusterCommunicator.unicast(ad, antiEntropyAdvertisementSubject, serializer::encode, peer)
-                                   .whenComplete((result, error) -> {
-                                       if (error != null) {
-                                           log.debug("Failed to send anti-entropy advertisement to {}", destination);
-                                       }
-                                   });
-
-            } catch (Exception e) {
-                // Catch all exceptions to avoid scheduled task being suppressed.
-                log.error("Exception thrown while sending advertisement", e);
-            }
+            pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
+        } catch (Exception e) {
+            // Catch all exceptions to avoid scheduled task being suppressed.
+            log.error("Exception thrown while sending advertisement", e);
         }
     }
 
+    // Returns a randomly chosen ACTIVE peer (never this node), or empty when there is none.
+    private Optional<NodeId> pickRandomActivePeer() {
+        List<NodeId> activePeers = clusterService.getNodes().stream()
+                .map(ControllerNode::id)
+                .filter(id -> !localNodeId.equals(id))
+                .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
+                .collect(Collectors.toList());
+        Collections.shuffle(activePeers);
+        return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
+    }
+
+    // Sends an advertisement built from the local map to the peer; a failure is only logged.
+    private void sendAdvertisementToPeer(NodeId peer) {
+        clusterCommunicator.unicast(createAdvertisement(),
+                antiEntropyAdvertisementSubject,
+                serializer::encode, peer)
+                .whenComplete((result, error) -> {
+                    if (error != null) {
+                        log.warn("Failed to send anti-entropy advertisement to {}", peer, error);
+                    }
+                });
+    }
+
+
     private AntiEntropyAdvertisement<K> createAdvertisement() {
-        final NodeId self = clusterService.getLocalNode().id();
-
-        Map<K, Timestamp> timestamps = new HashMap<>(items.size());
-
-        items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
-
-        Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
-
-        return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
+        return new AntiEntropyAdvertisement<K>(localNodeId, Maps.transformValues(items, MapValue::digest));
     }
 
     private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
-        List<EventuallyConsistentMapEvent<K, V>> externalEvents;
+        // Reconciles local state with a peer's advertisement; skipped when destroyed or under high load.
+        if (destroyed || underHighLoad()) {
+            return;
+        }
+        try {
+            antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
 
-        externalEvents = antiEntropyCheckLocalItems(ad);
-
-        antiEntropyCheckLocalRemoved(ad);
-
-        if (!lightweightAntiEntropy) {
-            externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
-
-            // if remote ad has something unknown, actively sync
-            for (K key : ad.timestamps().keySet()) {
-                if (!items.containsKey(key)) {
+            if (!lightweightAntiEntropy) {
+                // if remote ad has keys that are absent locally, actively sync
+                if (!Sets.difference(ad.digest().keySet(), items.keySet()).isEmpty()) {
                     // Send the advertisement back if this peer is out-of-sync
-                    final NodeId sender = ad.sender();
-                    AntiEntropyAdvertisement<K> myAd = createAdvertisement();
-
-                    clusterCommunicator.unicast(myAd, antiEntropyAdvertisementSubject, serializer::encode, sender)
-                                       .whenComplete((result, error) -> {
-                                           if (error != null) {
-                                               log.debug("Failed to send reactive "
-                                                       + "anti-entropy advertisement to {}", sender);
-                                           }
-                                       });
-                    break;
+                    // TODO: Send ad for missing keys and for entries that are stale
+                    sendAdvertisementToPeer(ad.sender());
                 }
             }
+        } catch (Exception e) {
+            log.warn("Error handling anti-entropy advertisement", e);
         }
-        externalEvents.forEach(this::notifyListeners);
     }
 
     /**
-     * Checks if any of the remote's live items or tombstones are out of date
-     * according to our local live item list, or if our live items are out of
-     * date according to the remote's tombstone list.
-     * If the local copy is more recent, it will be pushed to the remote. If the
-     * remote has a more recent remove, we apply that to the local state.
-     *
-     * @param ad remote anti-entropy advertisement
-     * @return list of external events relating to local operations performed
+     * Processes anti-entropy ad from peer by taking following actions:
+     * 1. If peer has an old entry, updates peer.
+     * 2. If peer indicates an entry is removed and has a more recent
+     * timestamp than the local entry, update local state.
      */
     private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
             AntiEntropyAdvertisement<K> ad) {
-        final List<EventuallyConsistentMapEvent<K, V>> externalEvents
-                = new LinkedList<>();
+        final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
         final NodeId sender = ad.sender();
-
-        for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
-            K key = item.getKey();
-            Timestamped<V> localValue = item.getValue();
-
-            Timestamp remoteTimestamp = ad.timestamps().get(key);
-            if (remoteTimestamp == null) {
-                remoteTimestamp = ad.tombstones().get(key);
-            }
-            if (remoteTimestamp == null || localValue
-                    .isNewerThan(remoteTimestamp)) {
+        items.forEach((key, localValue) -> {
+            MapValue.Digest remoteValueDigest = ad.digest().get(key);
+            if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
                 // local value is more recent, push to sender
-                queueUpdate(new PutEntry<>(key, localValue.value(),
-                                            localValue.timestamp()), ImmutableList.of(sender));
-            }
-
-            Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
-            if (remoteDeadTimestamp != null &&
-                    remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
-                // sender has a more recent remove
-                if (nonNull(removeInternal(key, remoteDeadTimestamp))) {
-                    externalEvents.add(new EventuallyConsistentMapEvent<>(
-                            EventuallyConsistentMapEvent.Type.REMOVE, key, null));
+                queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
+            } else {
+                if (remoteValueDigest.isTombstone()
+                        && remoteValueDigest.timestamp().isNewerThan(localValue.timestamp())) {
+                    if (updateInternal(key, new MapValue<>(null, remoteValueDigest.timestamp()))) {
+                        externalEvents.add(new EventuallyConsistentMapEvent<>(REMOVE, key, null));
+                    }
                 }
             }
-        }
-
+        });
         return externalEvents;
     }
 
-    /**
-     * Checks if any items in the remote live list are out of date according
-     * to our tombstone list. If we find we have a more up to date tombstone,
-     * we'll send it to the remote.
-     *
-     * @param ad remote anti-entropy advertisement
-     */
-    private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
-        final NodeId sender = ad.sender();
-
-        for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
-            K key = dead.getKey();
-            Timestamp localDeadTimestamp = dead.getValue();
-
-            Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
-            if (remoteLiveTimestamp != null
-                    && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
-                // sender has zombie, push remove
-                queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
-            }
+    private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
+        if (destroyed) {
+            return;
         }
-    }
+        updates.forEach(update -> {
+            final K key = update.key();
+            final MapValue<V> value = update.value();
 
-    /**
-     * Checks if any of the local live items are out of date according to the
-     * remote's tombstone advertisements. If we find a local item is out of date,
-     * we'll apply the remove operation to the local state.
-     *
-     * @param ad remote anti-entropy advertisement
-     * @return list of external events relating to local operations performed
-     */
-    private List<EventuallyConsistentMapEvent<K, V>>
-    antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
-        final List<EventuallyConsistentMapEvent<K, V>> externalEvents
-                = new LinkedList<>();
-
-        for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
-            K key = remoteDead.getKey();
-            Timestamp remoteDeadTimestamp = remoteDead.getValue();
-
-            Timestamped<V> local = items.get(key);
-            Timestamp localDead = removedItems.get(key);
-            if (local != null && remoteDeadTimestamp.isNewerThan(
-                    local.timestamp())) {
-                // If the remote has a more recent tombstone than either our local
-                // value, then do a remove with their timestamp
-                if (nonNull(removeInternal(key, remoteDeadTimestamp))) {
-                    externalEvents.add(new EventuallyConsistentMapEvent<>(
-                            EventuallyConsistentMapEvent.Type.REMOVE, key, null));
-                }
-            } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
-                    localDead)) {
-                // If the remote has a more recent tombstone than us, update ours
-                // to their timestamp
-                removeInternal(key, remoteDeadTimestamp);
+            if (updateInternal(key, value)) {
+                final EventuallyConsistentMapEvent.Type type = value.isTombstone() ? REMOVE : PUT;
+                notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value.get()));
             }
-        }
-
-        return externalEvents;
-    }
-
-    private final class InternalAntiEntropyListener
-            implements ClusterMessageHandler {
-
-        @Override
-        public void handle(ClusterMessage message) {
-            log.trace("Received anti-entropy advertisement from peer: {}",
-                      message.sender());
-            AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
-            try {
-                if (!underHighLoad()) {
-                    handleAntiEntropyAdvertisement(advertisement);
-                }
-            } catch (Exception e) {
-                log.warn("Exception thrown handling advertisements", e);
-            }
-        }
-    }
-
-    private final class InternalEventListener implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            if (destroyed) {
-                return;
-            }
-
-            log.debug("Received update event from peer: {}", message.sender());
-            Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
-
-            try {
-                // TODO clean this for loop up
-                for (AbstractEntry<K, V> entry : events) {
-                    final K key = entry.key();
-                    V value;
-                    final Timestamp timestamp = entry.timestamp();
-                    final EventuallyConsistentMapEvent.Type type;
-                    if (entry instanceof PutEntry) {
-                        PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
-                        value = putEntry.value();
-                        type = EventuallyConsistentMapEvent.Type.PUT;
-                    } else if (entry instanceof RemoveEntry) {
-                        type = EventuallyConsistentMapEvent.Type.REMOVE;
-                        value = null;
-                    } else {
-                        throw new IllegalStateException("Unknown entry type " + entry.getClass());
-                    }
-
-                    boolean success;
-                    switch (type) {
-                        case PUT:
-                            success = putInternal(key, value, timestamp);
-                            break;
-                        case REMOVE:
-                            Optional<V> removedValue = removeInternal(key, timestamp);
-                            success = removedValue != null;
-                            if (success) {
-                                value = removedValue.orElse(null);
-                            }
-                            break;
-                        default:
-                            success = false;
-                    }
-                    if (success) {
-                        notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
-                    }
-                }
-            } catch (Exception e) {
-                log.warn("Exception thrown handling put", e);
-            }
-        }
+        });
     }
 
     // TODO pull this into the class if this gets pulled out...
@@ -808,7 +572,7 @@
     private static final int DEFAULT_MAX_BATCH_MS = 50;
     private static final Timer TIMER = new Timer("onos-ecm-sender-events");
 
-    private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
+    private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
 
         private final NodeId peer;
 
@@ -818,23 +582,21 @@
         }
 
         @Override
-        public void processItems(List<AbstractEntry<K, V>> items) {
-            Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
-            items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
-                  oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
-                  )
-            );
+        public void processItems(List<UpdateEntry<K, V>> items) {
+            Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
+            items.forEach(item -> map.compute(item.key(), (key, existing) ->
+                    existing == null || item.compareTo(existing) > 0 ? item : existing));
             communicationExecutor.submit(() -> {
-                clusterCommunicator.unicast(Lists.newArrayList(map.values()),
+                clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
                                             updateMessageSubject,
                                             serializer::encode,
                                             peer)
                                    .whenComplete((result, error) -> {
                                        if (error != null) {
-                                           log.debug("Failed to send to {}", peer);
+                                           log.debug("Failed to send to {}", peer, error);
                                        }
                                    });
             });
         }
     }
-}
+}
\ No newline at end of file