/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.KryoNamespace;
import org.onlab.util.SlidingWindowCounter;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
* Distributed Map implementation that uses optimistic replication and gossip-based
* techniques to provide an eventually consistent data store.
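* <p>
* A minimal usage sketch (illustrative only; it assumes a {@code StorageService}
* reference named {@code storageService} and application-chosen key/value types):
* </p>
* <pre>{@code
* EventuallyConsistentMap<String, String> map = storageService
*         .<String, String>eventuallyConsistentMapBuilder()
*         .withName("my-map")
*         .withSerializer(KryoNamespace.newBuilder().register(KryoNamespaces.API))
*         .withTimestampProvider((k, v) -> new WallClockTimestamp())
*         .build();
* map.put("foo", "bar");
* }</pre>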
*/
public class EventuallyConsistentMapImpl<K, V>
implements EventuallyConsistentMap<K, V> {
private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
private final Map<K, MapValue<V>> items;
private final ClusterService clusterService;
private final ClusterCommunicationService clusterCommunicator;
private final KryoSerializer serializer;
private final NodeId localNodeId;
private final PersistenceService persistenceService;
private final BiFunction<K, V, Timestamp> timestampProvider;
private final MessageSubject updateMessageSubject;
private final MessageSubject antiEntropyAdvertisementSubject;
private final Set<EventuallyConsistentMapListener<K, V>> listeners
= Sets.newCopyOnWriteArraySet();
private final ExecutorService executor;
private final ScheduledExecutorService backgroundExecutor;
private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
private final ExecutorService communicationExecutor;
private final Map<NodeId, EventAccumulator> senderPending;
private long previousTombstonePurgeTime;
private final Map<NodeId, Long> antiEntropyTimes = Maps.newConcurrentMap();
private final String mapName;
private volatile boolean destroyed = false;
private static final String ERROR_DESTROYED = " map is already destroyed";
private final String destroyedMessage;
private static final String ERROR_NULL_KEY = "Key cannot be null";
private static final String ERROR_NULL_VALUE = "Null values are not allowed";
private final long initialDelaySec = 5;
private final boolean lightweightAntiEntropy;
private final boolean tombstonesDisabled;
private static final int WINDOW_SIZE = 5;
private static final int HIGH_LOAD_THRESHOLD = 0;
private static final int LOAD_WINDOW = 2;
private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
private final boolean persistent;
private static final String PERSISTENT_LOCAL_MAP_NAME = "itemsMap";
/**
* Creates a new eventually consistent map shared amongst multiple instances.
* <p>
* See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
* for more description of the parameters expected by the map.
* </p>
*
* @param mapName a String identifier for the map.
* @param clusterService the cluster service
* @param clusterCommunicator the cluster communications service
* @param serializerBuilder a Kryo namespace builder that can serialize
* both K and V
* @param timestampProvider provider of timestamps for K and V
* @param peerUpdateFunction function that provides the set of nodes to immediately
* update when there are writes to the map
* @param eventExecutor executor to use for processing incoming
* events from peers
* @param communicationExecutor executor to use for sending events to peers
* @param backgroundExecutor executor to use for background anti-entropy
* tasks
* @param tombstonesDisabled true if this map should not maintain
* tombstones
* @param antiEntropyPeriod period at which the anti-entropy task should run
* @param antiEntropyTimeUnit time unit for anti-entropy period
* @param convergeFaster make anti-entropy try to converge faster
* @param persistent persist data to disk
* @param persistenceService persistence service
*/
EventuallyConsistentMapImpl(String mapName,
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
KryoNamespace.Builder serializerBuilder,
BiFunction<K, V, Timestamp> timestampProvider,
BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
ExecutorService eventExecutor,
ExecutorService communicationExecutor,
ScheduledExecutorService backgroundExecutor,
boolean tombstonesDisabled,
long antiEntropyPeriod,
TimeUnit antiEntropyTimeUnit,
boolean convergeFaster,
boolean persistent,
PersistenceService persistenceService) {
this.mapName = mapName;
this.serializer = createSerializer(serializerBuilder);
this.persistenceService = persistenceService;
this.persistent = persistent;
if (persistent) {
items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
.withName(PERSISTENT_LOCAL_MAP_NAME)
.withSerializer(new Serializer() {
@Override
public <T> byte[] encode(T object) {
return EventuallyConsistentMapImpl.this.serializer.encode(object);
}
@Override
public <T> T decode(byte[] bytes) {
return EventuallyConsistentMapImpl.this.serializer.decode(bytes);
}
})
.build();
} else {
items = Maps.newConcurrentMap();
}
senderPending = Maps.newConcurrentMap();
destroyedMessage = mapName + ERROR_DESTROYED;
this.clusterService = clusterService;
this.clusterCommunicator = clusterCommunicator;
this.localNodeId = clusterService.getLocalNode().id();
this.timestampProvider = timestampProvider;
if (peerUpdateFunction != null) {
this.peerUpdateFunction = peerUpdateFunction;
} else {
this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
.map(ControllerNode::id)
.filter(nodeId -> !nodeId.equals(localNodeId))
.collect(Collectors.toList());
}
if (eventExecutor != null) {
this.executor = eventExecutor;
} else {
// should be a normal executor; it's used for receiving messages
this.executor =
Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d", log));
}
if (communicationExecutor != null) {
this.communicationExecutor = communicationExecutor;
} else {
// sending executor; should be capped
//TODO this probably doesn't need to be bounded anymore
this.communicationExecutor =
newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d", log));
}
if (backgroundExecutor != null) {
this.backgroundExecutor = backgroundExecutor;
} else {
this.backgroundExecutor =
newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d", log));
}
// start anti-entropy thread
this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
initialDelaySec, antiEntropyPeriod,
antiEntropyTimeUnit);
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
clusterCommunicator.addSubscriber(updateMessageSubject,
serializer::decode,
this::processUpdates,
this.executor);
antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
serializer::decode,
this::handleAntiEntropyAdvertisement,
serializer::encode,
this.backgroundExecutor);
if (!tombstonesDisabled) {
previousTombstonePurgeTime = 0;
this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
initialDelaySec,
antiEntropyPeriod,
TimeUnit.SECONDS);
}
this.tombstonesDisabled = tombstonesDisabled;
this.lightweightAntiEntropy = !convergeFaster;
}
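/**
 * Builds the serializer used for inter-node messages and on-disk persistence by
 * registering this map's internal helper classes on top of the user-supplied
 * Kryo namespace builder.
 *
 * @param builder user-supplied Kryo namespace builder capable of serializing K and V
 * @return serializer for map values and protocol messages
 */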
private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
return new KryoSerializer() {
@Override
protected void setupKryoPool() {
// Add the map's internal helper classes to the user-supplied serializer
serializerPool = builder
.register(KryoNamespaces.BASIC)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(LogicalTimestamp.class)
.register(WallClockTimestamp.class)
.register(AntiEntropyAdvertisement.class)
.register(AntiEntropyResponse.class)
.register(UpdateEntry.class)
.register(MapValue.class)
.register(MapValue.Digest.class)
.build();
}
};
}
@Override
public String name() {
return mapName;
}
@Override
public int size() {
checkState(!destroyed, destroyedMessage);
// TODO: Maintain a separate counter for tracking live elements in map.
return Maps.filterValues(items, MapValue::isAlive).size();
}
@Override
public boolean isEmpty() {
checkState(!destroyed, destroyedMessage);
return size() == 0;
}
@Override
public boolean containsKey(K key) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
return get(key) != null;
}
@Override
public boolean containsValue(V value) {
checkState(!destroyed, destroyedMessage);
checkNotNull(value, ERROR_NULL_VALUE);
return items.values()
.stream()
.filter(MapValue::isAlive)
.anyMatch(v -> value.equals(v.get()));
}
@Override
public V get(K key) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
MapValue<V> value = items.get(key);
return (value == null || value.isTombstone()) ? null : value.get();
}
@Override
public void put(K key, V value) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
if (putInternal(key, newValue)) {
notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
}
}
@Override
public V remove(K key) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
return removeAndNotify(key, null);
}
@Override
public void remove(K key, V value) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
removeAndNotify(key, value);
}
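/**
 * Removes the given key (optionally only if it currently maps to the given value),
 * records a tombstone when tombstones are enabled, and notifies peers and listeners
 * of the removal.
 *
 * @param key key to remove
 * @param value expected current value, or null to remove unconditionally
 * @return the previous live value, or null if the key was absent or already removed
 */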
private V removeAndNotify(K key, V value) {
Timestamp timestamp = timestampProvider.apply(key, value);
Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null
? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
if (previousValue != null) {
notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
peerUpdateFunction.apply(key, previousValue.get()));
if (previousValue.isAlive()) {
notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
}
}
return previousValue != null ? previousValue.get() : null;
}
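/**
 * Applies a removal to the local map state without notifying peers or listeners.
 *
 * @param key key to remove
 * @param value if present, the removal is applied only when the existing live value matches
 * @param tombstone tombstone to record in place of the removed value, if tombstones are enabled
 * @return the previous map value, or null if there was no existing entry or the removal was not applied
 */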
private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
tombstone.ifPresent(v -> checkState(v.isTombstone()));
counter.incrementCount();
AtomicBoolean updated = new AtomicBoolean(false);
AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
items.compute(key, (k, existing) -> {
boolean valueMatches = true;
if (value.isPresent() && existing != null && existing.isAlive()) {
valueMatches = Objects.equals(value.get(), existing.get());
}
if (existing == null) {
log.trace("ECMap Remove: Existing value for key {} is already null", k);
}
if (valueMatches) {
if (existing == null) {
updated.set(tombstone.isPresent());
} else {
updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
}
}
if (updated.get()) {
previousValue.set(existing);
return tombstone.orElse(null);
} else {
return existing;
}
});
return previousValue.get();
}
@Override
public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(recomputeFunction, "Recompute function cannot be null");
AtomicBoolean updated = new AtomicBoolean(false);
AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
MapValue<V> computedValue = items.compute(key, (k, mv) -> {
previousValue.set(mv);
V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
if (mv == null || newValue.isNewerThan(mv)) {
updated.set(true);
return newValue;
} else {
return mv;
}
});
if (updated.get()) {
notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
V value = computedValue.isTombstone()
? previousValue.get() == null ? null : previousValue.get().get()
: computedValue.get();
if (value != null) {
notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
}
}
return computedValue.get();
}
@Override
public void putAll(Map<? extends K, ? extends V> m) {
checkState(!destroyed, destroyedMessage);
m.forEach(this::put);
}
@Override
public void clear() {
checkState(!destroyed, destroyedMessage);
Maps.filterValues(items, MapValue::isAlive)
.forEach((k, v) -> remove(k));
}
@Override
public Set<K> keySet() {
checkState(!destroyed, destroyedMessage);
return Maps.filterValues(items, MapValue::isAlive)
.keySet();
}
@Override
public Collection<V> values() {
checkState(!destroyed, destroyedMessage);
return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
}
@Override
public Set<Map.Entry<K, V>> entrySet() {
checkState(!destroyed, destroyedMessage);
return Maps.filterValues(items, MapValue::isAlive)
.entrySet()
.stream()
.map(e -> Pair.of(e.getKey(), e.getValue().get()))
.collect(Collectors.toSet());
}
/**
* Returns true if newValue was accepted, i.e. the map was updated.
*
* @param key key
* @param newValue proposed new value
* @return true if update happened; false if map already contains a more recent value for the key
*/
private boolean putInternal(K key, MapValue<V> newValue) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(newValue, ERROR_NULL_VALUE);
checkState(newValue.isAlive());
counter.incrementCount();
AtomicBoolean updated = new AtomicBoolean(false);
items.compute(key, (k, existing) -> {
if (existing == null || newValue.isNewerThan(existing)) {
updated.set(true);
return newValue;
}
return existing;
});
return updated.get();
}
@Override
public void addListener(EventuallyConsistentMapListener<K, V> listener) {
checkState(!destroyed, destroyedMessage);
listeners.add(checkNotNull(listener));
}
@Override
public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
checkState(!destroyed, destroyedMessage);
listeners.remove(checkNotNull(listener));
}
@Override
public CompletableFuture<Void> destroy() {
destroyed = true;
executor.shutdown();
backgroundExecutor.shutdown();
communicationExecutor.shutdown();
listeners.clear();
clusterCommunicator.removeSubscriber(updateMessageSubject);
clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
return CompletableFuture.completedFuture(null);
}
private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
listeners.forEach(listener -> listener.event(event));
}
private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
queueUpdate(event, peers);
}
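/**
 * Queues an update for asynchronous delivery to the given peers, using a
 * per-peer accumulator to batch outbound messages.
 *
 * @param event update to send
 * @param peers nodes to send the update to; no-op if null
 */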
private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
if (peers == null) {
// we have no friends :(
return;
}
peers.forEach(node ->
senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
);
}
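/**
 * Indicates whether the map is currently under a high write load, based on the
 * number of updates observed over the sliding load window. Anti-entropy work is
 * skipped while under high load.
 *
 * @return true if the recent update rate exceeds the high-load threshold
 */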
private boolean underHighLoad() {
return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
}
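/**
 * Periodic anti-entropy task: picks a random active peer and sends it an
 * advertisement of the local map contents, unless the map is destroyed or
 * under high load.
 */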
private void sendAdvertisement() {
try {
if (underHighLoad() || destroyed) {
return;
}
pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
} catch (Exception e) {
// Catch all exceptions to avoid scheduled task being suppressed.
log.error("Exception thrown while sending advertisement", e);
}
}
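/**
 * Selects a random active peer from the cluster, excluding the local node.
 *
 * @return a randomly chosen active peer, or empty if no other active nodes exist
 */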
private Optional<NodeId> pickRandomActivePeer() {
List<NodeId> activePeers = clusterService.getNodes()
.stream()
.map(ControllerNode::id)
.filter(id -> !localNodeId.equals(id))
.filter(id -> clusterService.getState(id).isActive())
.collect(Collectors.toList());
Collections.shuffle(activePeers);
return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
}
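/**
 * Sends an anti-entropy advertisement to the given peer and, once the peer reports
 * it was processed, records the advertisement creation time used for tombstone purging.
 *
 * @param peer node to advertise to
 */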
private void sendAdvertisementToPeer(NodeId peer) {
AntiEntropyAdvertisement<K> ad = createAdvertisement();
clusterCommunicator.sendAndReceive(ad,
antiEntropyAdvertisementSubject,
serializer::encode,
serializer::decode,
peer)
.whenComplete((result, error) -> {
if (error != null) {
log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
} else if (result == AntiEntropyResponse.PROCESSED) {
antiEntropyTimes.put(peer, ad.creationTime());
}
});
}
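/**
 * Creates an anti-entropy advertisement containing a digest of every local entry.
 *
 * @return advertisement describing the local map state
 */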
private AntiEntropyAdvertisement<K> createAdvertisement() {
return new AntiEntropyAdvertisement<K>(localNodeId,
ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
}
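/**
 * Handles an anti-entropy advertisement received from a peer by reconciling it
 * against the local map state.
 *
 * @param ad advertisement to process
 * @return IGNORED if the map is destroyed or under high load, FAILED on error,
 *         otherwise PROCESSED
 */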
private AntiEntropyResponse handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
if (destroyed || underHighLoad()) {
return AntiEntropyResponse.IGNORED;
}
try {
if (log.isTraceEnabled()) {
log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
mapName, ad.sender(), ad.digest().size());
}
antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
if (!lightweightAntiEntropy) {
// if remote ad has any entries that the local copy is missing, actively sync
// TODO: Missing keys are not the only way the local copy can be behind.
if (!Sets.difference(ad.digest().keySet(), items.keySet()).isEmpty()) {
// TODO: Send ad for missing keys and for entries that are stale
sendAdvertisementToPeer(ad.sender());
}
}
} catch (Exception e) {
log.warn("Error handling anti-entropy advertisement", e);
return AntiEntropyResponse.FAILED;
}
return AntiEntropyResponse.PROCESSED;
}
/**
* Processes an anti-entropy advertisement from a peer by taking the following actions:
* 1. If the peer has an old entry, updates the peer.
* 2. If the peer indicates an entry is removed and has a more recent
* timestamp than the local entry, updates the local state.
*/
private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
AntiEntropyAdvertisement<K> ad) {
final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
final NodeId sender = ad.sender();
items.forEach((key, localValue) -> {
MapValue.Digest remoteValueDigest = ad.digest().get(key);
if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
// local value is more recent, push to sender
queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
}
if (remoteValueDigest != null
&& remoteValueDigest.isNewerThan(localValue.digest())
&& remoteValueDigest.isTombstone()) {
MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
MapValue<V> previousValue = removeInternal(key,
Optional.empty(),
Optional.of(tombstone));
if (previousValue != null && previousValue.isAlive()) {
externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
}
}
});
return externalEvents;
}
private void purgeTombstones() {
/*
* In order to mitigate the resource exhaustion that can ensue due to an ever-growing set
* of tombstones, we employ the following heuristic to purge old tombstones periodically.
* First, we keep track of the time (local system time) of the last successful
* anti-entropy exchange with each peer. The smallest (or oldest) such time across *all* peers
* is regarded as the time before which all tombstones are considered safe to purge.
*/
long currentSafeTombstonePurgeTime = clusterService.getNodes()
.stream()
.map(ControllerNode::id)
.filter(id -> !id.equals(localNodeId))
.map(id -> antiEntropyTimes.getOrDefault(id, 0L))
.reduce(Math::min)
.orElse(0L);
if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) {
return;
}
List<Map.Entry<K, MapValue<V>>> tombStonesToDelete = items.entrySet()
.stream()
.filter(e -> e.getValue().isTombstone())
.filter(e -> e.getValue().creationTime() <= currentSafeTombstonePurgeTime)
.collect(Collectors.toList());
previousTombstonePurgeTime = currentSafeTombstonePurgeTime;
tombstonesToDelete.forEach(entry -> items.remove(entry.getKey(), entry.getValue()));
}
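/**
 * Applies a batch of updates received from a peer to the local map state and
 * notifies listeners of any resulting changes.
 *
 * @param updates updates received from a peer
 */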
private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
if (destroyed) {
return;
}
updates.forEach(update -> {
final K key = update.key();
final MapValue<V> value = update.value() == null ? null : update.value().copy();
if (value == null || value.isTombstone()) {
MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
if (previousValue != null && previousValue.isAlive()) {
notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
}
} else if (putInternal(key, value)) {
notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get()));
}
});
}
// TODO pull this into the class if this gets pulled out...
private static final int DEFAULT_MAX_EVENTS = 1000;
private static final int DEFAULT_MAX_IDLE_MS = 10;
private static final int DEFAULT_MAX_BATCH_MS = 50;
private static final Timer TIMER = new Timer("onos-ecm-sender-events");
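/**
 * Accumulator that batches outbound updates destined for a single peer,
 * deduplicating by key (keeping only the newest entry) before unicasting the
 * batch on the communication executor.
 */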
private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
private final NodeId peer;
private EventAccumulator(NodeId peer) {
super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
this.peer = peer;
}
@Override
public void processItems(List<UpdateEntry<K, V>> items) {
Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
items.forEach(item -> map.compute(item.key(), (key, existing) ->
item.isNewerThan(existing) ? item : existing));
communicationExecutor.execute(() -> {
clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
updateMessageSubject,
serializer::encode,
peer)
.whenComplete((result, error) -> {
if (error != null) {
log.debug("Failed to send to {}", peer, error);
}
});
});
}
}
}