Refactor change: Moved all distributed primitive implementation classes into onos-core-primitives bundle

Change-Id: Icd5dbd4133cb2f21bd403bcd598e6012813e6bfd
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
new file mode 100644
index 0000000..f595e6a
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -0,0 +1,685 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SlidingWindowCounter;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.persistence.PersistenceService;
+import org.onosproject.store.LogicalTimestamp;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Distributed Map implementation which uses optimistic replication and gossip
+ * based techniques to provide an eventually consistent data store.
+ */
+public class EventuallyConsistentMapImpl<K, V>
+        implements EventuallyConsistentMap<K, V> {
+
+    private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
+
+    private final Map<K, MapValue<V>> items;
+
+    private final ClusterService clusterService;
+    private final ClusterCommunicationService clusterCommunicator;
+    private final KryoSerializer serializer;
+    private final NodeId localNodeId;
+    private final PersistenceService persistenceService;
+
+    private final BiFunction<K, V, Timestamp> timestampProvider;
+
+    private final MessageSubject updateMessageSubject;
+    private final MessageSubject antiEntropyAdvertisementSubject;
+
+    private final Set<EventuallyConsistentMapListener<K, V>> listeners
+            = Sets.newCopyOnWriteArraySet();
+
+    private final ExecutorService executor;
+    private final ScheduledExecutorService backgroundExecutor;
+    private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
+
+    private final ExecutorService communicationExecutor;
+    private final Map<NodeId, EventAccumulator> senderPending;
+
+    private final String mapName;
+
+    private volatile boolean destroyed = false;
+    private static final String ERROR_DESTROYED = " map is already destroyed";
+    private final String destroyedMessage;
+
+    private static final String ERROR_NULL_KEY = "Key cannot be null";
+    private static final String ERROR_NULL_VALUE = "Null values are not allowed";
+
+    private final long initialDelaySec = 5;
+    private final boolean lightweightAntiEntropy;
+    private final boolean tombstonesDisabled;
+
+    private static final int WINDOW_SIZE = 5;
+    private static final int HIGH_LOAD_THRESHOLD = 0;
+    private static final int LOAD_WINDOW = 2;
+    private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
+
+    private final boolean persistent;
+
+    private static final String PERSISTENT_LOCAL_MAP_NAME = "itemsMap";
+
+
+    /**
+     * Creates a new eventually consistent map shared amongst multiple instances.
+     * <p>
+     * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
+     * for more description of the parameters expected by the map.
+     * </p>
+     *
+     * @param mapName               a String identifier for the map.
+     * @param clusterService        the cluster service
+     * @param clusterCommunicator   the cluster communications service
+     * @param serializerBuilder     a Kryo namespace builder that can serialize
+     *                              both K and V
+     * @param timestampProvider     provider of timestamps for K and V
+     * @param peerUpdateFunction    function that provides a set of nodes to immediately
+     *                              update to when there writes to the map
+     * @param eventExecutor         executor to use for processing incoming
+     *                              events from peers
+     * @param communicationExecutor executor to use for sending events to peers
+     * @param backgroundExecutor    executor to use for background anti-entropy
+     *                              tasks
+     * @param tombstonesDisabled    true if this map should not maintain
+     *                              tombstones
+     * @param antiEntropyPeriod     period that the anti-entropy task should run
+     * @param antiEntropyTimeUnit   time unit for anti-entropy period
+     * @param convergeFaster        make anti-entropy try to converge faster
+     * @param persistent            persist data to disk
+     */
+    EventuallyConsistentMapImpl(String mapName,
+                                ClusterService clusterService,
+                                ClusterCommunicationService clusterCommunicator,
+                                KryoNamespace.Builder serializerBuilder,
+                                BiFunction<K, V, Timestamp> timestampProvider,
+                                BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
+                                ExecutorService eventExecutor,
+                                ExecutorService communicationExecutor,
+                                ScheduledExecutorService backgroundExecutor,
+                                boolean tombstonesDisabled,
+                                long antiEntropyPeriod,
+                                TimeUnit antiEntropyTimeUnit,
+                                boolean convergeFaster,
+                                boolean persistent,
+                                PersistenceService persistenceService) {
+        this.mapName = mapName;
+        this.serializer = createSerializer(serializerBuilder);
+        this.persistenceService = persistenceService;
+        this.persistent =
+                persistent;
+        if (persistent) {
+            items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
+                    .withName(PERSISTENT_LOCAL_MAP_NAME)
+                    .withSerializer(new Serializer() {
+
+                        @Override
+                        public <T> byte[] encode(T object) {
+                            return EventuallyConsistentMapImpl.this.serializer.encode(object);
+                        }
+
+                        @Override
+                        public <T> T decode(byte[] bytes) {
+                            return EventuallyConsistentMapImpl.this.serializer.decode(bytes);
+                        }
+                    })
+                    .build();
+        } else {
+            items = Maps.newConcurrentMap();
+        }
+        senderPending = Maps.newConcurrentMap();
+        destroyedMessage = mapName + ERROR_DESTROYED;
+
+        this.clusterService = clusterService;
+        this.clusterCommunicator = clusterCommunicator;
+        this.localNodeId = clusterService.getLocalNode().id();
+
+        this.timestampProvider = timestampProvider;
+
+        if (peerUpdateFunction != null) {
+            this.peerUpdateFunction = peerUpdateFunction;
+        } else {
+            this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
+                    .map(ControllerNode::id)
+                    .filter(nodeId -> !nodeId.equals(localNodeId))
+                    .collect(Collectors.toList());
+        }
+
+        if (eventExecutor != null) {
+            this.executor = eventExecutor;
+        } else {
+            // should be a normal executor; it's used for receiving messages
+            this.executor =
+                    Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
+        }
+
+        if (communicationExecutor != null) {
+            this.communicationExecutor = communicationExecutor;
+        } else {
+            // sending executor; should be capped
+            //TODO this probably doesn't need to be bounded anymore
+            this.communicationExecutor =
+                    newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
+        }
+
+
+        if (backgroundExecutor != null) {
+            this.backgroundExecutor = backgroundExecutor;
+        } else {
+            this.backgroundExecutor =
+                    newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
+        }
+
+        // start anti-entropy thread
+        this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
+                                                    initialDelaySec, antiEntropyPeriod,
+                                                    antiEntropyTimeUnit);
+
+        updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
+        clusterCommunicator.addSubscriber(updateMessageSubject,
+                                          serializer::decode,
+                                          this::processUpdates,
+                                          this.executor);
+
+        antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
+        clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
+                                          serializer::decode,
+                                          this::handleAntiEntropyAdvertisement,
+                                          this.backgroundExecutor);
+
+        this.tombstonesDisabled = tombstonesDisabled;
+        this.lightweightAntiEntropy = !convergeFaster;
+    }
+
+    private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
+        return new KryoSerializer() {
+            @Override
+            protected void setupKryoPool() {
+                // Add the map's internal helper classes to the user-supplied serializer
+                serializerPool = builder
+                        .register(KryoNamespaces.BASIC)
+                        .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+                        .register(LogicalTimestamp.class)
+                        .register(WallClockTimestamp.class)
+                        .register(AntiEntropyAdvertisement.class)
+                        .register(UpdateEntry.class)
+                        .register(MapValue.class)
+                        .register(MapValue.Digest.class)
+                        .build();
+            }
+        };
+    }
+
+    @Override
+    public String name() {
+        return mapName;
+    }
+
+    @Override
+    public int size() {
+        checkState(!destroyed, destroyedMessage);
+        // TODO: Maintain a separate counter for tracking live elements in map.
+        return Maps.filterValues(items, MapValue::isAlive).size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        checkState(!destroyed, destroyedMessage);
+        return size() == 0;
+    }
+
+    @Override
+    public boolean containsKey(K key) {
+        checkState(!destroyed, destroyedMessage);
+        checkNotNull(key, ERROR_NULL_KEY);
+        return get(key) != null;
+    }
+
+    @Override
+    public boolean containsValue(V value) {
+        checkState(!destroyed, destroyedMessage);
+        checkNotNull(value, ERROR_NULL_VALUE);
+        return items.values()
+                    .stream()
+                    .filter(MapValue::isAlive)
+                    .anyMatch(v -> value.equals(v.get()));
+    }
+
+    @Override
+    public V get(K key) {
+        checkState(!destroyed, destroyedMessage);
+        checkNotNull(key, ERROR_NULL_KEY);
+
+        MapValue<V> value = items.get(key);
+        return (value == null || value.isTombstone()) ? null : value.get();
+    }
+
+    @Override
+    public void put(K key, V value) {
+        checkState(!destroyed, destroyedMessage);
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(value, ERROR_NULL_VALUE);
+
+        MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
+        if (putInternal(key, newValue)) {
+            notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
+            notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
+        }
+    }
+
+    @Override
+    public V remove(K key) {
+        checkState(!destroyed, destroyedMessage);
+        checkNotNull(key, ERROR_NULL_KEY);
+        return removeAndNotify(key, null);
+    }
+
+    @Override
+    public void remove(K key, V value) {
+        checkState(!destroyed, destroyedMessage);
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(value, ERROR_NULL_VALUE);
+        removeAndNotify(key, value);
+    }
+
+    private V removeAndNotify(K key, V value) {
+        Timestamp timestamp = timestampProvider.apply(key, value);
+        Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null
+                ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
+        MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
+        if (previousValue != null) {
+            notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
+                        peerUpdateFunction.apply(key, previousValue.get()));
+            if (previousValue.isAlive()) {
+                notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
+            }
+        }
+        return previousValue != null ? previousValue.get() : null;
+    }
+
+    private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
+        checkState(!destroyed, destroyedMessage);
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(value, ERROR_NULL_VALUE);
+        tombstone.ifPresent(v -> checkState(v.isTombstone()));
+
+        counter.incrementCount();
+        AtomicBoolean updated = new AtomicBoolean(false);
+        AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
+        items.compute(key, (k, existing) -> {
+            boolean valueMatches = true;
+            if (value.isPresent() && existing != null && existing.isAlive()) {
+                valueMatches = Objects.equals(value.get(), existing.get());
+            }
+            if (existing == null) {
+                log.trace("ECMap Remove: Existing value for key {} is already null", k);
+            }
+            if (valueMatches) {
+                if (existing == null) {
+                    updated.set(tombstone.isPresent());
+                } else {
+                    updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
+                }
+            }
+            if (updated.get()) {
+                previousValue.set(existing);
+                return tombstone.orElse(null);
+            } else {
+                return existing;
+            }
+        });
+        return previousValue.get();
+    }
+
+    @Override
+    public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
+        checkState(!destroyed, destroyedMessage);
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(recomputeFunction, "Recompute function cannot be null");
+
+        AtomicBoolean updated = new AtomicBoolean(false);
+        AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
+        MapValue<V> computedValue = items.compute(key, (k, mv) -> {
+            previousValue.set(mv);
+            V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
+            MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
+            if (mv == null || newValue.isNewerThan(mv)) {
+                updated.set(true);
+                return newValue;
+            } else {
+                return mv;
+            }
+        });
+        if (updated.get()) {
+            notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
+            EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
+            V value = computedValue.isTombstone()
+                    ? previousValue.get() == null ? null : previousValue.get().get()
+                    : computedValue.get();
+            if (value != null) {
+                notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
+            }
+        }
+        return computedValue.get();
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> m) {
+        checkState(!destroyed, destroyedMessage);
+        m.forEach(this::put);
+    }
+
+    @Override
+    public void clear() {
+        checkState(!destroyed, destroyedMessage);
+        Maps.filterValues(items, MapValue::isAlive)
+            .forEach((k, v) -> remove(k));
+    }
+
+    @Override
+    public Set<K> keySet() {
+        checkState(!destroyed, destroyedMessage);
+        return Maps.filterValues(items, MapValue::isAlive)
+                   .keySet();
+    }
+
+    @Override
+    public Collection<V> values() {
+        checkState(!destroyed, destroyedMessage);
+        return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
+    }
+
+    @Override
+    public Set<Map.Entry<K, V>> entrySet() {
+        checkState(!destroyed, destroyedMessage);
+        return Maps.filterValues(items, MapValue::isAlive)
+                   .entrySet()
+                   .stream()
+                   .map(e -> Pair.of(e.getKey(), e.getValue().get()))
+                   .collect(Collectors.toSet());
+    }
+
+    /**
+     * Returns true if newValue was accepted i.e. map is updated.
+     *
+     * @param key key
+     * @param newValue proposed new value
+     * @return true if update happened; false if map already contains a more recent value for the key
+     */
+    private boolean putInternal(K key, MapValue<V> newValue) {
+        checkState(!destroyed, destroyedMessage);
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(newValue, ERROR_NULL_VALUE);
+        checkState(newValue.isAlive());
+        counter.incrementCount();
+        AtomicBoolean updated = new AtomicBoolean(false);
+        items.compute(key, (k, existing) -> {
+            if (existing == null || newValue.isNewerThan(existing)) {
+                updated.set(true);
+                return newValue;
+            }
+            return existing;
+        });
+        return updated.get();
+    }
+
+    @Override
+    public void addListener(EventuallyConsistentMapListener<K, V> listener) {
+        checkState(!destroyed, destroyedMessage);
+
+        listeners.add(checkNotNull(listener));
+    }
+
+    @Override
+    public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
+        checkState(!destroyed, destroyedMessage);
+
+        listeners.remove(checkNotNull(listener));
+    }
+
+    @Override
+    public void destroy() {
+        destroyed = true;
+
+        executor.shutdown();
+        backgroundExecutor.shutdown();
+        communicationExecutor.shutdown();
+
+        listeners.clear();
+
+        clusterCommunicator.removeSubscriber(updateMessageSubject);
+        clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
+    }
+
+    private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
+        listeners.forEach(listener -> listener.event(event));
+    }
+
+    private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
+        queueUpdate(event, peers);
+    }
+
+    private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
+        if (peers == null) {
+            // we have no friends :(
+            return;
+        }
+        peers.forEach(node ->
+                        senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
+        );
+    }
+
+    private boolean underHighLoad() {
+        return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
+    }
+
+    private void sendAdvertisement() {
+        try {
+            if (underHighLoad() || destroyed) {
+                return;
+            }
+            pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
+        } catch (Exception e) {
+            // Catch all exceptions to avoid scheduled task being suppressed.
+            log.error("Exception thrown while sending advertisement", e);
+        }
+    }
+
+    private Optional<NodeId> pickRandomActivePeer() {
+        List<NodeId> activePeers = clusterService.getNodes()
+                .stream()
+                .map(ControllerNode::id)
+                .filter(id -> !localNodeId.equals(id))
+                .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
+                .collect(Collectors.toList());
+        Collections.shuffle(activePeers);
+        return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
+    }
+
+    private void sendAdvertisementToPeer(NodeId peer) {
+        clusterCommunicator.unicast(createAdvertisement(),
+                antiEntropyAdvertisementSubject,
+                serializer::encode,
+                peer)
+                .whenComplete((result, error) -> {
+                    if (error != null) {
+                        log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
+                    }
+                });
+    }
+
+    private AntiEntropyAdvertisement<K> createAdvertisement() {
+        return new AntiEntropyAdvertisement<K>(localNodeId,
+                ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
+    }
+
+    private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
+        if (destroyed || underHighLoad()) {
+            return;
+        }
+        try {
+            if (log.isTraceEnabled()) {
+                log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
+                        mapName, ad.sender(), ad.digest().size());
+            }
+            antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
+
+            if (!lightweightAntiEntropy) {
+                // if remote ad has any entries that the local copy is missing, actively sync
+                // TODO: Missing keys is not the way local copy can be behind.
+                if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
+                    // TODO: Send ad for missing keys and for entries that are stale
+                    sendAdvertisementToPeer(ad.sender());
+                }
+            }
+        } catch (Exception e) {
+            log.warn("Error handling anti-entropy advertisement", e);
+        }
+    }
+
+    /**
+     * Processes anti-entropy ad from peer by taking following actions:
+     * 1. If peer has an old entry, updates peer.
+     * 2. If peer indicates an entry is removed and has a more recent
+     * timestamp than the local entry, update local state.
+     */
+    private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
+            AntiEntropyAdvertisement<K> ad) {
+        final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
+        final NodeId sender = ad.sender();
+        items.forEach((key, localValue) -> {
+            MapValue.Digest remoteValueDigest = ad.digest().get(key);
+            if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
+                // local value is more recent, push to sender
+                queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
+            }
+            if (remoteValueDigest != null
+                    && remoteValueDigest.isNewerThan(localValue.digest())
+                    && remoteValueDigest.isTombstone()) {
+                MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
+                MapValue<V> previousValue = removeInternal(key,
+                                                           Optional.empty(),
+                                                           Optional.of(tombstone));
+                if (previousValue != null && previousValue.isAlive()) {
+                    externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
+                }
+            }
+        });
+        return externalEvents;
+    }
+
+    private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
+        if (destroyed) {
+            return;
+        }
+        updates.forEach(update -> {
+            final K key = update.key();
+            final MapValue<V> value = update.value();
+            if (value == null || value.isTombstone()) {
+                MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
+                if (previousValue != null && previousValue.isAlive()) {
+                    notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
+                }
+            } else if (putInternal(key, value)) {
+                notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get()));
+            }
+        });
+    }
+
+    // TODO pull this into the class if this gets pulled out...
+    private static final int DEFAULT_MAX_EVENTS = 1000;
+    private static final int DEFAULT_MAX_IDLE_MS = 10;
+    private static final int DEFAULT_MAX_BATCH_MS = 50;
+    private static final Timer TIMER = new Timer("onos-ecm-sender-events");
+
+    private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
+
+        private final NodeId peer;
+
+        private EventAccumulator(NodeId peer) {
+            super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
+            this.peer = peer;
+        }
+
+        @Override
+        public void processItems(List<UpdateEntry<K, V>> items) {
+            Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
+            items.forEach(item -> map.compute(item.key(), (key, existing) ->
+                    item.isNewerThan(existing) ? item : existing));
+            communicationExecutor.submit(() -> {
+                clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
+                                            updateMessageSubject,
+                                            serializer::encode,
+                                            peer)
+                                   .whenComplete((result, error) -> {
+                                       if (error != null) {
+                                           log.debug("Failed to send to {}", peer, error);
+                                       }
+                                   });
+            });
+        }
+    }
+}