blob: 762689cab5c6a6a4c36192068d833cec36206125 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Jonathan Hartdb3af892015-01-26 13:19:07 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Jordan Halterman00e92da2018-05-22 23:05:52 -070018import java.util.ArrayList;
19import java.util.Collection;
20import java.util.Collections;
21import java.util.HashSet;
22import java.util.List;
23import java.util.Map;
24import java.util.Objects;
25import java.util.Optional;
26import java.util.Set;
27import java.util.Timer;
28import java.util.concurrent.CompletableFuture;
29import java.util.concurrent.ExecutionException;
30import java.util.concurrent.ExecutorService;
31import java.util.concurrent.Executors;
32import java.util.concurrent.ScheduledExecutorService;
33import java.util.concurrent.TimeUnit;
34import java.util.concurrent.TimeoutException;
35import java.util.concurrent.atomic.AtomicBoolean;
36import java.util.concurrent.atomic.AtomicInteger;
37import java.util.concurrent.atomic.AtomicReference;
38import java.util.function.BiFunction;
39import java.util.function.Function;
40import java.util.function.Supplier;
41import java.util.stream.Collectors;
42
slowr878625f2017-10-24 14:53:49 -070043import com.google.common.collect.Collections2;
44import com.google.common.collect.ImmutableList;
45import com.google.common.collect.ImmutableMap;
46import com.google.common.collect.Lists;
47import com.google.common.collect.Maps;
48import com.google.common.collect.Sets;
49import org.apache.commons.lang3.exception.ExceptionUtils;
50import org.apache.commons.lang3.tuple.Pair;
51import org.onlab.util.AbstractAccumulator;
52import org.onlab.util.KryoNamespace;
53import org.onlab.util.SlidingWindowCounter;
54import org.onosproject.cluster.NodeId;
55import org.onosproject.persistence.PersistenceService;
56import org.onosproject.store.LogicalTimestamp;
57import org.onosproject.store.Timestamp;
58import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
59import org.onosproject.store.cluster.messaging.MessageSubject;
60import org.onosproject.store.serializers.KryoNamespaces;
61import org.onosproject.store.service.DistributedPrimitive;
62import org.onosproject.store.service.EventuallyConsistentMap;
63import org.onosproject.store.service.EventuallyConsistentMapEvent;
64import org.onosproject.store.service.EventuallyConsistentMapListener;
65import org.onosproject.store.service.Serializer;
66import org.onosproject.store.service.WallClockTimestamp;
67import org.slf4j.Logger;
68import org.slf4j.LoggerFactory;
69
Jonathan Hart9dc5f092016-06-17 15:15:17 -070070import static com.google.common.base.Preconditions.checkNotNull;
71import static com.google.common.base.Preconditions.checkState;
72import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
73import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
74import static org.onlab.util.Tools.groupedThreads;
75import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
76import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Jonathan Hartdb3af892015-01-26 13:19:07 -080077
78/**
79 * Distributed Map implementation which uses optimistic replication and gossip
80 * based techniques to provide an eventually consistent data store.
81 */
82public class EventuallyConsistentMapImpl<K, V>
83 implements EventuallyConsistentMap<K, V> {
84
85 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
slowr878625f2017-10-24 14:53:49 -070086 private static final String ERROR_DESTROYED = " map is already destroyed";
87 private static final String ERROR_NULL_KEY = "Key cannot be null";
88 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
89 private static final int WINDOW_SIZE = 5;
90 private static final int HIGH_LOAD_THRESHOLD = 2;
91 private static final int LOAD_WINDOW = 2;
Jonathan Hartdb3af892015-01-26 13:19:07 -080092
Madan Jampani3d76c942015-06-29 23:37:10 -070093 private final Map<K, MapValue<V>> items;
Jordan Halterman28183ee2017-10-17 17:29:10 -070094 private final ClusterCommunicationService clusterCommunicator;
Jordan Halterman2c83a102017-08-20 17:11:41 -070095 private final Serializer serializer;
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -070096 private final PersistenceService persistenceService;
Madan Jampanibcf1a482015-06-24 19:05:56 -070097 private final BiFunction<K, V, Timestamp> timestampProvider;
Jordan Halterman12d5ec42017-05-17 00:53:44 -070098 private final MessageSubject bootstrapMessageSubject;
99 private final MessageSubject initializeMessageSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800100 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800101 private final MessageSubject antiEntropyAdvertisementSubject;
Jon Halld198b882016-05-18 16:44:40 -0700102 private final MessageSubject updateRequestSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800103 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Madan Jampani3d76c942015-06-29 23:37:10 -0700104 = Sets.newCopyOnWriteArraySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800105 private final ExecutorService executor;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800106 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800107 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700108 private final ExecutorService communicationExecutor;
109 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani29f52a32016-04-18 15:20:52 -0700110 private final Map<NodeId, Long> antiEntropyTimes = Maps.newConcurrentMap();
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700111 private final String mapName;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800112 private final String destroyedMessage;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700113 private final long initialDelaySec = 5;
114 private final boolean lightweightAntiEntropy;
115 private final boolean tombstonesDisabled;
Jonathan Hartca335e92015-03-05 10:34:32 -0800116 private final boolean persistent;
slowr878625f2017-10-24 14:53:49 -0700117 private final Supplier<List<NodeId>> peersSupplier;
118 private final Supplier<List<NodeId>> bootstrapPeersSupplier;
119 private final NodeId localNodeId;
120 private long previousTombstonePurgeTime;
121 private volatile boolean destroyed = false;
122 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700123
/**
 * Creates a new eventually consistent map shared amongst multiple instances.
 * <p>
 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
 * for more description of the parameters expected by the map.
 * </p>
 * <p>
 * All fields are assigned before this instance is handed to any executor or
 * to the cluster communication service, so that handlers running on other
 * threads never observe a partially initialised map.
 * </p>
 *
 * @param localNodeId            local node id
 * @param mapName                a String identifier for the map.
 * @param clusterCommunicator    the cluster communications service
 * @param ns                     a Kryo namespace that can serialize both K and V
 * @param timestampProvider      provider of timestamps for K and V
 * @param peerUpdateFunction     function that provides a set of nodes to immediately
 *                               update to when there are writes to the map; may be null,
 *                               in which case all peers from {@code peersSupplier} are used
 * @param eventExecutor          executor to use for processing incoming events from peers;
 *                               may be null, in which case a fixed pool is created
 * @param communicationExecutor  executor to use for sending events to peers;
 *                               may be null, in which case a bounded fixed pool is created
 * @param backgroundExecutor     executor to use for background anti-entropy tasks;
 *                               may be null, in which case a single-thread scheduler is created
 * @param tombstonesDisabled     true if this map should not maintain tombstones
 * @param antiEntropyPeriod      period that the anti-entropy task should run
 * @param antiEntropyTimeUnit    time unit for anti-entropy period
 * @param convergeFaster         make anti-entropy try to converge faster
 * @param persistent             persist data to disk
 * @param persistenceService     persistence service
 * @param peersSupplier          supplier for peers
 * @param bootstrapPeersSupplier supplier for bootstrap peers
 */
//CHECKSTYLE:OFF
EventuallyConsistentMapImpl(
        NodeId localNodeId,
        String mapName,
        ClusterCommunicationService clusterCommunicator,
        KryoNamespace ns,
        BiFunction<K, V, Timestamp> timestampProvider,
        BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
        ExecutorService eventExecutor,
        ExecutorService communicationExecutor,
        ScheduledExecutorService backgroundExecutor,
        boolean tombstonesDisabled,
        long antiEntropyPeriod,
        TimeUnit antiEntropyTimeUnit,
        boolean convergeFaster,
        boolean persistent,
        PersistenceService persistenceService,
        Supplier<List<NodeId>> peersSupplier,
        Supplier<List<NodeId>> bootstrapPeersSupplier
) {
    //CHECKSTYLE:ON

    // ---- Phase 1: plain field initialisation (nothing escapes yet) ----
    this.localNodeId = localNodeId;
    this.mapName = mapName;
    // createSerializer() calls name(), so mapName must already be set here.
    this.serializer = createSerializer(ns);
    this.persistenceService = persistenceService;
    this.persistent = persistent;
    this.tombstonesDisabled = tombstonesDisabled;
    this.lightweightAntiEntropy = !convergeFaster;

    if (persistent) {
        items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
                .withName(mapName)
                .withSerializer(this.serializer)
                .build();
    } else {
        items = Maps.newConcurrentMap();
    }
    senderPending = Maps.newConcurrentMap();
    destroyedMessage = mapName + ERROR_DESTROYED;

    this.clusterCommunicator = clusterCommunicator;
    this.timestampProvider = timestampProvider;
    this.peersSupplier = peersSupplier;
    this.bootstrapPeersSupplier = bootstrapPeersSupplier;

    if (peerUpdateFunction != null) {
        // Restrict the caller-selected peers to those currently reported by the supplier.
        this.peerUpdateFunction = peerUpdateFunction.andThen(peers -> peersSupplier.get()
                .stream()
                .filter(peers::contains)
                .collect(Collectors.toList()));
    } else {
        this.peerUpdateFunction = (key, value) -> peersSupplier.get();
    }

    if (eventExecutor != null) {
        this.executor = eventExecutor;
    } else {
        // should be a normal executor; it's used for receiving messages
        this.executor =
                Executors.newFixedThreadPool(8,
                        groupedThreads("onos/ecm", mapName + "-fg-%d", log));
    }

    if (communicationExecutor != null) {
        this.communicationExecutor = communicationExecutor;
    } else {
        // sending executor; should be capped
        //TODO this probably doesn't need to be bounded anymore
        this.communicationExecutor =
                newFixedThreadPool(8,
                        groupedThreads("onos/ecm", mapName + "-publish-%d", log));
    }

    if (backgroundExecutor != null) {
        this.backgroundExecutor = backgroundExecutor;
    } else {
        this.backgroundExecutor =
                newSingleThreadScheduledExecutor(
                        groupedThreads("onos/ecm", mapName + "-bg-%d", log));
    }

    bootstrapMessageSubject = new MessageSubject("ecm-" + mapName + "-bootstrap");
    initializeMessageSubject = new MessageSubject("ecm-" + mapName + "-initialize");
    updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
    antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
    updateRequestSubject = new MessageSubject("ecm-" + mapName + "-update-request");

    // ---- Phase 2: `this` escapes to other threads from here on ----

    // start anti-entropy thread
    // NOTE(review): initialDelaySec is named as seconds but is interpreted in
    // antiEntropyTimeUnit here - confirm whether that is intended.
    this.backgroundExecutor.scheduleAtFixedRate(
            this::sendAdvertisement,
            initialDelaySec, antiEntropyPeriod,
            antiEntropyTimeUnit
    );

    clusterCommunicator.addSubscriber(
            bootstrapMessageSubject,
            serializer::decode,
            (Function<NodeId, CompletableFuture<Void>>) this::handleBootstrap,
            serializer::encode
    );

    clusterCommunicator.addSubscriber(
            initializeMessageSubject,
            serializer::decode,
            (Function<Collection<UpdateEntry<K, V>>, Void>) u -> {
                processUpdates(u);
                return null;
            },
            serializer::encode,
            this.executor
    );

    clusterCommunicator.addSubscriber(
            updateMessageSubject,
            serializer::decode,
            this::processUpdates,
            this.executor
    );

    clusterCommunicator.addSubscriber(
            antiEntropyAdvertisementSubject,
            serializer::decode,
            this::handleAntiEntropyAdvertisement,
            serializer::encode,
            this.backgroundExecutor
    );

    clusterCommunicator.addSubscriber(
            updateRequestSubject,
            serializer::decode,
            this::handleUpdateRequests,
            this.backgroundExecutor
    );

    if (!tombstonesDisabled) {
        previousTombstonePurgeTime = 0;
        // NOTE(review): antiEntropyPeriod is interpreted as seconds here, regardless
        // of antiEntropyTimeUnit - confirm whether that is intended.
        this.backgroundExecutor.scheduleWithFixedDelay(
                this::purgeTombstones,
                initialDelaySec,
                antiEntropyPeriod,
                TimeUnit.SECONDS
        );
    }

    // Initiate first round of Gossip
    this.bootstrap();
}
300
Jordan Halterman2c83a102017-08-20 17:11:41 -0700301 private Serializer createSerializer(KryoNamespace ns) {
302 return Serializer.using(KryoNamespace.newBuilder()
slowr878625f2017-10-24 14:53:49 -0700303 .register(ns)
304 // not so robust way to avoid collision with other
305 // user supplied registrations
306 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
307 .register(KryoNamespaces.BASIC)
308 .register(LogicalTimestamp.class)
309 .register(WallClockTimestamp.class)
310 .register(AntiEntropyAdvertisement.class)
311 .register(AntiEntropyResponse.class)
312 .register(UpdateEntry.class)
313 .register(MapValue.class)
314 .register(MapValue.Digest.class)
315 .register(UpdateRequest.class)
316 .build(name() + "-ecmap"));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800317 }
318
319 @Override
Madan Jampania090a112016-01-18 16:38:17 -0800320 public String name() {
321 return mapName;
322 }
323
324 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800325 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800326 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700327 // TODO: Maintain a separate counter for tracking live elements in map.
328 return Maps.filterValues(items, MapValue::isAlive).size();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800329 }
330
331 @Override
332 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800333 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700334 return size() == 0;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800335 }
336
337 @Override
338 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800339 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800340 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani3d76c942015-06-29 23:37:10 -0700341 return get(key) != null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800342 }
343
344 @Override
345 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800346 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800347 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani3d76c942015-06-29 23:37:10 -0700348 return items.values()
slowr878625f2017-10-24 14:53:49 -0700349 .stream()
350 .filter(MapValue::isAlive)
351 .anyMatch(v -> value.equals(v.get()));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800352 }
353
354 @Override
355 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800356 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800357 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800358
Madan Jampani3d76c942015-06-29 23:37:10 -0700359 MapValue<V> value = items.get(key);
360 return (value == null || value.isTombstone()) ? null : value.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800361 }
362
363 @Override
364 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800365 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800366 checkNotNull(key, ERROR_NULL_KEY);
367 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800368
Madan Jampani3d76c942015-06-29 23:37:10 -0700369 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700370 if (putInternal(key, newValue)) {
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700371 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
Madan Jampanicab114c2015-07-23 00:14:19 -0700372 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800373 }
374 }
375
Jonathan Hartdb3af892015-01-26 13:19:07 -0800376 @Override
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700377 public V remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800378 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800379 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700380 return removeAndNotify(key, null);
381 }
382
383 @Override
384 public void remove(K key, V value) {
385 checkState(!destroyed, destroyedMessage);
386 checkNotNull(key, ERROR_NULL_KEY);
387 checkNotNull(value, ERROR_NULL_VALUE);
388 removeAndNotify(key, value);
389 }
390
391 private V removeAndNotify(K key, V value) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700392 Timestamp timestamp = timestampProvider.apply(key, value);
393 Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null
394 ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700395 MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
Madan Jampania0ac4872015-07-02 11:23:49 -0700396 if (previousValue != null) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700397 notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
slowr878625f2017-10-24 14:53:49 -0700398 peerUpdateFunction.apply(key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700399 if (previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700400 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700401 }
402 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700403 return previousValue != null ? previousValue.get() : null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800404 }
405
Madan Jampani483d0a22015-08-19 17:33:00 -0700406 private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700407 checkState(!destroyed, destroyedMessage);
408 checkNotNull(key, ERROR_NULL_KEY);
409 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani483d0a22015-08-19 17:33:00 -0700410 tombstone.ifPresent(v -> checkState(v.isTombstone()));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700411
412 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700413 AtomicBoolean updated = new AtomicBoolean(false);
Madan Jampanid13f3b82015-07-01 17:37:50 -0700414 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
Madan Jampani3d76c942015-06-29 23:37:10 -0700415 items.compute(key, (k, existing) -> {
Madan Jampanid13f3b82015-07-01 17:37:50 -0700416 boolean valueMatches = true;
417 if (value.isPresent() && existing != null && existing.isAlive()) {
418 valueMatches = Objects.equals(value.get(), existing.get());
Madan Jampani3d76c942015-06-29 23:37:10 -0700419 }
Brian O'Connor6325dad2015-07-07 15:36:29 -0700420 if (existing == null) {
Jonathan Hart4a29c592015-09-23 17:55:07 -0700421 log.trace("ECMap Remove: Existing value for key {} is already null", k);
Brian O'Connor6325dad2015-07-07 15:36:29 -0700422 }
Madan Jampani483d0a22015-08-19 17:33:00 -0700423 if (valueMatches) {
424 if (existing == null) {
425 updated.set(tombstone.isPresent());
426 } else {
427 updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
428 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700429 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700430 if (updated.get()) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700431 previousValue.set(existing);
432 return tombstone.orElse(null);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700433 } else {
434 return existing;
435 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700436 });
Madan Jampanid13f3b82015-07-01 17:37:50 -0700437 return previousValue.get();
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800438 }
439
440 @Override
Madan Jampani4727a112015-07-16 12:12:58 -0700441 public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
442 checkState(!destroyed, destroyedMessage);
443 checkNotNull(key, ERROR_NULL_KEY);
444 checkNotNull(recomputeFunction, "Recompute function cannot be null");
445
446 AtomicBoolean updated = new AtomicBoolean(false);
447 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
Madan Jampanidc012972016-04-25 11:13:26 -0700448 MapValue<V> computedValue = items.compute(serializer.copy(key), (k, mv) -> {
Madan Jampani4727a112015-07-16 12:12:58 -0700449 previousValue.set(mv);
450 V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
Brian O'Connor05acd642016-08-03 00:42:35 -0700451 if (mv != null && Objects.equals(newRawValue, mv.get())) {
452 // value was not updated
453 return mv;
454 }
Madan Jampani4727a112015-07-16 12:12:58 -0700455 MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
456 if (mv == null || newValue.isNewerThan(mv)) {
457 updated.set(true);
Madan Jampanidc012972016-04-25 11:13:26 -0700458 // We return a copy to ensure updates to peers can be serialized.
459 // This prevents replica divergence due to serialization failures.
460 return serializer.copy(newValue);
Madan Jampani4727a112015-07-16 12:12:58 -0700461 } else {
462 return mv;
463 }
464 });
465 if (updated.get()) {
466 notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
467 EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
468 V value = computedValue.isTombstone()
469 ? previousValue.get() == null ? null : previousValue.get().get()
470 : computedValue.get();
471 if (value != null) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700472 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
Madan Jampani4727a112015-07-16 12:12:58 -0700473 }
474 }
475 return computedValue.get();
476 }
477
478 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800479 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800480 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800481 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800482 }
483
484 @Override
485 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800486 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700487 Maps.filterValues(items, MapValue::isAlive)
slowr878625f2017-10-24 14:53:49 -0700488 .forEach((k, v) -> remove(k));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800489 }
490
491 @Override
492 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800493 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700494 return Maps.filterValues(items, MapValue::isAlive)
slowr878625f2017-10-24 14:53:49 -0700495 .keySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800496 }
497
498 @Override
499 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800500 checkState(!destroyed, destroyedMessage);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700501 return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800502 }
503
504 @Override
505 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800506 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700507 return Maps.filterValues(items, MapValue::isAlive)
slowr878625f2017-10-24 14:53:49 -0700508 .entrySet()
509 .stream()
510 .map(e -> Pair.of(e.getKey(), e.getValue().get()))
511 .collect(Collectors.toSet());
Madan Jampani3d76c942015-06-29 23:37:10 -0700512 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800513
Madan Jampani3d76c942015-06-29 23:37:10 -0700514 /**
515 * Returns true if newValue was accepted i.e. map is updated.
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700516 *
slowr878625f2017-10-24 14:53:49 -0700517 * @param key key
Madan Jampani3d76c942015-06-29 23:37:10 -0700518 * @param newValue proposed new value
519 * @return true if update happened; false if map already contains a more recent value for the key
520 */
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700521 private boolean putInternal(K key, MapValue<V> newValue) {
522 checkState(!destroyed, destroyedMessage);
523 checkNotNull(key, ERROR_NULL_KEY);
524 checkNotNull(newValue, ERROR_NULL_VALUE);
525 checkState(newValue.isAlive());
526 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700527 AtomicBoolean updated = new AtomicBoolean(false);
528 items.compute(key, (k, existing) -> {
529 if (existing == null || newValue.isNewerThan(existing)) {
530 updated.set(true);
Madan Jampani3d76c942015-06-29 23:37:10 -0700531 return newValue;
532 }
533 return existing;
534 });
Madan Jampani3d76c942015-06-29 23:37:10 -0700535 return updated.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800536 }
537
538 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800539 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800540 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800541
542 listeners.add(checkNotNull(listener));
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700543 items.forEach((k, v) -> {
544 if (v.isAlive()) {
545 listener.event(new EventuallyConsistentMapEvent<K, V>(mapName, PUT, k, v.get()));
546 }
547 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800548 }
549
550 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800551 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800552 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800553
554 listeners.remove(checkNotNull(listener));
555 }
556
557 @Override
Madan Jampanifa242182016-01-22 13:42:54 -0800558 public CompletableFuture<Void> destroy() {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800559 destroyed = true;
560
561 executor.shutdown();
562 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800563 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800564
Jonathan Hart584d2f32015-01-27 19:46:14 -0800565 listeners.clear();
566
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700567 clusterCommunicator.removeSubscriber(bootstrapMessageSubject);
568 clusterCommunicator.removeSubscriber(initializeMessageSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800569 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jon Halld198b882016-05-18 16:44:40 -0700570 clusterCommunicator.removeSubscriber(updateRequestSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800571 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Madan Jampanifa242182016-01-22 13:42:54 -0800572 return CompletableFuture.completedFuture(null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800573 }
574
Jonathan Hartaaa56572015-01-28 21:56:35 -0800575 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700576 listeners.forEach(listener -> listener.event(event));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800577 }
578
Madan Jampani3d76c942015-06-29 23:37:10 -0700579 private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800580 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800581 }
582
Madan Jampani3d76c942015-06-29 23:37:10 -0700583 private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800584 if (peers == null) {
585 // we have no friends :(
586 return;
587 }
588 peers.forEach(node ->
slowr878625f2017-10-24 14:53:49 -0700589 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800590 );
591 }
592
Jonathan Hart233a18a2015-03-02 17:24:58 -0800593 private boolean underHighLoad() {
594 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
595 }
596
Madan Jampani3d76c942015-06-29 23:37:10 -0700597 private void sendAdvertisement() {
598 try {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700599 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800600 return;
601 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700602 pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
603 } catch (Exception e) {
604 // Catch all exceptions to avoid scheduled task being suppressed.
605 log.error("Exception thrown while sending advertisement", e);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800606 }
607 }
608
Madan Jampani3d76c942015-06-29 23:37:10 -0700609 private Optional<NodeId> pickRandomActivePeer() {
slowr878625f2017-10-24 14:53:49 -0700610 List<NodeId> activePeers = peersSupplier.get();
Madan Jampani3d76c942015-06-29 23:37:10 -0700611 Collections.shuffle(activePeers);
slowr878625f2017-10-24 14:53:49 -0700612 return activePeers.stream().findFirst();
Madan Jampani3d76c942015-06-29 23:37:10 -0700613 }
614
615 private void sendAdvertisementToPeer(NodeId peer) {
Madan Jampaniee35d552016-04-26 16:07:40 -0700616 long adCreationTime = System.currentTimeMillis();
Madan Jampani29f52a32016-04-18 15:20:52 -0700617 AntiEntropyAdvertisement<K> ad = createAdvertisement();
618 clusterCommunicator.sendAndReceive(ad,
Madan Jampani3d76c942015-06-29 23:37:10 -0700619 antiEntropyAdvertisementSubject,
620 serializer::encode,
Madan Jampani29f52a32016-04-18 15:20:52 -0700621 serializer::decode,
Madan Jampani3d76c942015-06-29 23:37:10 -0700622 peer)
623 .whenComplete((result, error) -> {
624 if (error != null) {
Jonathan Harte255cc42016-09-12 14:50:24 -0700625 log.debug("Failed to send anti-entropy advertisement to {}: {}",
626 peer, error.getMessage());
Madan Jampani29f52a32016-04-18 15:20:52 -0700627 } else if (result == AntiEntropyResponse.PROCESSED) {
Madan Jampaniee35d552016-04-26 16:07:40 -0700628 antiEntropyTimes.put(peer, adCreationTime);
Madan Jampani3d76c942015-06-29 23:37:10 -0700629 }
630 });
631 }
632
Jon Halld198b882016-05-18 16:44:40 -0700633 private void sendUpdateRequestToPeer(NodeId peer, Set<K> keys) {
634 UpdateRequest<K> request = new UpdateRequest<>(localNodeId, keys);
635 clusterCommunicator.unicast(request,
636 updateRequestSubject,
637 serializer::encode,
638 peer)
639 .whenComplete((result, error) -> {
640 if (error != null) {
Jonathan Harte255cc42016-09-12 14:50:24 -0700641 log.debug("Failed to send update request to {}: {}",
642 peer, error.getMessage());
Jon Halld198b882016-05-18 16:44:40 -0700643 }
644 });
645 }
646
Jonathan Hartaaa56572015-01-28 21:56:35 -0800647 private AntiEntropyAdvertisement<K> createAdvertisement() {
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700648 return new AntiEntropyAdvertisement<>(localNodeId,
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700649 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800650 }
651
Madan Jampani29f52a32016-04-18 15:20:52 -0700652 private AntiEntropyResponse handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700653 if (destroyed || underHighLoad()) {
Madan Jampani29f52a32016-04-18 15:20:52 -0700654 return AntiEntropyResponse.IGNORED;
Madan Jampani3d76c942015-06-29 23:37:10 -0700655 }
656 try {
Jonathan Hart9a426f82015-09-03 15:43:13 +0200657 if (log.isTraceEnabled()) {
658 log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
Jon Halld198b882016-05-18 16:44:40 -0700659 ad.sender(), mapName, ad.digest().size());
Jonathan Hart9a426f82015-09-03 15:43:13 +0200660 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700661 antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
Madan Jampani3d76c942015-06-29 23:37:10 -0700662 } catch (Exception e) {
663 log.warn("Error handling anti-entropy advertisement", e);
Madan Jampani29f52a32016-04-18 15:20:52 -0700664 return AntiEntropyResponse.FAILED;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800665 }
Madan Jampani29f52a32016-04-18 15:20:52 -0700666 return AntiEntropyResponse.PROCESSED;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800667 }
668
669 /**
Madan Jampani3d76c942015-06-29 23:37:10 -0700670 * Processes anti-entropy ad from peer by taking following actions:
671 * 1. If peer has an old entry, updates peer.
672 * 2. If peer indicates an entry is removed and has a more recent
673 * timestamp than the local entry, update local state.
Jonathan Hartaaa56572015-01-28 21:56:35 -0800674 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800675 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
676 AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700677 final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
Jonathan Hartaaa56572015-01-28 21:56:35 -0800678 final NodeId sender = ad.sender();
Jon Halld198b882016-05-18 16:44:40 -0700679 final List<NodeId> peers = ImmutableList.of(sender);
680 Set<K> staleOrMissing = new HashSet<>();
681 Set<K> locallyUnknown = new HashSet<>(ad.digest().keySet());
682
Madan Jampani3d76c942015-06-29 23:37:10 -0700683 items.forEach((key, localValue) -> {
Jon Halld198b882016-05-18 16:44:40 -0700684 locallyUnknown.remove(key);
Madan Jampani3d76c942015-06-29 23:37:10 -0700685 MapValue.Digest remoteValueDigest = ad.digest().get(key);
686 if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800687 // local value is more recent, push to sender
Jon Halld198b882016-05-18 16:44:40 -0700688 queueUpdate(new UpdateEntry<>(key, localValue), peers);
slowr878625f2017-10-24 14:53:49 -0700689 } else if (remoteValueDigest.isNewerThan(localValue.digest()) && remoteValueDigest.isTombstone()) {
Jon Halld198b882016-05-18 16:44:40 -0700690 // remote value is more recent and a tombstone: update local value
Madan Jampani483d0a22015-08-19 17:33:00 -0700691 MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
Madan Jampanid13f3b82015-07-01 17:37:50 -0700692 MapValue<V> previousValue = removeInternal(key,
slowr878625f2017-10-24 14:53:49 -0700693 Optional.empty(),
694 Optional.of(tombstone));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700695 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700696 externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800697 }
Jon Halld198b882016-05-18 16:44:40 -0700698 } else if (remoteValueDigest.isNewerThan(localValue.digest())) {
699 // Not a tombstone and remote is newer
700 staleOrMissing.add(key);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800701 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700702 });
Jon Halld198b882016-05-18 16:44:40 -0700703 // Keys missing in local map
704 staleOrMissing.addAll(locallyUnknown);
705 // Request updates that we missed out on
706 sendUpdateRequestToPeer(sender, staleOrMissing);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800707 return externalEvents;
708 }
709
Jon Halld198b882016-05-18 16:44:40 -0700710 private void handleUpdateRequests(UpdateRequest<K> request) {
711 final Set<K> keys = request.keys();
712 final NodeId sender = request.sender();
713 final List<NodeId> peers = ImmutableList.of(sender);
Jon Halld198b882016-05-18 16:44:40 -0700714 keys.forEach(key ->
slowr878625f2017-10-24 14:53:49 -0700715 queueUpdate(new UpdateEntry<>(key, items.get(key)), peers)
Jon Halld198b882016-05-18 16:44:40 -0700716 );
717 }
718
Madan Jampani29f52a32016-04-18 15:20:52 -0700719 private void purgeTombstones() {
720 /*
Jon Halld198b882016-05-18 16:44:40 -0700721 * In order to mitigate the resource exhaustion that can ensue due to an ever-growing set
Madan Jampani29f52a32016-04-18 15:20:52 -0700722 * of tombstones we employ the following heuristic to purge old tombstones periodically.
723 * First, we keep track of the time (local system time) when we were able to have a successful
724 * AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded
725 * as the time before which all tombstones are considered safe to purge.
726 */
slowr878625f2017-10-24 14:53:49 -0700727 long currentSafeTombstonePurgeTime = peersSupplier.get()
728 .stream()
729 .map(id -> antiEntropyTimes.getOrDefault(id, 0L))
730 .reduce(Math::min)
731 .orElse(0L);
Madan Jampani29f52a32016-04-18 15:20:52 -0700732 if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) {
733 return;
734 }
735 List<Map.Entry<K, MapValue<V>>> tombStonesToDelete = items.entrySet()
slowr878625f2017-10-24 14:53:49 -0700736 .stream()
737 .filter(e -> e.getValue().isTombstone())
738 .filter(e -> e.getValue().creationTime() <= currentSafeTombstonePurgeTime)
739 .collect(Collectors.toList());
Madan Jampani29f52a32016-04-18 15:20:52 -0700740 previousTombstonePurgeTime = currentSafeTombstonePurgeTime;
741 tombStonesToDelete.forEach(entry -> items.remove(entry.getKey(), entry.getValue()));
742 }
743
Madan Jampani3d76c942015-06-29 23:37:10 -0700744 private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
745 if (destroyed) {
746 return;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800747 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700748 updates.forEach(update -> {
749 final K key = update.key();
Madan Jampani29f52a32016-04-18 15:20:52 -0700750 final MapValue<V> value = update.value() == null ? null : update.value().copy();
Madan Jampani483d0a22015-08-19 17:33:00 -0700751 if (value == null || value.isTombstone()) {
752 MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700753 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700754 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700755 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700756 } else if (putInternal(key, value)) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700757 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800758 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700759 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800760 }
761
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700762 /**
763 * Bootstraps the map to attempt to get in sync with existing instances of the same map on other nodes in the
764 * cluster. This is necessary to ensure that a read immediately after the map is created doesn't return a null
765 * value.
766 */
Jon Halldabee682016-05-17 11:29:51 -0700767 private void bootstrap() {
slowr878625f2017-10-24 14:53:49 -0700768 List<NodeId> activePeers = bootstrapPeersSupplier.get();
Jon Halldabee682016-05-17 11:29:51 -0700769 if (activePeers.isEmpty()) {
770 return;
771 }
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700772 try {
773 requestBootstrapFromPeers(activePeers)
Jordan Halterman6440b092017-05-24 17:48:08 -0700774 .get(DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
slowr878625f2017-10-24 14:53:49 -0700775 } catch (ExecutionException | InterruptedException | TimeoutException e) {
776 log.debug("Failed to bootstrap ec map {}: {}", mapName, ExceptionUtils.getStackTrace(e));
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700777 }
778 }
779
780 /**
781 * Requests all updates from each peer in the provided list of peers.
782 * <p>
783 * The returned future will be completed once at least one peer bootstraps this map or bootstrap requests to all
784 * peers fail.
785 *
786 * @param peers the list of peers from which to request updates
787 * @return a future to be completed once updates have been received from at least one peer
788 */
789 private CompletableFuture<Void> requestBootstrapFromPeers(List<NodeId> peers) {
790 if (peers.isEmpty()) {
791 return CompletableFuture.completedFuture(null);
Jon Halldabee682016-05-17 11:29:51 -0700792 }
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700793 CompletableFuture<Void> future = new CompletableFuture<>();
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700794 final int totalPeers = peers.size();
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700795 AtomicBoolean successful = new AtomicBoolean();
796 AtomicInteger totalCount = new AtomicInteger();
797 AtomicReference<Throwable> lastError = new AtomicReference<>();
798
799 // Iterate through all of the peers and send a bootstrap request. On the first peer that returns
800 // a successful bootstrap response, complete the future. Otherwise, if no peers respond with any
801 // successful bootstrap response, the future will be completed with the last exception.
802 for (NodeId peer : peers) {
803 requestBootstrapFromPeer(peer).whenComplete((result, error) -> {
804 if (error == null) {
805 if (successful.compareAndSet(false, true)) {
806 future.complete(null);
807 } else if (totalCount.incrementAndGet() == totalPeers) {
808 Throwable e = lastError.get();
809 if (e != null) {
810 future.completeExceptionally(e);
811 }
812 }
813 } else {
814 if (!successful.get() && totalCount.incrementAndGet() == totalPeers) {
815 future.completeExceptionally(error);
816 } else {
817 lastError.set(error);
818 }
819 }
820 });
Jon Halldabee682016-05-17 11:29:51 -0700821 }
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700822 return future;
823 }
824
825 /**
826 * Requests a bootstrap from the given peer.
827 *
828 * @param peer the peer from which to request updates
829 * @return a future to be completed once the peer has sent bootstrap updates
830 */
831 private CompletableFuture<Void> requestBootstrapFromPeer(NodeId peer) {
slowr878625f2017-10-24 14:53:49 -0700832 log.trace("Sending bootstrap request to {} for {}", peer, mapName);
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700833 return clusterCommunicator.<NodeId, Void>sendAndReceive(
834 localNodeId,
835 bootstrapMessageSubject,
836 serializer::encode,
837 serializer::decode,
838 peer)
839 .whenComplete((updates, error) -> {
840 if (error != null) {
841 log.debug("Bootstrap request to {} failed: {}", peer, error.getMessage());
842 }
843 });
844 }
845
846 /**
847 * Handles a bootstrap request from a peer.
848 * <p>
849 * When handling a bootstrap request from a peer, the node sends batches of entries back to the peer and
850 * completes the bootstrap request once all batches have been received and processed.
851 *
852 * @param peer the peer that sent the bootstrap request
853 * @return a future to be completed once updates have been sent to the peer
854 */
855 private CompletableFuture<Void> handleBootstrap(NodeId peer) {
slowr878625f2017-10-24 14:53:49 -0700856 log.trace("Received bootstrap request from {} for {}", peer, bootstrapMessageSubject);
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700857
858 Function<List<UpdateEntry<K, V>>, CompletableFuture<Void>> sendUpdates = updates -> {
859 log.trace("Initializing {} with {} entries", peer, updates.size());
860 return clusterCommunicator.<List<UpdateEntry<K, V>>, Void>sendAndReceive(
861 ImmutableList.copyOf(updates),
862 initializeMessageSubject,
863 serializer::encode,
864 serializer::decode,
865 peer)
866 .whenComplete((result, error) -> {
867 if (error != null) {
868 log.debug("Failed to initialize {}", peer, error);
869 }
870 });
871 };
872
873 List<CompletableFuture<Void>> futures = Lists.newArrayList();
874 List<UpdateEntry<K, V>> updates = Lists.newArrayList();
875 for (Map.Entry<K, MapValue<V>> entry : items.entrySet()) {
876 K key = entry.getKey();
877 MapValue<V> value = entry.getValue();
878 if (value.isAlive()) {
slowr878625f2017-10-24 14:53:49 -0700879 updates.add(new UpdateEntry<>(key, value));
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700880 if (updates.size() == DEFAULT_MAX_EVENTS) {
881 futures.add(sendUpdates.apply(updates));
882 updates = new ArrayList<>();
883 }
884 }
885 }
886
887 if (!updates.isEmpty()) {
888 futures.add(sendUpdates.apply(updates));
889 }
890 return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
Jon Halldabee682016-05-17 11:29:51 -0700891 }
892
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800893 // TODO pull this into the class if this gets pulled out...
894 private static final int DEFAULT_MAX_EVENTS = 1000;
895 private static final int DEFAULT_MAX_IDLE_MS = 10;
896 private static final int DEFAULT_MAX_BATCH_MS = 50;
897 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800898
Madan Jampani3d76c942015-06-29 23:37:10 -0700899 private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800900
901 private final NodeId peer;
902
903 private EventAccumulator(NodeId peer) {
904 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
905 this.peer = peer;
906 }
907
908 @Override
Madan Jampani3d76c942015-06-29 23:37:10 -0700909 public void processItems(List<UpdateEntry<K, V>> items) {
910 Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
911 items.forEach(item -> map.compute(item.key(), (key, existing) ->
Madan Jampani92c64eb2015-07-23 15:37:07 -0700912 item.isNewerThan(existing) ? item : existing));
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700913 communicationExecutor.execute(() -> {
Jordan Haltermane0fae912017-05-18 12:52:51 -0700914 try {
915 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
916 updateMessageSubject,
917 serializer::encode,
918 peer)
919 .whenComplete((result, error) -> {
920 if (error != null) {
921 log.debug("Failed to send to {}", peer, error);
922 }
923 });
924 } catch (Exception e) {
925 log.warn("Failed to send to {}", peer, e);
926 }
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800927 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800928 }
929 }
Jonathan Hart4a29c592015-09-23 17:55:07 -0700930}