/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.KryoNamespace;
import org.onlab.util.SlidingWindowCounter;
import org.onosproject.cluster.NodeId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;

/**
 * Distributed Map implementation which uses optimistic replication and gossip
 * based techniques to provide an eventually consistent data store.
 */
public class EventuallyConsistentMapImpl<K, V>
        implements EventuallyConsistentMap<K, V> {

    private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
    // Suffix appended to the map name to build the "destroyed" error message.
    private static final String ERROR_DESTROYED = " map is already destroyed";
    private static final String ERROR_NULL_KEY = "Key cannot be null";
    private static final String ERROR_NULL_VALUE = "Null values are not allowed";
    // Size handed to the SlidingWindowCounter that tracks local write activity.
    private static final int WINDOW_SIZE = 5;
    // The map is considered under high load when the counter value over LOAD_WINDOW
    // exceeds HIGH_LOAD_THRESHOLD (see underHighLoad()).
    private static final int HIGH_LOAD_THRESHOLD = 2;
    private static final int LOAD_WINDOW = 2;

    // Backing store holding live values and tombstones; either a persistent map or an
    // in-memory concurrent map, chosen in the constructor.
    private final Map<K, MapValue<V>> items;
    private final ClusterCommunicationService clusterCommunicator;
    private final Serializer serializer;
    private final PersistenceService persistenceService;
    // Supplies the timestamp attached to every locally originated put/remove.
    private final BiFunction<K, V, Timestamp> timestampProvider;
    // Message subjects, all derived from the map name, on which this map subscribes.
    private final MessageSubject bootstrapMessageSubject;
    private final MessageSubject initializeMessageSubject;
    private final MessageSubject updateMessageSubject;
    private final MessageSubject antiEntropyAdvertisementSubject;
    private final MessageSubject updateRequestSubject;
    private final Set<EventuallyConsistentMapListener<K, V>> listeners
            = Sets.newCopyOnWriteArraySet();
    // Handles incoming "initialize" and "update" messages.
    private final ExecutorService executor;
    // Runs the periodic anti-entropy/tombstone purge tasks and handles
    // anti-entropy advertisements and update requests.
    private final ScheduledExecutorService backgroundExecutor;
    // Resolves which peers should be told about a given key/value change.
    private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
    private final ExecutorService communicationExecutor;
    // Per-peer accumulators of updates waiting to be sent.
    private final Map<NodeId, EventAccumulator> senderPending;
    // Per peer: creation time (local wall clock, ms) of the last advertisement the peer
    // reported as PROCESSED.
    private final Map<NodeId, Long> antiEntropyTimes = Maps.newConcurrentMap();
    private final String mapName;
    // Pre-built message used by the checkState(!destroyed, ...) guards.
    private final String destroyedMessage;
    // Initial delay applied to the periodic background tasks.
    private final long initialDelaySec = 5;
    // Set to the negation of the constructor's convergeFaster flag.
    private final boolean lightweightAntiEntropy;
    private final boolean tombstonesDisabled;
    private final boolean persistent;
    private final Supplier<List<NodeId>> peersSupplier;
    private final Supplier<List<NodeId>> bootstrapPeersSupplier;
    private final NodeId localNodeId;
    // Safe purge time computed by the previous purgeTombstones() run that did any work.
    private long previousTombstonePurgeTime;
    private volatile boolean destroyed = false;
    // Incremented on every putInternal/removeInternal call; feeds underHighLoad().
    private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);

    /**
     * Creates a new eventually consistent map shared amongst multiple instances.
     * <p>
     * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
     * for more description of the parameters expected by the map.
     * </p>
     *
     * @param localNodeId            local node id
     * @param mapName                a String identifier for the map.
     * @param clusterCommunicator    the cluster communications service
     * @param ns                     a Kryo namespace that can serialize both K and V
     * @param timestampProvider      provider of timestamps for K and V
     * @param peerUpdateFunction     function that provides a set of nodes to immediately
     *                               update to when there writes to the map
     * @param eventExecutor          executor to use for processing incoming events from peers
     * @param communicationExecutor  executor to use for sending events to peers
     * @param backgroundExecutor     executor to use for background anti-entropy tasks
     * @param tombstonesDisabled     true if this map should not maintain tombstones
     * @param antiEntropyPeriod      period that the anti-entropy task should run
     * @param antiEntropyTimeUnit    time unit for anti-entropy period
     * @param convergeFaster         make anti-entropy try to converge faster
     * @param persistent             persist data to disk
     * @param persistenceService     persistence service
     * @param peersSupplier          supplier for peers
     * @param bootstrapPeersSupplier supplier for bootstrap peers
     */
    //CHECKSTYLE:OFF
    EventuallyConsistentMapImpl(
            NodeId localNodeId,
            String mapName,
            ClusterCommunicationService clusterCommunicator,
            KryoNamespace ns,
            BiFunction<K, V, Timestamp> timestampProvider,
            BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
            ExecutorService eventExecutor,
            ExecutorService communicationExecutor,
            ScheduledExecutorService backgroundExecutor,
            boolean tombstonesDisabled,
            long antiEntropyPeriod,
            TimeUnit antiEntropyTimeUnit,
            boolean convergeFaster,
            boolean persistent,
            PersistenceService persistenceService,
            Supplier<List<NodeId>> peersSupplier,
            Supplier<List<NodeId>> bootstrapPeersSupplier
    ) {
        //CHECKSTYLE:ON
        this.localNodeId = localNodeId;
        this.mapName = mapName;
        this.serializer = createSerializer(ns);
        this.persistenceService = persistenceService;
        this.persistent =
                persistent;
        if (persistent) {
            items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
                    .withName(mapName)
                    .withSerializer(this.serializer)
                    .build();
        } else {
            items = Maps.newConcurrentMap();
        }
        senderPending = Maps.newConcurrentMap();
        destroyedMessage = mapName + ERROR_DESTROYED;

        this.clusterCommunicator = clusterCommunicator;

        this.timestampProvider = timestampProvider;

        this.peersSupplier = peersSupplier;
        this.bootstrapPeersSupplier = bootstrapPeersSupplier;

        if (peerUpdateFunction != null) {
            this.peerUpdateFunction = peerUpdateFunction.andThen(peers -> peersSupplier.get()
                    .stream()
                    .filter(peers::contains)
                    .collect(Collectors.toList()));
        } else {
            this.peerUpdateFunction = (key, value) -> peersSupplier.get();
        }

        if (eventExecutor != null) {
            this.executor = eventExecutor;
        } else {
            // should be a normal executor; it's used for receiving messages
            this.executor =
                    Executors.newFixedThreadPool(8,
                            groupedThreads("onos/ecm", mapName + "-fg-%d", log));
        }

        if (communicationExecutor != null) {
            this.communicationExecutor = communicationExecutor;
        } else {
            // sending executor; should be capped
            //TODO this probably doesn't need to be bounded anymore
            this.communicationExecutor =
                    newFixedThreadPool(8,
                            groupedThreads("onos/ecm", mapName + "-publish-%d", log));
        }


        if (backgroundExecutor != null) {
            this.backgroundExecutor = backgroundExecutor;
        } else {
            this.backgroundExecutor =
                    newSingleThreadScheduledExecutor(
                            groupedThreads("onos/ecm", mapName + "-bg-%d", log));
        }

        // start anti-entropy thread
        this.backgroundExecutor.scheduleAtFixedRate(
                this::sendAdvertisement,
                initialDelaySec, antiEntropyPeriod,
                antiEntropyTimeUnit
        );

        bootstrapMessageSubject = new MessageSubject("ecm-" + mapName + "-bootstrap");
        clusterCommunicator.addSubscriber(
                bootstrapMessageSubject,
                serializer::decode,
                (Function<NodeId, CompletableFuture<Void>>) this::handleBootstrap,
                serializer::encode
        );

        initializeMessageSubject = new MessageSubject("ecm-" + mapName + "-initialize");
        clusterCommunicator.addSubscriber(
                initializeMessageSubject,
                serializer::decode,
                (Function<Collection<UpdateEntry<K, V>>, Void>) u -> {
                    processUpdates(u);
                    return null;
                },
                serializer::encode,
                this.executor
        );

        updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
        clusterCommunicator.addSubscriber(
                updateMessageSubject,
                serializer::decode,
                this::processUpdates,
                this.executor
        );

        antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
        clusterCommunicator.addSubscriber(
                antiEntropyAdvertisementSubject,
                serializer::decode,
                this::handleAntiEntropyAdvertisement,
                serializer::encode,
                this.backgroundExecutor
        );

        updateRequestSubject = new MessageSubject("ecm-" + mapName + "-update-request");
        clusterCommunicator.addSubscriber(
                updateRequestSubject,
                serializer::decode,
                this::handleUpdateRequests,
                this.backgroundExecutor
        );

        if (!tombstonesDisabled) {
            previousTombstonePurgeTime = 0;
            this.backgroundExecutor.scheduleWithFixedDelay(
                    this::purgeTombstones,
                    initialDelaySec,
                    antiEntropyPeriod,
                    TimeUnit.SECONDS
            );
        }

        this.tombstonesDisabled = tombstonesDisabled;
        this.lightweightAntiEntropy = !convergeFaster;

        // Initiate first round of Gossip
        this.bootstrap();
    }

    /**
     * Builds the serializer used for both persistence and cluster messages.
     * The caller's namespace is registered first, followed by the types this map
     * exchanges internally.
     *
     * @param ns user supplied namespace able to serialize keys and values
     * @return serializer backed by the combined namespace
     */
    private Serializer createSerializer(KryoNamespace ns) {
        KryoNamespace.Builder builder = KryoNamespace.newBuilder()
                .register(ns)
                // not so robust way to avoid collision with other
                // user supplied registrations
                .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
                .register(KryoNamespaces.BASIC)
                .register(LogicalTimestamp.class)
                .register(WallClockTimestamp.class)
                .register(AntiEntropyAdvertisement.class)
                .register(AntiEntropyResponse.class)
                .register(UpdateEntry.class)
                .register(MapValue.class)
                .register(MapValue.Digest.class)
                .register(UpdateRequest.class);
        return Serializer.using(builder.build(name() + "-ecmap"));
    }

    // Returns the map name supplied at construction time.
    @Override
    public String name() {
        return mapName;
    }

    // Counts live entries only; tombstones held in the backing map are excluded.
    @Override
    public int size() {
        checkState(!destroyed, destroyedMessage);
        // TODO: Maintain a separate counter for tracking live elements in map.
        Map<K, MapValue<V>> liveEntries = Maps.filterValues(items, MapValue::isAlive);
        return liveEntries.size();
    }

    // True when the backing map holds no live value (tombstones do not count).
    @Override
    public boolean isEmpty() {
        checkState(!destroyed, destroyedMessage);
        // Stop at the first live entry instead of counting every entry via size().
        return items.values()
                .stream()
                .noneMatch(MapValue::isAlive);
    }

    // A key is contained only if it currently maps to a live (non-tombstone) value.
    @Override
    public boolean containsKey(K key) {
        checkState(!destroyed, destroyedMessage);
        checkNotNull(key, ERROR_NULL_KEY);
        V liveValue = get(key);
        return liveValue != null;
    }

    // Scans the live entries for one whose raw value equals the given value.
    @Override
    public boolean containsValue(V value) {
        checkState(!destroyed, destroyedMessage);
        checkNotNull(value, ERROR_NULL_VALUE);
        for (MapValue<V> candidate : items.values()) {
            if (candidate.isAlive() && value.equals(candidate.get())) {
                return true;
            }
        }
        return false;
    }

    // Returns the raw value for the key, or null when the key is absent or tombstoned.
    @Override
    public V get(K key) {
        checkState(!destroyed, destroyedMessage);
        checkNotNull(key, ERROR_NULL_KEY);

        MapValue<V> stored = items.get(key);
        if (stored == null || stored.isTombstone()) {
            return null;
        }
        return stored.get();
    }

    // Stamps the value, stores it if it wins against the existing entry and, only in
    // that case, queues the update for peers and notifies local listeners.
    @Override
    public void put(K key, V value) {
        checkState(!destroyed, destroyedMessage);
        checkNotNull(key, ERROR_NULL_KEY);
        checkNotNull(value, ERROR_NULL_VALUE);

        Timestamp timestamp = timestampProvider.apply(key, value);
        MapValue<V> candidate = new MapValue<>(value, timestamp);
        if (!putInternal(key, candidate)) {
            // An entry at least as recent is already stored; nothing to announce.
            return;
        }
        notifyPeers(new UpdateEntry<>(key, candidate), peerUpdateFunction.apply(key, value));
        notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
    }

    // Unconditional removal: no expected value is supplied to removeAndNotify.
    @Override
    public V remove(K key) {
        checkState(!destroyed, destroyedMessage);
        checkNotNull(key, ERROR_NULL_KEY);
        final V removed = removeAndNotify(key, null);
        return removed;
    }

    // Conditional removal: the given value is passed on as the expected value, and a
    // live entry is removed only if its value equals it (see removeInternal).
    @Override
    public void remove(K key, V value) {
        checkState(!destroyed, destroyedMessage);
        checkNotNull(key, ERROR_NULL_KEY);
        checkNotNull(value, ERROR_NULL_VALUE);
        removeAndNotify(key, value);
    }

    /**
     * Removes a key locally and, if the removal took effect, propagates it.
     *
     * @param key   key to remove
     * @param value expected current value, or {@code null} to remove unconditionally
     * @return the raw value of the replaced entry, or {@code null} if nothing was replaced
     */
    private V removeAndNotify(K key, V value) {
        Timestamp removalTime = timestampProvider.apply(key, value);
        // No tombstone is kept when tombstones are disabled or no timestamp is available.
        Optional<MapValue<V>> tombstone;
        if (tombstonesDisabled || removalTime == null) {
            tombstone = Optional.empty();
        } else {
            tombstone = Optional.of(MapValue.tombstone(removalTime));
        }

        MapValue<V> previous = removeInternal(key, Optional.ofNullable(value), tombstone);
        if (previous == null) {
            return null;
        }
        notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
                peerUpdateFunction.apply(key, previous.get()));
        // Listeners only hear about removals of entries that were actually live.
        if (previous.isAlive()) {
            notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previous.get()));
        }
        return previous.get();
    }

    /**
     * Applies a removal to the backing map without notifying peers or listeners.
     *
     * @param key       key to remove
     * @param value     expected raw value; when present and the existing entry is alive,
     *                  the removal only applies if the two are equal
     * @param tombstone tombstone to store in place of the entry; when empty the entry is
     *                  dropped from the map instead
     * @return the entry that was replaced, or {@code null} when nothing changed or there
     *         was no existing entry
     */
    private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
        checkState(!destroyed, destroyedMessage);
        checkNotNull(key, ERROR_NULL_KEY);
        // Guards the Optional wrapper itself; an empty Optional is a valid argument.
        checkNotNull(value, ERROR_NULL_VALUE);
        tombstone.ifPresent(v -> checkState(v.isTombstone()));

        // Feeds the load estimate consulted by underHighLoad().
        counter.incrementCount();
        AtomicBoolean updated = new AtomicBoolean(false);
        AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
        items.compute(key, (k, existing) -> {
            boolean valueMatches = true;
            if (value.isPresent() && existing != null && existing.isAlive()) {
                valueMatches = Objects.equals(value.get(), existing.get());
            }
            if (existing == null) {
                log.trace("ECMap Remove: Existing value for key {} is already null", k);
            }
            if (valueMatches) {
                if (existing == null) {
                    // Nothing stored yet: only record the removal if there is a tombstone to keep.
                    updated.set(tombstone.isPresent());
                } else {
                    // Without a tombstone the removal always applies; with one, it must be newer.
                    updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
                }
            }
            if (updated.get()) {
                previousValue.set(existing);
                // Returning null from compute() drops the mapping entirely.
                return tombstone.orElse(null);
            } else {
                return existing;
            }
        });
        return previousValue.get();
    }

    // Recomputes the value for a key from its current raw value (null when the key has
    // no entry). The new value is stored only if it differs from the current one and its
    // freshly assigned timestamp wins; peers and listeners are notified only in that case.
    // Returns the raw value of whatever entry ends up stored.
    @Override
    public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
        checkState(!destroyed, destroyedMessage);
        checkNotNull(key, ERROR_NULL_KEY);
        checkNotNull(recomputeFunction, "Recompute function cannot be null");

        AtomicBoolean updated = new AtomicBoolean(false);
        AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
        // A serializer-made copy of the key is used as the map key.
        MapValue<V> computedValue = items.compute(serializer.copy(key), (k, mv) -> {
            previousValue.set(mv);
            V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
            if (mv != null && Objects.equals(newRawValue, mv.get())) {
                // value was not updated
                return mv;
            }
            // NOTE(review): a null newRawValue presumably yields a tombstone MapValue here;
            // confirm against MapValue's constructor.
            MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
            if (mv == null || newValue.isNewerThan(mv)) {
                updated.set(true);
                // We return a copy to ensure updates to peers can be serialized.
                // This prevents replica divergence due to serialization failures.
                return serializer.copy(newValue);
            } else {
                return mv;
            }
        });
        if (updated.get()) {
            notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
            EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
            // For a removal the event carries the previous raw value (if any); for a put, the new one.
            V value = computedValue.isTombstone()
                    ? previousValue.get() == null ? null : previousValue.get().get()
                    : computedValue.get();
            // No event is raised when there is no value to report.
            if (value != null) {
                notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
            }
        }
        return computedValue.get();
    }

    // Applies put() to every mapping of the given map, one entry at a time.
    @Override
    public void putAll(Map<? extends K, ? extends V> m) {
        checkState(!destroyed, destroyedMessage);
        for (Map.Entry<? extends K, ? extends V> mapping : m.entrySet()) {
            put(mapping.getKey(), mapping.getValue());
        }
    }

    // Removes every live entry through remove(key), so each removal is propagated
    // exactly like an individual remove call.
    @Override
    public void clear() {
        checkState(!destroyed, destroyedMessage);
        Map<K, MapValue<V>> liveEntries = Maps.filterValues(items, MapValue::isAlive);
        liveEntries.forEach((liveKey, ignoredValue) -> remove(liveKey));
    }

    // Keys of the live entries, exposed as a filtered view over the backing map.
    @Override
    public Set<K> keySet() {
        checkState(!destroyed, destroyedMessage);
        Map<K, MapValue<V>> liveEntries = Maps.filterValues(items, MapValue::isAlive);
        return liveEntries.keySet();
    }

    // Raw values of the live entries, exposed as a transformed view over the backing map.
    @Override
    public Collection<V> values() {
        checkState(!destroyed, destroyedMessage);
        Collection<MapValue<V>> liveValues = Maps.filterValues(items, MapValue::isAlive).values();
        return Collections2.transform(liveValues, MapValue::get);
    }

    // Builds a new set of (key, raw value) pairs from the live entries; unlike keySet()
    // and values(), the result is a copy rather than a view.
    @Override
    public Set<Map.Entry<K, V>> entrySet() {
        checkState(!destroyed, destroyedMessage);
        Map<K, MapValue<V>> liveEntries = Maps.filterValues(items, MapValue::isAlive);
        Set<Map.Entry<K, V>> snapshot = new HashSet<>();
        for (Map.Entry<K, MapValue<V>> live : liveEntries.entrySet()) {
            snapshot.add(Pair.of(live.getKey(), live.getValue().get()));
        }
        return snapshot;
    }

    /**
     * Stores the proposed value unless the map already holds a more recent entry for the key.
     * Peers and listeners are not notified here.
     *
     * @param key      key
     * @param newValue proposed new value; must be a live value, not a tombstone
     * @return true if the map was updated; false if the existing entry was kept
     */
    private boolean putInternal(K key, MapValue<V> newValue) {
        checkState(!destroyed, destroyedMessage);
        checkNotNull(key, ERROR_NULL_KEY);
        checkNotNull(newValue, ERROR_NULL_VALUE);
        checkState(newValue.isAlive());
        // Feeds the load estimate consulted by underHighLoad().
        counter.incrementCount();
        final AtomicBoolean accepted = new AtomicBoolean(false);
        items.compute(key, (ignoredKey, current) -> {
            boolean proposalWins = current == null || newValue.isNewerThan(current);
            if (!proposalWins) {
                return current;
            }
            accepted.set(true);
            return newValue;
        });
        return accepted.get();
    }

    // Registers the listener and immediately replays every live entry to it as a PUT event.
    @Override
    public void addListener(EventuallyConsistentMapListener<K, V> listener) {
        checkState(!destroyed, destroyedMessage);

        checkNotNull(listener);
        listeners.add(listener);
        items.forEach((key, stored) -> {
            if (!stored.isAlive()) {
                // Tombstones are not replayed.
                return;
            }
            listener.event(new EventuallyConsistentMapEvent<K, V>(mapName, PUT, key, stored.get()));
        });
    }

    // Unregisters a previously added listener; the listener must not be null.
    @Override
    public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
        checkState(!destroyed, destroyedMessage);

        checkNotNull(listener);
        listeners.remove(listener);
    }

    // Marks the map destroyed, shuts down all three executors, drops the listeners and
    // unsubscribes from every message subject. Returns an already completed future.
    @Override
    public CompletableFuture<Void> destroy() {
        destroyed = true;

        executor.shutdown();
        backgroundExecutor.shutdown();
        communicationExecutor.shutdown();

        listeners.clear();

        // Unsubscribe in the same order as before: bootstrap, initialize, update,
        // update-request, anti-entropy.
        MessageSubject[] subscribedSubjects = {
                bootstrapMessageSubject,
                initializeMessageSubject,
                updateMessageSubject,
                updateRequestSubject,
                antiEntropyAdvertisementSubject
        };
        for (MessageSubject subject : subscribedSubjects) {
            clusterCommunicator.removeSubscriber(subject);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Delivers the event to every registered listener, on the calling thread.
     *
     * @param event event to deliver
     */
    private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
        for (EventuallyConsistentMapListener<K, V> registered : listeners) {
            registered.event(event);
        }
    }

    /**
     * Announces a local change to the given peers by queueing it for sending.
     *
     * @param event update to announce
     * @param peers peers to announce it to; {@code null} results in no announcement
     */
    private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
        queueUpdate(event, peers);
    }

    /**
     * Adds the update to the pending accumulator of each given peer, creating the
     * accumulator on first use.
     *
     * @param event update to queue
     * @param peers target peers; {@code null} means there is nobody to send to
     */
    private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
        if (peers != null) {
            for (NodeId node : peers) {
                EventAccumulator pending =
                        senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node));
                pending.add(event);
            }
        }
        // else: we have no friends :(
    }

    /**
     * Reports whether the write counter, read over {@code LOAD_WINDOW}, exceeds
     * {@code HIGH_LOAD_THRESHOLD}. Anti-entropy is skipped while this is true.
     *
     * @return true if the map is considered to be under high load
     */
    private boolean underHighLoad() {
        return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
    }

    /**
     * Periodic anti-entropy task: sends an advertisement to one randomly picked peer.
     * The round is skipped while the map is under high load or once it is destroyed.
     */
    private void sendAdvertisement() {
        try {
            boolean skipRound = underHighLoad() || destroyed;
            if (!skipRound) {
                pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
            }
        } catch (Exception e) {
            // Catch all exceptions to avoid scheduled task being suppressed.
            log.error("Exception thrown while sending advertisement", e);
        }
    }

    /**
     * Picks one peer uniformly at random from the currently supplied peers.
     * <p>
     * The supplied list is only read, never reordered, so the supplier is free to
     * return a shared or unmodifiable list.
     *
     * @return a randomly chosen peer, or empty if there are no peers
     */
    private Optional<NodeId> pickRandomActivePeer() {
        List<NodeId> activePeers = peersSupplier.get();
        if (activePeers.isEmpty()) {
            return Optional.empty();
        }
        int pick = ThreadLocalRandom.current().nextInt(activePeers.size());
        return Optional.of(activePeers.get(pick));
    }

    /**
     * Sends a digest of the local map to the peer and, if the peer reports it as
     * processed, records when the advertisement was created.
     *
     * @param peer peer to advertise to
     */
    private void sendAdvertisementToPeer(NodeId peer) {
        // Taken before the advertisement is built; this is the time stored on success.
        final long adCreationTime = System.currentTimeMillis();
        final AntiEntropyAdvertisement<K> ad = createAdvertisement();
        clusterCommunicator.sendAndReceive(ad,
                antiEntropyAdvertisementSubject,
                serializer::encode,
                serializer::decode,
                peer)
                .whenComplete((response, failure) -> {
                    if (failure != null) {
                        log.debug("Failed to send anti-entropy advertisement to {}: {}",
                                peer, failure.getMessage());
                        return;
                    }
                    if (response == AntiEntropyResponse.PROCESSED) {
                        antiEntropyTimes.put(peer, adCreationTime);
                    }
                });
    }

    /**
     * Asks the peer to send its current entries for the given keys. A failure to send
     * is only logged.
     *
     * @param peer peer to ask
     * @param keys keys whose entries are requested
     */
    private void sendUpdateRequestToPeer(NodeId peer, Set<K> keys) {
        clusterCommunicator.unicast(new UpdateRequest<>(localNodeId, keys),
                updateRequestSubject,
                serializer::encode,
                peer)
                .whenComplete((ignoredResult, failure) -> {
                    if (failure == null) {
                        return;
                    }
                    log.debug("Failed to send update request to {}: {}",
                            peer, failure.getMessage());
                });
    }

    /**
     * Builds an advertisement carrying an immutable copy of the digest of every entry
     * currently in the backing map.
     *
     * @return advertisement identifying this node as the sender
     */
    private AntiEntropyAdvertisement<K> createAdvertisement() {
        Map<K, MapValue.Digest> digests =
                ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest));
        return new AntiEntropyAdvertisement<>(localNodeId, digests);
    }

    /**
     * Handles an advertisement received from a peer.
     *
     * @param ad advertisement received
     * @return IGNORED if the map is destroyed or under high load, FAILED if processing
     *         threw, PROCESSED otherwise
     */
    private AntiEntropyResponse handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
        if (destroyed || underHighLoad()) {
            return AntiEntropyResponse.IGNORED;
        }
        try {
            if (log.isTraceEnabled()) {
                log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
                        ad.sender(), mapName, ad.digest().size());
            }
            // Reconcile first, then tell local listeners about removals learned from the peer.
            List<EventuallyConsistentMapEvent<K, V>> learnedEvents = antiEntropyCheckLocalItems(ad);
            for (EventuallyConsistentMapEvent<K, V> learned : learnedEvents) {
                notifyListeners(learned);
            }
        } catch (Exception e) {
            log.warn("Error handling anti-entropy advertisement", e);
            return AntiEntropyResponse.FAILED;
        }
        return AntiEntropyResponse.PROCESSED;
    }

    /**
     * Processes anti-entropy ad from peer by taking following actions:
     * 1. If peer has an old entry, or no entry at all, for a local key, queues the local
     * entry for sending to the peer.
     * 2. If peer indicates an entry is removed and has a more recent
     * timestamp than the local entry, update local state.
     * 3. If peer has a more recent live entry, or a key unknown locally, requests those
     * keys from the peer.
     *
     * @param ad advertisement received from the peer
     * @return REMOVE events for live local entries that were replaced by a peer tombstone;
     *         the caller is responsible for delivering them to listeners
     */
    private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
            AntiEntropyAdvertisement<K> ad) {
        final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
        final NodeId sender = ad.sender();
        final List<NodeId> peers = ImmutableList.of(sender);
        Set<K> staleOrMissing = new HashSet<>();
        // Starts as every advertised key; keys found locally are removed during the scan below.
        Set<K> locallyUnknown = new HashSet<>(ad.digest().keySet());

        items.forEach((key, localValue) -> {
            locallyUnknown.remove(key);
            MapValue.Digest remoteValueDigest = ad.digest().get(key);
            if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
                // local value is more recent, push to sender
                queueUpdate(new UpdateEntry<>(key, localValue), peers);
            } else if (remoteValueDigest.isNewerThan(localValue.digest()) && remoteValueDigest.isTombstone()) {
                // remote value is more recent and a tombstone: update local value
                MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
                MapValue<V> previousValue = removeInternal(key,
                        Optional.empty(),
                        Optional.of(tombstone));
                if (previousValue != null && previousValue.isAlive()) {
                    externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
                }
            } else if (remoteValueDigest.isNewerThan(localValue.digest())) {
                // Not a tombstone and remote is newer
                staleOrMissing.add(key);
            }
        });
        // Keys missing in local map
        staleOrMissing.addAll(locallyUnknown);
        // Request updates that we missed out on
        // NOTE(review): the request is sent even when staleOrMissing is empty.
        sendUpdateRequestToPeer(sender, staleOrMissing);
        return externalEvents;
    }

    /**
     * Answers a peer's update request by queueing, for each requested key, whatever the
     * backing map currently holds for it (possibly nothing) for sending to the requester.
     *
     * @param request request naming the sender and the keys it wants
     */
    private void handleUpdateRequests(UpdateRequest<K> request) {
        final Set<K> requestedKeys = request.keys();
        final List<NodeId> requester = ImmutableList.of(request.sender());
        for (K requestedKey : requestedKeys) {
            queueUpdate(new UpdateEntry<>(requestedKey, items.get(requestedKey)), requester);
        }
    }

    /**
     * Periodic task that drops tombstones old enough to be considered safe to forget.
     * <p>
     * Any exception is caught and logged: an exception escaping a periodically scheduled
     * task would suppress all of its subsequent executions.
     */
    private void purgeTombstones() {
        try {
            /*
             * In order to mitigate the resource exhaustion that can ensue due to an ever-growing set
             * of tombstones we employ the following heuristic to purge old tombstones periodically.
             * First, we keep track of the time (local system time) when we were able to have a successful
             * AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded
             * as the time before which all tombstones are considered safe to purge.
             */
            final long currentSafeTombstonePurgeTime = peersSupplier.get()
                    .stream()
                    // A peer we never had a successful exchange with counts as time 0.
                    .mapToLong(id -> antiEntropyTimes.getOrDefault(id, 0L))
                    .min()
                    .orElse(0L);
            if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) {
                // Nothing new became safe to purge since the last run.
                return;
            }
            List<Map.Entry<K, MapValue<V>>> tombStonesToDelete = items.entrySet()
                    .stream()
                    .filter(e -> e.getValue().isTombstone())
                    .filter(e -> e.getValue().creationTime() <= currentSafeTombstonePurgeTime)
                    .collect(Collectors.toList());
            previousTombstonePurgeTime = currentSafeTombstonePurgeTime;
            // Conditional remove: an entry replaced in the meantime is left untouched.
            tombStonesToDelete.forEach(entry -> items.remove(entry.getKey(), entry.getValue()));
        } catch (Exception e) {
            // Catch all exceptions to avoid scheduled task being suppressed.
            log.error("Exception thrown while purging tombstones", e);
        }
    }

    /**
     * Applies updates received from a peer and notifies local listeners of every change
     * that took effect. Updates are ignored once the map is destroyed.
     *
     * @param updates updates to apply; an entry with a null or tombstone value is a removal
     */
    private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
        if (destroyed) {
            return;
        }
        for (UpdateEntry<K, V> update : updates) {
            final K key = update.key();
            // Work on a copy of the received value rather than the received instance.
            final MapValue<V> incoming = update.value() == null ? null : update.value().copy();
            final boolean removal = incoming == null || incoming.isTombstone();
            if (removal) {
                MapValue<V> previous = removeInternal(key, Optional.empty(), Optional.ofNullable(incoming));
                if (previous != null && previous.isAlive()) {
                    notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previous.get()));
                }
            } else if (putInternal(key, incoming)) {
                notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, incoming.get()));
            }
        }
    }

    /**
     * Bootstraps the map to attempt to get in sync with existing instances of the same map on other nodes in the
     * cluster. This is necessary to ensure that a read immediately after the map is created doesn't return a null
     * value.
     * <p>
     * Blocks the calling thread for at most {@code DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS}.
     * Failures are logged and otherwise ignored; if the thread is interrupted while waiting, its interrupt
     * status is restored before returning.
     */
    private void bootstrap() {
        List<NodeId> activePeers = bootstrapPeersSupplier.get();
        if (activePeers.isEmpty()) {
            return;
        }
        try {
            requestBootstrapFromPeers(activePeers)
                    .get(DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            log.debug("Failed to bootstrap ec map {}: {}", mapName, ExceptionUtils.getStackTrace(e));
        } catch (ExecutionException | TimeoutException e) {
            log.debug("Failed to bootstrap ec map {}: {}", mapName, ExceptionUtils.getStackTrace(e));
        }
    }

    /**
     * Requests all updates from each peer in the provided list of peers.
     * <p>
     * The returned future will be completed once at least one peer bootstraps this map or bootstrap requests to all
     * peers fail. When every request fails, the future is completed exceptionally with the error of the request
     * whose failure was counted last. An empty peer list yields an already completed future.
     *
     * @param peers the list of peers from which to request updates
     * @return a future to be completed once updates have been received from at least one peer
     */
    private CompletableFuture<Void> requestBootstrapFromPeers(List<NodeId> peers) {
        if (peers.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> future = new CompletableFuture<>();
        final int totalPeers = peers.size();
        AtomicInteger failureCount = new AtomicInteger();

        // Send a bootstrap request to every peer. The first successful response completes the future; any later
        // response is ignored because completing an already completed future has no effect. The failure counter
        // can only reach the number of peers when no request succeeded, in which case the future is failed with
        // the error of the request that brought the counter to that total.
        for (NodeId peer : peers) {
            requestBootstrapFromPeer(peer).whenComplete((result, error) -> {
                if (error == null) {
                    future.complete(null);
                } else if (failureCount.incrementAndGet() == totalPeers) {
                    future.completeExceptionally(error);
                }
            });
        }
        return future;
    }

    /**
     * Sends a bootstrap request to a single peer.
     * <p>
     * A failed request is logged at debug level; the returned future still carries that failure.
     *
     * @param peer the peer from which to request updates
     * @return a future to be completed once the peer has sent bootstrap updates
     */
    private CompletableFuture<Void> requestBootstrapFromPeer(NodeId peer) {
        log.trace("Sending bootstrap request to {} for {}", peer, mapName);
        // The request payload is the local node id.
        CompletableFuture<Void> response = clusterCommunicator.<NodeId, Void>sendAndReceive(
                localNodeId,
                bootstrapMessageSubject,
                serializer::encode,
                serializer::decode,
                peer);
        return response.whenComplete((ignored, failure) -> {
            if (failure == null) {
                return;
            }
            log.debug("Bootstrap request to {} failed: {}", peer, failure.getMessage());
        });
    }

    /**
     * Handles a bootstrap request from a peer.
     * <p>
     * Entries whose value is alive are sent back to the peer in batches of at most {@code DEFAULT_MAX_EVENTS}
     * entries. The returned future is the combination of all batch exchanges, so it completes once every one of
     * them has completed.
     *
     * @param peer the peer that sent the bootstrap request
     * @return a future to be completed once updates have been sent to the peer
     */
    private CompletableFuture<Void> handleBootstrap(NodeId peer) {
        log.trace("Received bootstrap request from {} for {}", peer, bootstrapMessageSubject);

        // Sends one batch to the requesting peer and returns the future tracking that exchange.
        Function<List<UpdateEntry<K, V>>, CompletableFuture<Void>> sendBatch = chunk -> {
            log.trace("Initializing {} with {} entries", peer, chunk.size());
            CompletableFuture<Void> reply = clusterCommunicator.<List<UpdateEntry<K, V>>, Void>sendAndReceive(
                    ImmutableList.copyOf(chunk),
                    initializeMessageSubject,
                    serializer::encode,
                    serializer::decode,
                    peer);
            return reply.whenComplete((ignored, failure) -> {
                if (failure != null) {
                    log.debug("Failed to initialize {}", peer, failure);
                }
            });
        };

        List<CompletableFuture<Void>> pending = Lists.newArrayList();
        List<UpdateEntry<K, V>> batch = Lists.newArrayList();
        for (Map.Entry<K, MapValue<V>> entry : items.entrySet()) {
            MapValue<V> value = entry.getValue();
            if (!value.isAlive()) {
                // Only alive values are part of a bootstrap response.
                continue;
            }
            batch.add(new UpdateEntry<>(entry.getKey(), value));
            if (batch.size() == DEFAULT_MAX_EVENTS) {
                // The batch is full: send it and start collecting into a fresh list.
                pending.add(sendBatch.apply(batch));
                batch = new ArrayList<>();
            }
        }

        // Flush whatever is left over after the last full batch.
        if (!batch.isEmpty()) {
            pending.add(sendBatch.apply(batch));
        }
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]));
    }

    // TODO pull this into the class if this gets pulled out...
    // Settings handed to the AbstractAccumulator constructor by EventAccumulator below.

    // Passed as the accumulator's max-events argument; also used as the batch size when answering a
    // bootstrap request.
    private static final int DEFAULT_MAX_EVENTS = 1000;
    // Passed as the accumulator's max-idle argument (presumably milliseconds, going by the name).
    private static final int DEFAULT_MAX_IDLE_MS = 10;
    // Passed as the accumulator's max-batch argument (presumably milliseconds, going by the name).
    private static final int DEFAULT_MAX_BATCH_MS = 50;
    // Single static timer shared by every EventAccumulator instance.
    private static final Timer TIMER = new Timer("onos-ecm-sender-events");

    /**
     * Accumulator that collects the update entries destined for a single peer and sends them to that peer as one
     * unicast message per processed batch.
     */
    private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {

        private final NodeId peer;

        private EventAccumulator(NodeId peer) {
            super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
            this.peer = peer;
        }

        /**
         * Reduces the accumulated entries to one entry per key and hands the result to the communication executor
         * for delivery to the peer.
         *
         * @param items the accumulated update entries
         */
        @Override
        public void processItems(List<UpdateEntry<K, V>> items) {
            // For every key keep either the entry seen so far or the candidate, whichever isNewerThan() selects.
            Map<K, UpdateEntry<K, V>> latestByKey = Maps.newHashMap();
            for (UpdateEntry<K, V> candidate : items) {
                latestByKey.compute(candidate.key(), (key, current) ->
                        candidate.isNewerThan(current) ? candidate : current);
            }
            communicationExecutor.execute(() -> sendToPeer(latestByKey));
        }

        /**
         * Unicasts the given entries to the peer; failures are logged and never rethrown.
         *
         * @param latestByKey the de-duplicated entries, keyed by map key
         */
        private void sendToPeer(Map<K, UpdateEntry<K, V>> latestByKey) {
            try {
                clusterCommunicator.unicast(ImmutableList.copyOf(latestByKey.values()),
                        updateMessageSubject,
                        serializer::encode,
                        peer)
                        .whenComplete((ignored, failure) -> {
                            if (failure != null) {
                                log.debug("Failed to send to {}", peer, failure);
                            }
                        });
            } catch (Exception e) {
                log.warn("Failed to send to {}", peer, e);
            }
        }
    }
}
