blob: 2f5bc0a00d1f7beaa97a70d202103b6f795ff7a6 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Jonathan Hartdb3af892015-01-26 13:19:07 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Madan Jampanif4c88502016-01-21 12:35:36 -080018import static com.google.common.base.Preconditions.checkNotNull;
19import static com.google.common.base.Preconditions.checkState;
20import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
21import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
22import static org.onlab.util.Tools.groupedThreads;
23import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
24import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Jonathan Hartdb3af892015-01-26 13:19:07 -080025
Jonathan Hartdb3af892015-01-26 13:19:07 -080026import java.util.Collection;
Madan Jampani3d76c942015-06-29 23:37:10 -070027import java.util.Collections;
Jonathan Hartdb3af892015-01-26 13:19:07 -080028import java.util.List;
29import java.util.Map;
Madan Jampanid13f3b82015-07-01 17:37:50 -070030import java.util.Objects;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070031import java.util.Optional;
Jonathan Hartdb3af892015-01-26 13:19:07 -080032import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080033import java.util.Timer;
Madan Jampanifa242182016-01-22 13:42:54 -080034import java.util.concurrent.CompletableFuture;
Jonathan Hartdb3af892015-01-26 13:19:07 -080035import java.util.concurrent.ExecutorService;
36import java.util.concurrent.Executors;
37import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080038import java.util.concurrent.TimeUnit;
Madan Jampani3d76c942015-06-29 23:37:10 -070039import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070040import java.util.concurrent.atomic.AtomicReference;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080041import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080042import java.util.stream.Collectors;
43
Madan Jampanif4c88502016-01-21 12:35:36 -080044import org.apache.commons.lang3.tuple.Pair;
45import org.onlab.util.AbstractAccumulator;
46import org.onlab.util.KryoNamespace;
47import org.onlab.util.SlidingWindowCounter;
48import org.onosproject.cluster.ClusterService;
49import org.onosproject.cluster.ControllerNode;
50import org.onosproject.cluster.NodeId;
51import org.onosproject.persistence.PersistenceService;
52import org.onosproject.store.LogicalTimestamp;
53import org.onosproject.store.Timestamp;
54import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
55import org.onosproject.store.cluster.messaging.MessageSubject;
56import org.onosproject.store.serializers.KryoNamespaces;
57import org.onosproject.store.serializers.KryoSerializer;
58import org.onosproject.store.service.EventuallyConsistentMap;
59import org.onosproject.store.service.EventuallyConsistentMapEvent;
60import org.onosproject.store.service.EventuallyConsistentMapListener;
61import org.onosproject.store.service.Serializer;
62import org.onosproject.store.service.WallClockTimestamp;
63import org.slf4j.Logger;
64import org.slf4j.LoggerFactory;
65
66import com.google.common.collect.Collections2;
67import com.google.common.collect.ImmutableList;
68import com.google.common.collect.ImmutableMap;
69import com.google.common.collect.Lists;
70import com.google.common.collect.Maps;
71import com.google.common.collect.Sets;
Jonathan Hartdb3af892015-01-26 13:19:07 -080072
73/**
74 * Distributed Map implementation which uses optimistic replication and gossip
75 * based techniques to provide an eventually consistent data store.
76 */
77public class EventuallyConsistentMapImpl<K, V>
78 implements EventuallyConsistentMap<K, V> {
79
80 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
81
Madan Jampani3d76c942015-06-29 23:37:10 -070082 private final Map<K, MapValue<V>> items;
Jonathan Hartdb3af892015-01-26 13:19:07 -080083
Jonathan Hartdb3af892015-01-26 13:19:07 -080084 private final ClusterService clusterService;
85 private final ClusterCommunicationService clusterCommunicator;
86 private final KryoSerializer serializer;
Madan Jampani3d76c942015-06-29 23:37:10 -070087 private final NodeId localNodeId;
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -070088 private final PersistenceService persistenceService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080089
Madan Jampanibcf1a482015-06-24 19:05:56 -070090 private final BiFunction<K, V, Timestamp> timestampProvider;
Jonathan Hartdb3af892015-01-26 13:19:07 -080091
92 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080093 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080094
Jonathan Hartaaa56572015-01-28 21:56:35 -080095 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Madan Jampani3d76c942015-06-29 23:37:10 -070096 = Sets.newCopyOnWriteArraySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -080097
98 private final ExecutorService executor;
Jonathan Hartdb3af892015-01-26 13:19:07 -080099 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800100 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800101
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700102 private final ExecutorService communicationExecutor;
103 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -0800104
Madan Jampani29f52a32016-04-18 15:20:52 -0700105 private long previousTombstonePurgeTime;
106 private final Map<NodeId, Long> antiEntropyTimes = Maps.newConcurrentMap();
107
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700108 private final String mapName;
109
Jonathan Hartdb3af892015-01-26 13:19:07 -0800110 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800111 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800112 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800113
Jonathan Hart4f397e82015-02-04 09:10:41 -0800114 private static final String ERROR_NULL_KEY = "Key cannot be null";
115 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
116
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700117 private final long initialDelaySec = 5;
118 private final boolean lightweightAntiEntropy;
119 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800120
Jonathan Hart233a18a2015-03-02 17:24:58 -0800121 private static final int WINDOW_SIZE = 5;
122 private static final int HIGH_LOAD_THRESHOLD = 0;
123 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700124 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800125
Jonathan Hartca335e92015-03-05 10:34:32 -0800126 private final boolean persistent;
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700127
128 private static final String PERSISTENT_LOCAL_MAP_NAME = "itemsMap";
129
Jonathan Hartca335e92015-03-05 10:34:32 -0800130
Jonathan Hartdb3af892015-01-26 13:19:07 -0800131 /**
132 * Creates a new eventually consistent map shared amongst multiple instances.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800133 * <p>
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700134 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
135 * for more description of the parameters expected by the map.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800136 * </p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800137 *
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700138 * @param mapName a String identifier for the map.
139 * @param clusterService the cluster service
140 * @param clusterCommunicator the cluster communications service
141 * @param serializerBuilder a Kryo namespace builder that can serialize
142 * both K and V
Madan Jampanibcf1a482015-06-24 19:05:56 -0700143 * @param timestampProvider provider of timestamps for K and V
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700144 * @param peerUpdateFunction function that provides a set of nodes to immediately
145 * update to when there writes to the map
146 * @param eventExecutor executor to use for processing incoming
147 * events from peers
148 * @param communicationExecutor executor to use for sending events to peers
149 * @param backgroundExecutor executor to use for background anti-entropy
150 * tasks
151 * @param tombstonesDisabled true if this map should not maintain
152 * tombstones
153 * @param antiEntropyPeriod period that the anti-entropy task should run
Jonathan Hartca335e92015-03-05 10:34:32 -0800154 * @param antiEntropyTimeUnit time unit for anti-entropy period
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700155 * @param convergeFaster make anti-entropy try to converge faster
Jonathan Hartca335e92015-03-05 10:34:32 -0800156 * @param persistent persist data to disk
Jian Lidfba7392016-01-22 16:46:58 -0800157 * @param persistenceService persistence service
Jonathan Hartdb3af892015-01-26 13:19:07 -0800158 */
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700159 EventuallyConsistentMapImpl(String mapName,
160 ClusterService clusterService,
161 ClusterCommunicationService clusterCommunicator,
162 KryoNamespace.Builder serializerBuilder,
Madan Jampanibcf1a482015-06-24 19:05:56 -0700163 BiFunction<K, V, Timestamp> timestampProvider,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700164 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
165 ExecutorService eventExecutor,
166 ExecutorService communicationExecutor,
167 ScheduledExecutorService backgroundExecutor,
168 boolean tombstonesDisabled,
169 long antiEntropyPeriod,
170 TimeUnit antiEntropyTimeUnit,
Jonathan Hartca335e92015-03-05 10:34:32 -0800171 boolean convergeFaster,
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700172 boolean persistent,
173 PersistenceService persistenceService) {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700174 this.mapName = mapName;
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700175 this.serializer = createSerializer(serializerBuilder);
176 this.persistenceService = persistenceService;
177 this.persistent =
178 persistent;
179 if (persistent) {
180 items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
181 .withName(PERSISTENT_LOCAL_MAP_NAME)
182 .withSerializer(new Serializer() {
183
184 @Override
185 public <T> byte[] encode(T object) {
186 return EventuallyConsistentMapImpl.this.serializer.encode(object);
187 }
188
189 @Override
190 public <T> T decode(byte[] bytes) {
191 return EventuallyConsistentMapImpl.this.serializer.decode(bytes);
192 }
193 })
194 .build();
195 } else {
196 items = Maps.newConcurrentMap();
197 }
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800198 senderPending = Maps.newConcurrentMap();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700199 destroyedMessage = mapName + ERROR_DESTROYED;
Madan Jampani28726282015-02-19 11:40:23 -0800200
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700201 this.clusterService = clusterService;
202 this.clusterCommunicator = clusterCommunicator;
Madan Jampani3d76c942015-06-29 23:37:10 -0700203 this.localNodeId = clusterService.getLocalNode().id();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700204
Madan Jampanibcf1a482015-06-24 19:05:56 -0700205 this.timestampProvider = timestampProvider;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700206
207 if (peerUpdateFunction != null) {
208 this.peerUpdateFunction = peerUpdateFunction;
209 } else {
210 this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
211 .map(ControllerNode::id)
Madan Jampani3d76c942015-06-29 23:37:10 -0700212 .filter(nodeId -> !nodeId.equals(localNodeId))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700213 .collect(Collectors.toList());
214 }
215
216 if (eventExecutor != null) {
217 this.executor = eventExecutor;
218 } else {
219 // should be a normal executor; it's used for receiving messages
220 this.executor =
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700221 Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d", log));
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700222 }
223
224 if (communicationExecutor != null) {
225 this.communicationExecutor = communicationExecutor;
226 } else {
227 // sending executor; should be capped
228 //TODO this probably doesn't need to be bounded anymore
229 this.communicationExecutor =
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700230 newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d", log));
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700231 }
232
Jonathan Hartca335e92015-03-05 10:34:32 -0800233
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700234 if (backgroundExecutor != null) {
235 this.backgroundExecutor = backgroundExecutor;
236 } else {
237 this.backgroundExecutor =
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700238 newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d", log));
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700239 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800240
Jonathan Hartaaa56572015-01-28 21:56:35 -0800241 // start anti-entropy thread
Madan Jampani3d76c942015-06-29 23:37:10 -0700242 this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700243 initialDelaySec, antiEntropyPeriod,
244 antiEntropyTimeUnit);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800245
Jonathan Hartdb3af892015-01-26 13:19:07 -0800246 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
247 clusterCommunicator.addSubscriber(updateMessageSubject,
Madan Jampani3d76c942015-06-29 23:37:10 -0700248 serializer::decode,
249 this::processUpdates,
250 this.executor);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800251
Jonathan Hartaaa56572015-01-28 21:56:35 -0800252 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
253 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
Madan Jampani3d76c942015-06-29 23:37:10 -0700254 serializer::decode,
255 this::handleAntiEntropyAdvertisement,
Madan Jampani29f52a32016-04-18 15:20:52 -0700256 serializer::encode,
Madan Jampani3d76c942015-06-29 23:37:10 -0700257 this.backgroundExecutor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800258
Madan Jampania8f919e2016-04-18 16:47:35 -0700259 if (!tombstonesDisabled) {
260 previousTombstonePurgeTime = 0;
261 this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
262 initialDelaySec,
263 antiEntropyPeriod,
264 TimeUnit.SECONDS);
265 }
Madan Jampani29f52a32016-04-18 15:20:52 -0700266
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700267 this.tombstonesDisabled = tombstonesDisabled;
268 this.lightweightAntiEntropy = !convergeFaster;
Madan Jampanie1356282015-03-10 19:05:36 -0700269 }
270
Jonathan Hartdb3af892015-01-26 13:19:07 -0800271 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
272 return new KryoSerializer() {
273 @Override
274 protected void setupKryoPool() {
275 // Add the map's internal helper classes to the user-supplied serializer
276 serializerPool = builder
Madan Jampani3d76c942015-06-29 23:37:10 -0700277 .register(KryoNamespaces.BASIC)
Madan Jampanidb5d06a2015-06-30 11:16:48 -0700278 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
Madan Jampani3e033bd2015-04-08 13:03:49 -0700279 .register(LogicalTimestamp.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800280 .register(WallClockTimestamp.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800281 .register(AntiEntropyAdvertisement.class)
Madan Jampani29f52a32016-04-18 15:20:52 -0700282 .register(AntiEntropyResponse.class)
Madan Jampani3d76c942015-06-29 23:37:10 -0700283 .register(UpdateEntry.class)
284 .register(MapValue.class)
285 .register(MapValue.Digest.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800286 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800287 }
288 };
289 }
290
291 @Override
Madan Jampania090a112016-01-18 16:38:17 -0800292 public String name() {
293 return mapName;
294 }
295
296 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800297 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800298 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700299 // TODO: Maintain a separate counter for tracking live elements in map.
300 return Maps.filterValues(items, MapValue::isAlive).size();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800301 }
302
303 @Override
304 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800305 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700306 return size() == 0;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800307 }
308
309 @Override
310 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800311 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800312 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani3d76c942015-06-29 23:37:10 -0700313 return get(key) != null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800314 }
315
316 @Override
317 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800318 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800319 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani3d76c942015-06-29 23:37:10 -0700320 return items.values()
321 .stream()
322 .filter(MapValue::isAlive)
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700323 .anyMatch(v -> value.equals(v.get()));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800324 }
325
326 @Override
327 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800328 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800329 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800330
Madan Jampani3d76c942015-06-29 23:37:10 -0700331 MapValue<V> value = items.get(key);
332 return (value == null || value.isTombstone()) ? null : value.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800333 }
334
335 @Override
336 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800337 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800338 checkNotNull(key, ERROR_NULL_KEY);
339 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800340
Madan Jampani3d76c942015-06-29 23:37:10 -0700341 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700342 if (putInternal(key, newValue)) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700343 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
Madan Jampanicab114c2015-07-23 00:14:19 -0700344 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800345 }
346 }
347
Jonathan Hartdb3af892015-01-26 13:19:07 -0800348 @Override
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700349 public V remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800350 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800351 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700352 return removeAndNotify(key, null);
353 }
354
355 @Override
356 public void remove(K key, V value) {
357 checkState(!destroyed, destroyedMessage);
358 checkNotNull(key, ERROR_NULL_KEY);
359 checkNotNull(value, ERROR_NULL_VALUE);
360 removeAndNotify(key, value);
361 }
362
363 private V removeAndNotify(K key, V value) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700364 Timestamp timestamp = timestampProvider.apply(key, value);
365 Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null
366 ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700367 MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
Madan Jampania0ac4872015-07-02 11:23:49 -0700368 if (previousValue != null) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700369 notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
370 peerUpdateFunction.apply(key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700371 if (previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700372 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700373 }
374 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700375 return previousValue != null ? previousValue.get() : null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800376 }
377
Madan Jampani483d0a22015-08-19 17:33:00 -0700378 private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700379 checkState(!destroyed, destroyedMessage);
380 checkNotNull(key, ERROR_NULL_KEY);
381 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani483d0a22015-08-19 17:33:00 -0700382 tombstone.ifPresent(v -> checkState(v.isTombstone()));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700383
384 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700385 AtomicBoolean updated = new AtomicBoolean(false);
Madan Jampanid13f3b82015-07-01 17:37:50 -0700386 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
Madan Jampani3d76c942015-06-29 23:37:10 -0700387 items.compute(key, (k, existing) -> {
Madan Jampanid13f3b82015-07-01 17:37:50 -0700388 boolean valueMatches = true;
389 if (value.isPresent() && existing != null && existing.isAlive()) {
390 valueMatches = Objects.equals(value.get(), existing.get());
Madan Jampani3d76c942015-06-29 23:37:10 -0700391 }
Brian O'Connor6325dad2015-07-07 15:36:29 -0700392 if (existing == null) {
Jonathan Hart4a29c592015-09-23 17:55:07 -0700393 log.trace("ECMap Remove: Existing value for key {} is already null", k);
Brian O'Connor6325dad2015-07-07 15:36:29 -0700394 }
Madan Jampani483d0a22015-08-19 17:33:00 -0700395 if (valueMatches) {
396 if (existing == null) {
397 updated.set(tombstone.isPresent());
398 } else {
399 updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
400 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700401 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700402 if (updated.get()) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700403 previousValue.set(existing);
404 return tombstone.orElse(null);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700405 } else {
406 return existing;
407 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700408 });
Madan Jampanid13f3b82015-07-01 17:37:50 -0700409 return previousValue.get();
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800410 }
411
412 @Override
Madan Jampani4727a112015-07-16 12:12:58 -0700413 public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
414 checkState(!destroyed, destroyedMessage);
415 checkNotNull(key, ERROR_NULL_KEY);
416 checkNotNull(recomputeFunction, "Recompute function cannot be null");
417
418 AtomicBoolean updated = new AtomicBoolean(false);
419 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
420 MapValue<V> computedValue = items.compute(key, (k, mv) -> {
421 previousValue.set(mv);
422 V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
423 MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
424 if (mv == null || newValue.isNewerThan(mv)) {
425 updated.set(true);
426 return newValue;
427 } else {
428 return mv;
429 }
430 });
431 if (updated.get()) {
432 notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
433 EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
434 V value = computedValue.isTombstone()
435 ? previousValue.get() == null ? null : previousValue.get().get()
436 : computedValue.get();
437 if (value != null) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700438 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
Madan Jampani4727a112015-07-16 12:12:58 -0700439 }
440 }
441 return computedValue.get();
442 }
443
444 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800445 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800446 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800447 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800448 }
449
450 @Override
451 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800452 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700453 Maps.filterValues(items, MapValue::isAlive)
454 .forEach((k, v) -> remove(k));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800455 }
456
457 @Override
458 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800459 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700460 return Maps.filterValues(items, MapValue::isAlive)
461 .keySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800462 }
463
464 @Override
465 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800466 checkState(!destroyed, destroyedMessage);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700467 return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800468 }
469
470 @Override
471 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800472 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700473 return Maps.filterValues(items, MapValue::isAlive)
474 .entrySet()
475 .stream()
476 .map(e -> Pair.of(e.getKey(), e.getValue().get()))
477 .collect(Collectors.toSet());
478 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800479
Madan Jampani3d76c942015-06-29 23:37:10 -0700480 /**
481 * Returns true if newValue was accepted i.e. map is updated.
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700482 *
Madan Jampani3d76c942015-06-29 23:37:10 -0700483 * @param key key
484 * @param newValue proposed new value
485 * @return true if update happened; false if map already contains a more recent value for the key
486 */
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700487 private boolean putInternal(K key, MapValue<V> newValue) {
488 checkState(!destroyed, destroyedMessage);
489 checkNotNull(key, ERROR_NULL_KEY);
490 checkNotNull(newValue, ERROR_NULL_VALUE);
491 checkState(newValue.isAlive());
492 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700493 AtomicBoolean updated = new AtomicBoolean(false);
494 items.compute(key, (k, existing) -> {
495 if (existing == null || newValue.isNewerThan(existing)) {
496 updated.set(true);
Madan Jampani3d76c942015-06-29 23:37:10 -0700497 return newValue;
498 }
499 return existing;
500 });
Madan Jampani3d76c942015-06-29 23:37:10 -0700501 return updated.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800502 }
503
504 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800505 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800506 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800507
508 listeners.add(checkNotNull(listener));
509 }
510
511 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800512 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800513 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800514
515 listeners.remove(checkNotNull(listener));
516 }
517
518 @Override
Madan Jampanifa242182016-01-22 13:42:54 -0800519 public CompletableFuture<Void> destroy() {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800520 destroyed = true;
521
522 executor.shutdown();
523 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800524 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800525
Jonathan Hart584d2f32015-01-27 19:46:14 -0800526 listeners.clear();
527
Jonathan Hartdb3af892015-01-26 13:19:07 -0800528 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800529 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Madan Jampanifa242182016-01-22 13:42:54 -0800530 return CompletableFuture.completedFuture(null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800531 }
532
Jonathan Hartaaa56572015-01-28 21:56:35 -0800533 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700534 listeners.forEach(listener -> listener.event(event));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800535 }
536
Madan Jampani3d76c942015-06-29 23:37:10 -0700537 private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800538 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800539 }
540
Madan Jampani3d76c942015-06-29 23:37:10 -0700541 private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800542 if (peers == null) {
543 // we have no friends :(
544 return;
545 }
546 peers.forEach(node ->
Jonathan Hart9a426f82015-09-03 15:43:13 +0200547 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800548 );
549 }
550
Jonathan Hart233a18a2015-03-02 17:24:58 -0800551 private boolean underHighLoad() {
552 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
553 }
554
Madan Jampani3d76c942015-06-29 23:37:10 -0700555 private void sendAdvertisement() {
556 try {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700557 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800558 return;
559 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700560 pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
561 } catch (Exception e) {
562 // Catch all exceptions to avoid scheduled task being suppressed.
563 log.error("Exception thrown while sending advertisement", e);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800564 }
565 }
566
Madan Jampani3d76c942015-06-29 23:37:10 -0700567 private Optional<NodeId> pickRandomActivePeer() {
568 List<NodeId> activePeers = clusterService.getNodes()
569 .stream()
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700570 .map(ControllerNode::id)
571 .filter(id -> !localNodeId.equals(id))
Thomas Vachuska7a8de842016-03-07 20:56:35 -0800572 .filter(id -> clusterService.getState(id).isActive())
Madan Jampani3d76c942015-06-29 23:37:10 -0700573 .collect(Collectors.toList());
574 Collections.shuffle(activePeers);
575 return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
576 }
577
578 private void sendAdvertisementToPeer(NodeId peer) {
Madan Jampani29f52a32016-04-18 15:20:52 -0700579 AntiEntropyAdvertisement<K> ad = createAdvertisement();
580 clusterCommunicator.sendAndReceive(ad,
Madan Jampani3d76c942015-06-29 23:37:10 -0700581 antiEntropyAdvertisementSubject,
582 serializer::encode,
Madan Jampani29f52a32016-04-18 15:20:52 -0700583 serializer::decode,
Madan Jampani3d76c942015-06-29 23:37:10 -0700584 peer)
585 .whenComplete((result, error) -> {
586 if (error != null) {
Madan Jampania0ac4872015-07-02 11:23:49 -0700587 log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
Madan Jampani29f52a32016-04-18 15:20:52 -0700588 } else if (result == AntiEntropyResponse.PROCESSED) {
589 antiEntropyTimes.put(peer, ad.creationTime());
Madan Jampani3d76c942015-06-29 23:37:10 -0700590 }
591 });
592 }
593
Jonathan Hartaaa56572015-01-28 21:56:35 -0800594 private AntiEntropyAdvertisement<K> createAdvertisement() {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700595 return new AntiEntropyAdvertisement<K>(localNodeId,
596 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800597 }
598
Madan Jampani29f52a32016-04-18 15:20:52 -0700599 private AntiEntropyResponse handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700600 if (destroyed || underHighLoad()) {
Madan Jampani29f52a32016-04-18 15:20:52 -0700601 return AntiEntropyResponse.IGNORED;
Madan Jampani3d76c942015-06-29 23:37:10 -0700602 }
603 try {
Jonathan Hart9a426f82015-09-03 15:43:13 +0200604 if (log.isTraceEnabled()) {
605 log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
606 mapName, ad.sender(), ad.digest().size());
607 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700608 antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800609
Madan Jampani3d76c942015-06-29 23:37:10 -0700610 if (!lightweightAntiEntropy) {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700611 // if remote ad has any entries that the local copy is missing, actively sync
612 // TODO: Missing keys is not the way local copy can be behind.
613 if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700614 // TODO: Send ad for missing keys and for entries that are stale
615 sendAdvertisementToPeer(ad.sender());
Jonathan Hartf893be82015-02-24 17:35:51 -0800616 }
Jonathan Hartaaa56572015-01-28 21:56:35 -0800617 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700618 } catch (Exception e) {
619 log.warn("Error handling anti-entropy advertisement", e);
Madan Jampani29f52a32016-04-18 15:20:52 -0700620 return AntiEntropyResponse.FAILED;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800621 }
Madan Jampani29f52a32016-04-18 15:20:52 -0700622 return AntiEntropyResponse.PROCESSED;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800623 }
624
625 /**
Madan Jampani3d76c942015-06-29 23:37:10 -0700626 * Processes anti-entropy ad from peer by taking following actions:
627 * 1. If peer has an old entry, updates peer.
628 * 2. If peer indicates an entry is removed and has a more recent
629 * timestamp than the local entry, update local state.
Jonathan Hartaaa56572015-01-28 21:56:35 -0800630 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800631 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
632 AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700633 final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
Jonathan Hartaaa56572015-01-28 21:56:35 -0800634 final NodeId sender = ad.sender();
Madan Jampani3d76c942015-06-29 23:37:10 -0700635 items.forEach((key, localValue) -> {
636 MapValue.Digest remoteValueDigest = ad.digest().get(key);
637 if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800638 // local value is more recent, push to sender
Madan Jampani3d76c942015-06-29 23:37:10 -0700639 queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700640 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700641 if (remoteValueDigest != null
642 && remoteValueDigest.isNewerThan(localValue.digest())
643 && remoteValueDigest.isTombstone()) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700644 MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
Madan Jampanid13f3b82015-07-01 17:37:50 -0700645 MapValue<V> previousValue = removeInternal(key,
646 Optional.empty(),
Madan Jampani483d0a22015-08-19 17:33:00 -0700647 Optional.of(tombstone));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700648 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700649 externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800650 }
651 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700652 });
Jonathan Hartaaa56572015-01-28 21:56:35 -0800653 return externalEvents;
654 }
655
Madan Jampani29f52a32016-04-18 15:20:52 -0700656 private void purgeTombstones() {
657 /*
658 * In order to mitigate the resource exhausation that can ensue due to an ever-growing set
659 * of tombstones we employ the following heuristic to purge old tombstones periodically.
660 * First, we keep track of the time (local system time) when we were able to have a successful
661 * AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded
662 * as the time before which all tombstones are considered safe to purge.
663 */
Madan Jampania8f919e2016-04-18 16:47:35 -0700664 long currentSafeTombstonePurgeTime = clusterService.getNodes()
665 .stream()
666 .map(ControllerNode::id)
667 .filter(id -> !id.equals(localNodeId))
668 .map(id -> antiEntropyTimes.getOrDefault(id, 0L))
669 .reduce(Math::min)
670 .orElse(0L);
Madan Jampani29f52a32016-04-18 15:20:52 -0700671 if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) {
672 return;
673 }
674 List<Map.Entry<K, MapValue<V>>> tombStonesToDelete = items.entrySet()
675 .stream()
676 .filter(e -> e.getValue().isTombstone())
677 .filter(e -> e.getValue().creationTime() <= currentSafeTombstonePurgeTime)
678 .collect(Collectors.toList());
679 previousTombstonePurgeTime = currentSafeTombstonePurgeTime;
680 tombStonesToDelete.forEach(entry -> items.remove(entry.getKey(), entry.getValue()));
681 }
682
Madan Jampani3d76c942015-06-29 23:37:10 -0700683 private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
684 if (destroyed) {
685 return;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800686 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700687 updates.forEach(update -> {
688 final K key = update.key();
Madan Jampani29f52a32016-04-18 15:20:52 -0700689 final MapValue<V> value = update.value() == null ? null : update.value().copy();
Madan Jampani483d0a22015-08-19 17:33:00 -0700690 if (value == null || value.isTombstone()) {
691 MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700692 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700693 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700694 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700695 } else if (putInternal(key, value)) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700696 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800697 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700698 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800699 }
700
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800701 // TODO pull this into the class if this gets pulled out...
702 private static final int DEFAULT_MAX_EVENTS = 1000;
703 private static final int DEFAULT_MAX_IDLE_MS = 10;
704 private static final int DEFAULT_MAX_BATCH_MS = 50;
705 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800706
Madan Jampani3d76c942015-06-29 23:37:10 -0700707 private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800708
709 private final NodeId peer;
710
711 private EventAccumulator(NodeId peer) {
712 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
713 this.peer = peer;
714 }
715
716 @Override
Madan Jampani3d76c942015-06-29 23:37:10 -0700717 public void processItems(List<UpdateEntry<K, V>> items) {
718 Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
719 items.forEach(item -> map.compute(item.key(), (key, existing) ->
Madan Jampani92c64eb2015-07-23 15:37:07 -0700720 item.isNewerThan(existing) ? item : existing));
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700721 communicationExecutor.execute(() -> {
Madan Jampani3d76c942015-06-29 23:37:10 -0700722 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700723 updateMessageSubject,
724 serializer::encode,
725 peer)
726 .whenComplete((result, error) -> {
727 if (error != null) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700728 log.debug("Failed to send to {}", peer, error);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700729 }
730 });
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800731 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800732 }
733 }
Jonathan Hart4a29c592015-09-23 17:55:07 -0700734}