blob: bed19d5591b1d9d00a6f32a1a545d9516f85571a [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Jonathan Hart9dc5f092016-06-17 15:15:17 -070018import com.google.common.collect.Collections2;
19import com.google.common.collect.ImmutableList;
20import com.google.common.collect.ImmutableMap;
21import com.google.common.collect.Lists;
22import com.google.common.collect.Maps;
23import com.google.common.collect.Sets;
Madan Jampanif4c88502016-01-21 12:35:36 -080024import org.apache.commons.lang3.tuple.Pair;
25import org.onlab.util.AbstractAccumulator;
26import org.onlab.util.KryoNamespace;
27import org.onlab.util.SlidingWindowCounter;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.ControllerNode;
30import org.onosproject.cluster.NodeId;
31import org.onosproject.persistence.PersistenceService;
32import org.onosproject.store.LogicalTimestamp;
33import org.onosproject.store.Timestamp;
34import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
35import org.onosproject.store.cluster.messaging.MessageSubject;
36import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman12d5ec42017-05-17 00:53:44 -070037import org.onosproject.store.service.DistributedPrimitive;
Madan Jampanif4c88502016-01-21 12:35:36 -080038import org.onosproject.store.service.EventuallyConsistentMap;
39import org.onosproject.store.service.EventuallyConsistentMapEvent;
40import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Halterman2c83a102017-08-20 17:11:41 -070041import org.onosproject.store.service.Serializer;
Madan Jampanif4c88502016-01-21 12:35:36 -080042import org.onosproject.store.service.WallClockTimestamp;
43import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
45
Jordan Halterman12d5ec42017-05-17 00:53:44 -070046import java.util.ArrayList;
Jonathan Hart9dc5f092016-06-17 15:15:17 -070047import java.util.Collection;
48import java.util.Collections;
49import java.util.HashSet;
50import java.util.List;
51import java.util.Map;
52import java.util.Objects;
53import java.util.Optional;
54import java.util.Set;
55import java.util.Timer;
56import java.util.concurrent.CompletableFuture;
Jordan Halterman12d5ec42017-05-17 00:53:44 -070057import java.util.concurrent.ExecutionException;
Jonathan Hart9dc5f092016-06-17 15:15:17 -070058import java.util.concurrent.ExecutorService;
59import java.util.concurrent.Executors;
60import java.util.concurrent.ScheduledExecutorService;
61import java.util.concurrent.TimeUnit;
Jordan Halterman12d5ec42017-05-17 00:53:44 -070062import java.util.concurrent.TimeoutException;
Jonathan Hart9dc5f092016-06-17 15:15:17 -070063import java.util.concurrent.atomic.AtomicBoolean;
Jordan Halterman12d5ec42017-05-17 00:53:44 -070064import java.util.concurrent.atomic.AtomicInteger;
Jonathan Hart9dc5f092016-06-17 15:15:17 -070065import java.util.concurrent.atomic.AtomicReference;
66import java.util.function.BiFunction;
Jordan Halterman12d5ec42017-05-17 00:53:44 -070067import java.util.function.Function;
Jonathan Hart9dc5f092016-06-17 15:15:17 -070068import java.util.stream.Collectors;
69
70import static com.google.common.base.Preconditions.checkNotNull;
71import static com.google.common.base.Preconditions.checkState;
72import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
73import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
74import static org.onlab.util.Tools.groupedThreads;
75import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
76import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Jonathan Hartdb3af892015-01-26 13:19:07 -080077
/**
 * Distributed Map implementation which uses optimistic replication and gossip
 * based techniques to provide an eventually consistent data store.
 */
82public class EventuallyConsistentMapImpl<K, V>
83 implements EventuallyConsistentMap<K, V> {
84
85 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
86
Madan Jampani3d76c942015-06-29 23:37:10 -070087 private final Map<K, MapValue<V>> items;
Jonathan Hartdb3af892015-01-26 13:19:07 -080088
Jonathan Hartdb3af892015-01-26 13:19:07 -080089 private final ClusterService clusterService;
90 private final ClusterCommunicationService clusterCommunicator;
Jordan Halterman2c83a102017-08-20 17:11:41 -070091 private final Serializer serializer;
Madan Jampani3d76c942015-06-29 23:37:10 -070092 private final NodeId localNodeId;
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -070093 private final PersistenceService persistenceService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080094
Madan Jampanibcf1a482015-06-24 19:05:56 -070095 private final BiFunction<K, V, Timestamp> timestampProvider;
Jonathan Hartdb3af892015-01-26 13:19:07 -080096
Jordan Halterman12d5ec42017-05-17 00:53:44 -070097 private final MessageSubject bootstrapMessageSubject;
98 private final MessageSubject initializeMessageSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080099 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800100 private final MessageSubject antiEntropyAdvertisementSubject;
Jon Halld198b882016-05-18 16:44:40 -0700101 private final MessageSubject updateRequestSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800102
Jonathan Hartaaa56572015-01-28 21:56:35 -0800103 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Madan Jampani3d76c942015-06-29 23:37:10 -0700104 = Sets.newCopyOnWriteArraySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800105
106 private final ExecutorService executor;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800107 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800108 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800109
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700110 private final ExecutorService communicationExecutor;
111 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -0800112
Madan Jampani29f52a32016-04-18 15:20:52 -0700113 private long previousTombstonePurgeTime;
114 private final Map<NodeId, Long> antiEntropyTimes = Maps.newConcurrentMap();
115
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700116 private final String mapName;
117
Jonathan Hartdb3af892015-01-26 13:19:07 -0800118 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800119 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800120 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800121
Jonathan Hart4f397e82015-02-04 09:10:41 -0800122 private static final String ERROR_NULL_KEY = "Key cannot be null";
123 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
124
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700125 private final long initialDelaySec = 5;
126 private final boolean lightweightAntiEntropy;
127 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800128
Jonathan Hart233a18a2015-03-02 17:24:58 -0800129 private static final int WINDOW_SIZE = 5;
Jon Halldabee682016-05-17 11:29:51 -0700130 private static final int HIGH_LOAD_THRESHOLD = 2;
Jonathan Hart233a18a2015-03-02 17:24:58 -0800131 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700132 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800133
Jonathan Hartca335e92015-03-05 10:34:32 -0800134 private final boolean persistent;
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700135
/**
 * Creates a new eventually consistent map shared amongst multiple instances.
 * <p>
 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
 * for more description of the parameters expected by the map.
 * </p>
 *
 * @param mapName               a String identifier for the map.
 * @param clusterService        the cluster service
 * @param clusterCommunicator   the cluster communications service
 * @param ns                    a Kryo namespace that can serialize
 *                              both K and V
 * @param timestampProvider     provider of timestamps for K and V
 * @param peerUpdateFunction    function that provides a set of nodes to immediately
 *                              update to when there are writes to the map
 * @param eventExecutor         executor to use for processing incoming
 *                              events from peers
 * @param communicationExecutor executor to use for sending events to peers
 * @param backgroundExecutor    executor to use for background anti-entropy
 *                              tasks
 * @param tombstonesDisabled    true if this map should not maintain
 *                              tombstones
 * @param antiEntropyPeriod     period that the anti-entropy task should run
 * @param antiEntropyTimeUnit   time unit for anti-entropy period
 * @param convergeFaster        make anti-entropy try to converge faster
 * @param persistent            persist data to disk
 * @param persistenceService    persistence service
 */
EventuallyConsistentMapImpl(String mapName,
                            ClusterService clusterService,
                            ClusterCommunicationService clusterCommunicator,
                            KryoNamespace ns,
                            BiFunction<K, V, Timestamp> timestampProvider,
                            BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
                            ExecutorService eventExecutor,
                            ExecutorService communicationExecutor,
                            ScheduledExecutorService backgroundExecutor,
                            boolean tombstonesDisabled,
                            long antiEntropyPeriod,
                            TimeUnit antiEntropyTimeUnit,
                            boolean convergeFaster,
                            boolean persistent,
                            PersistenceService persistenceService) {
    this.mapName = mapName;
    this.serializer = createSerializer(ns);
    this.persistenceService = persistenceService;
    this.persistent = persistent;

    // Assign the configuration flags before any task is scheduled or any
    // subscriber is registered, so that code running on other threads never
    // observes their default (false) values.
    this.tombstonesDisabled = tombstonesDisabled;
    this.lightweightAntiEntropy = !convergeFaster;

    if (persistent) {
        items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
                .withName(mapName)
                .withSerializer(this.serializer)
                .build();
    } else {
        items = Maps.newConcurrentMap();
    }
    senderPending = Maps.newConcurrentMap();
    destroyedMessage = mapName + ERROR_DESTROYED;

    this.clusterService = clusterService;
    this.clusterCommunicator = clusterCommunicator;
    this.localNodeId = clusterService.getLocalNode().id();

    this.timestampProvider = timestampProvider;

    if (peerUpdateFunction != null) {
        this.peerUpdateFunction = peerUpdateFunction;
    } else {
        // Default: push every update to all cluster nodes except the local one.
        this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
                .map(ControllerNode::id)
                .filter(nodeId -> !nodeId.equals(localNodeId))
                .collect(Collectors.toList());
    }

    if (eventExecutor != null) {
        this.executor = eventExecutor;
    } else {
        // should be a normal executor; it's used for receiving messages
        this.executor =
                Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d", log));
    }

    if (communicationExecutor != null) {
        this.communicationExecutor = communicationExecutor;
    } else {
        // sending executor; should be capped
        //TODO this probably doesn't need to be bounded anymore
        this.communicationExecutor =
                newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d", log));
    }

    if (backgroundExecutor != null) {
        this.backgroundExecutor = backgroundExecutor;
    } else {
        this.backgroundExecutor =
                newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d", log));
    }

    // start anti-entropy thread
    // NOTE(review): initialDelaySec is interpreted in antiEntropyTimeUnit here,
    // not in seconds; left as-is to preserve existing start-up timing.
    this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
                                                initialDelaySec, antiEntropyPeriod,
                                                antiEntropyTimeUnit);

    bootstrapMessageSubject = new MessageSubject("ecm-" + mapName + "-bootstrap");
    clusterCommunicator.addSubscriber(bootstrapMessageSubject,
                                      serializer::decode,
                                      (Function<NodeId, CompletableFuture<Void>>) this::handleBootstrap,
                                      serializer::encode);

    initializeMessageSubject = new MessageSubject("ecm-" + mapName + "-initialize");
    clusterCommunicator.addSubscriber(initializeMessageSubject,
                                      serializer::decode,
                                      (Function<Collection<UpdateEntry<K, V>>, Void>) u -> {
                                          processUpdates(u);
                                          return null;
                                      },
                                      serializer::encode,
                                      this.executor);

    updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
    clusterCommunicator.addSubscriber(updateMessageSubject,
                                      serializer::decode,
                                      this::processUpdates,
                                      this.executor);

    antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
    clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
                                      serializer::decode,
                                      this::handleAntiEntropyAdvertisement,
                                      serializer::encode,
                                      this.backgroundExecutor);

    updateRequestSubject = new MessageSubject("ecm-" + mapName + "-update-request");
    clusterCommunicator.addSubscriber(updateRequestSubject,
                                      serializer::decode,
                                      this::handleUpdateRequests,
                                      this.backgroundExecutor);

    if (!tombstonesDisabled) {
        previousTombstonePurgeTime = 0;
        // antiEntropyPeriod is expressed in antiEntropyTimeUnit, so it must not be
        // scheduled as seconds. Both arguments are converted to nanoseconds so the
        // initial delay stays at initialDelaySec seconds and the period keeps the
        // caller's unit.
        this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
                                                       TimeUnit.SECONDS.toNanos(initialDelaySec),
                                                       antiEntropyTimeUnit.toNanos(antiEntropyPeriod),
                                                       TimeUnit.NANOSECONDS);
    }

    // Initiate first round of Gossip
    this.bootstrap();
}
289
Jordan Halterman2c83a102017-08-20 17:11:41 -0700290 private Serializer createSerializer(KryoNamespace ns) {
291 return Serializer.using(KryoNamespace.newBuilder()
HIGUCHI Yuta163efb52016-05-18 19:24:19 -0700292 .register(ns)
293 // not so robust way to avoid collision with other
294 // user supplied registrations
295 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
296 .register(KryoNamespaces.BASIC)
297 .register(LogicalTimestamp.class)
298 .register(WallClockTimestamp.class)
299 .register(AntiEntropyAdvertisement.class)
300 .register(AntiEntropyResponse.class)
301 .register(UpdateEntry.class)
302 .register(MapValue.class)
303 .register(MapValue.Digest.class)
Jonathan Hart9dc5f092016-06-17 15:15:17 -0700304 .register(UpdateRequest.class)
HIGUCHI Yuta163efb52016-05-18 19:24:19 -0700305 .build(name() + "-ecmap"));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800306 }
307
308 @Override
Madan Jampania090a112016-01-18 16:38:17 -0800309 public String name() {
310 return mapName;
311 }
312
313 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800314 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800315 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700316 // TODO: Maintain a separate counter for tracking live elements in map.
317 return Maps.filterValues(items, MapValue::isAlive).size();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800318 }
319
320 @Override
321 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800322 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700323 return size() == 0;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800324 }
325
326 @Override
327 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800328 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800329 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani3d76c942015-06-29 23:37:10 -0700330 return get(key) != null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800331 }
332
333 @Override
334 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800335 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800336 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani3d76c942015-06-29 23:37:10 -0700337 return items.values()
338 .stream()
339 .filter(MapValue::isAlive)
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700340 .anyMatch(v -> value.equals(v.get()));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800341 }
342
343 @Override
344 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800345 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800346 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800347
Madan Jampani3d76c942015-06-29 23:37:10 -0700348 MapValue<V> value = items.get(key);
349 return (value == null || value.isTombstone()) ? null : value.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800350 }
351
352 @Override
353 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800354 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800355 checkNotNull(key, ERROR_NULL_KEY);
356 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800357
Madan Jampani3d76c942015-06-29 23:37:10 -0700358 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700359 if (putInternal(key, newValue)) {
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700360 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
Madan Jampanicab114c2015-07-23 00:14:19 -0700361 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800362 }
363 }
364
Jonathan Hartdb3af892015-01-26 13:19:07 -0800365 @Override
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700366 public V remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800367 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800368 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700369 return removeAndNotify(key, null);
370 }
371
372 @Override
373 public void remove(K key, V value) {
374 checkState(!destroyed, destroyedMessage);
375 checkNotNull(key, ERROR_NULL_KEY);
376 checkNotNull(value, ERROR_NULL_VALUE);
377 removeAndNotify(key, value);
378 }
379
380 private V removeAndNotify(K key, V value) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700381 Timestamp timestamp = timestampProvider.apply(key, value);
382 Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null
383 ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700384 MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
Madan Jampania0ac4872015-07-02 11:23:49 -0700385 if (previousValue != null) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700386 notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
387 peerUpdateFunction.apply(key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700388 if (previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700389 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700390 }
391 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700392 return previousValue != null ? previousValue.get() : null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800393 }
394
Madan Jampani483d0a22015-08-19 17:33:00 -0700395 private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700396 checkState(!destroyed, destroyedMessage);
397 checkNotNull(key, ERROR_NULL_KEY);
398 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani483d0a22015-08-19 17:33:00 -0700399 tombstone.ifPresent(v -> checkState(v.isTombstone()));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700400
401 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700402 AtomicBoolean updated = new AtomicBoolean(false);
Madan Jampanid13f3b82015-07-01 17:37:50 -0700403 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
Madan Jampani3d76c942015-06-29 23:37:10 -0700404 items.compute(key, (k, existing) -> {
Madan Jampanid13f3b82015-07-01 17:37:50 -0700405 boolean valueMatches = true;
406 if (value.isPresent() && existing != null && existing.isAlive()) {
407 valueMatches = Objects.equals(value.get(), existing.get());
Madan Jampani3d76c942015-06-29 23:37:10 -0700408 }
Brian O'Connor6325dad2015-07-07 15:36:29 -0700409 if (existing == null) {
Jonathan Hart4a29c592015-09-23 17:55:07 -0700410 log.trace("ECMap Remove: Existing value for key {} is already null", k);
Brian O'Connor6325dad2015-07-07 15:36:29 -0700411 }
Madan Jampani483d0a22015-08-19 17:33:00 -0700412 if (valueMatches) {
413 if (existing == null) {
414 updated.set(tombstone.isPresent());
415 } else {
416 updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
417 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700418 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700419 if (updated.get()) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700420 previousValue.set(existing);
421 return tombstone.orElse(null);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700422 } else {
423 return existing;
424 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700425 });
Madan Jampanid13f3b82015-07-01 17:37:50 -0700426 return previousValue.get();
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800427 }
428
429 @Override
Madan Jampani4727a112015-07-16 12:12:58 -0700430 public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
431 checkState(!destroyed, destroyedMessage);
432 checkNotNull(key, ERROR_NULL_KEY);
433 checkNotNull(recomputeFunction, "Recompute function cannot be null");
434
435 AtomicBoolean updated = new AtomicBoolean(false);
436 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
Madan Jampanidc012972016-04-25 11:13:26 -0700437 MapValue<V> computedValue = items.compute(serializer.copy(key), (k, mv) -> {
Madan Jampani4727a112015-07-16 12:12:58 -0700438 previousValue.set(mv);
439 V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
Brian O'Connor05acd642016-08-03 00:42:35 -0700440 if (mv != null && Objects.equals(newRawValue, mv.get())) {
441 // value was not updated
442 return mv;
443 }
Madan Jampani4727a112015-07-16 12:12:58 -0700444 MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
445 if (mv == null || newValue.isNewerThan(mv)) {
446 updated.set(true);
Madan Jampanidc012972016-04-25 11:13:26 -0700447 // We return a copy to ensure updates to peers can be serialized.
448 // This prevents replica divergence due to serialization failures.
449 return serializer.copy(newValue);
Madan Jampani4727a112015-07-16 12:12:58 -0700450 } else {
451 return mv;
452 }
453 });
454 if (updated.get()) {
455 notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
456 EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
457 V value = computedValue.isTombstone()
458 ? previousValue.get() == null ? null : previousValue.get().get()
459 : computedValue.get();
460 if (value != null) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700461 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
Madan Jampani4727a112015-07-16 12:12:58 -0700462 }
463 }
464 return computedValue.get();
465 }
466
467 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800468 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800469 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800470 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800471 }
472
473 @Override
474 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800475 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700476 Maps.filterValues(items, MapValue::isAlive)
477 .forEach((k, v) -> remove(k));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800478 }
479
480 @Override
481 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800482 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700483 return Maps.filterValues(items, MapValue::isAlive)
484 .keySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800485 }
486
487 @Override
488 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800489 checkState(!destroyed, destroyedMessage);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700490 return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800491 }
492
493 @Override
494 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800495 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700496 return Maps.filterValues(items, MapValue::isAlive)
497 .entrySet()
498 .stream()
499 .map(e -> Pair.of(e.getKey(), e.getValue().get()))
500 .collect(Collectors.toSet());
501 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800502
/**
 * Returns true if newValue was accepted i.e. map is updated.
 *
 * @param key key
 * @param newValue proposed new value
 * @return true if update happened; false if map already contains a more recent value for the key
 */
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700510 private boolean putInternal(K key, MapValue<V> newValue) {
511 checkState(!destroyed, destroyedMessage);
512 checkNotNull(key, ERROR_NULL_KEY);
513 checkNotNull(newValue, ERROR_NULL_VALUE);
514 checkState(newValue.isAlive());
515 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700516 AtomicBoolean updated = new AtomicBoolean(false);
517 items.compute(key, (k, existing) -> {
518 if (existing == null || newValue.isNewerThan(existing)) {
519 updated.set(true);
Madan Jampani3d76c942015-06-29 23:37:10 -0700520 return newValue;
521 }
522 return existing;
523 });
Madan Jampani3d76c942015-06-29 23:37:10 -0700524 return updated.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800525 }
526
527 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800528 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800529 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800530
531 listeners.add(checkNotNull(listener));
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700532 items.forEach((k, v) -> {
533 if (v.isAlive()) {
534 listener.event(new EventuallyConsistentMapEvent<K, V>(mapName, PUT, k, v.get()));
535 }
536 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800537 }
538
539 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800540 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800541 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800542
543 listeners.remove(checkNotNull(listener));
544 }
545
546 @Override
Madan Jampanifa242182016-01-22 13:42:54 -0800547 public CompletableFuture<Void> destroy() {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800548 destroyed = true;
549
550 executor.shutdown();
551 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800552 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800553
Jonathan Hart584d2f32015-01-27 19:46:14 -0800554 listeners.clear();
555
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700556 clusterCommunicator.removeSubscriber(bootstrapMessageSubject);
557 clusterCommunicator.removeSubscriber(initializeMessageSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800558 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jon Halld198b882016-05-18 16:44:40 -0700559 clusterCommunicator.removeSubscriber(updateRequestSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800560 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Madan Jampanifa242182016-01-22 13:42:54 -0800561 return CompletableFuture.completedFuture(null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800562 }
563
Jonathan Hartaaa56572015-01-28 21:56:35 -0800564 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700565 listeners.forEach(listener -> listener.event(event));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800566 }
567
Madan Jampani3d76c942015-06-29 23:37:10 -0700568 private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800569 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800570 }
571
Madan Jampani3d76c942015-06-29 23:37:10 -0700572 private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800573 if (peers == null) {
574 // we have no friends :(
575 return;
576 }
577 peers.forEach(node ->
Jonathan Hart9a426f82015-09-03 15:43:13 +0200578 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800579 );
580 }
581
Jonathan Hart233a18a2015-03-02 17:24:58 -0800582 private boolean underHighLoad() {
583 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
584 }
585
Madan Jampani3d76c942015-06-29 23:37:10 -0700586 private void sendAdvertisement() {
587 try {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700588 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800589 return;
590 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700591 pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
592 } catch (Exception e) {
593 // Catch all exceptions to avoid scheduled task being suppressed.
594 log.error("Exception thrown while sending advertisement", e);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800595 }
596 }
597
Madan Jampani3d76c942015-06-29 23:37:10 -0700598 private Optional<NodeId> pickRandomActivePeer() {
599 List<NodeId> activePeers = clusterService.getNodes()
600 .stream()
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700601 .map(ControllerNode::id)
602 .filter(id -> !localNodeId.equals(id))
Thomas Vachuska7a8de842016-03-07 20:56:35 -0800603 .filter(id -> clusterService.getState(id).isActive())
Madan Jampani3d76c942015-06-29 23:37:10 -0700604 .collect(Collectors.toList());
605 Collections.shuffle(activePeers);
606 return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
607 }
608
609 private void sendAdvertisementToPeer(NodeId peer) {
Madan Jampaniee35d552016-04-26 16:07:40 -0700610 long adCreationTime = System.currentTimeMillis();
Madan Jampani29f52a32016-04-18 15:20:52 -0700611 AntiEntropyAdvertisement<K> ad = createAdvertisement();
612 clusterCommunicator.sendAndReceive(ad,
Madan Jampani3d76c942015-06-29 23:37:10 -0700613 antiEntropyAdvertisementSubject,
614 serializer::encode,
Madan Jampani29f52a32016-04-18 15:20:52 -0700615 serializer::decode,
Madan Jampani3d76c942015-06-29 23:37:10 -0700616 peer)
617 .whenComplete((result, error) -> {
618 if (error != null) {
Jonathan Harte255cc42016-09-12 14:50:24 -0700619 log.debug("Failed to send anti-entropy advertisement to {}: {}",
620 peer, error.getMessage());
Madan Jampani29f52a32016-04-18 15:20:52 -0700621 } else if (result == AntiEntropyResponse.PROCESSED) {
Madan Jampaniee35d552016-04-26 16:07:40 -0700622 antiEntropyTimes.put(peer, adCreationTime);
Madan Jampani3d76c942015-06-29 23:37:10 -0700623 }
624 });
625 }
626
Jon Halld198b882016-05-18 16:44:40 -0700627 private void sendUpdateRequestToPeer(NodeId peer, Set<K> keys) {
628 UpdateRequest<K> request = new UpdateRequest<>(localNodeId, keys);
629 clusterCommunicator.unicast(request,
630 updateRequestSubject,
631 serializer::encode,
632 peer)
633 .whenComplete((result, error) -> {
634 if (error != null) {
Jonathan Harte255cc42016-09-12 14:50:24 -0700635 log.debug("Failed to send update request to {}: {}",
636 peer, error.getMessage());
Jon Halld198b882016-05-18 16:44:40 -0700637 }
638 });
639 }
640
Jonathan Hartaaa56572015-01-28 21:56:35 -0800641 private AntiEntropyAdvertisement<K> createAdvertisement() {
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700642 return new AntiEntropyAdvertisement<>(localNodeId,
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700643 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800644 }
645
Madan Jampani29f52a32016-04-18 15:20:52 -0700646 private AntiEntropyResponse handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700647 if (destroyed || underHighLoad()) {
Madan Jampani29f52a32016-04-18 15:20:52 -0700648 return AntiEntropyResponse.IGNORED;
Madan Jampani3d76c942015-06-29 23:37:10 -0700649 }
650 try {
Jonathan Hart9a426f82015-09-03 15:43:13 +0200651 if (log.isTraceEnabled()) {
652 log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
Jon Halld198b882016-05-18 16:44:40 -0700653 ad.sender(), mapName, ad.digest().size());
Jonathan Hart9a426f82015-09-03 15:43:13 +0200654 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700655 antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
Madan Jampani3d76c942015-06-29 23:37:10 -0700656 } catch (Exception e) {
657 log.warn("Error handling anti-entropy advertisement", e);
Madan Jampani29f52a32016-04-18 15:20:52 -0700658 return AntiEntropyResponse.FAILED;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800659 }
Madan Jampani29f52a32016-04-18 15:20:52 -0700660 return AntiEntropyResponse.PROCESSED;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800661 }
662
663 /**
Madan Jampani3d76c942015-06-29 23:37:10 -0700664 * Processes anti-entropy ad from peer by taking following actions:
665 * 1. If peer has an old entry, updates peer.
666 * 2. If peer indicates an entry is removed and has a more recent
667 * timestamp than the local entry, update local state.
Jonathan Hartaaa56572015-01-28 21:56:35 -0800668 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800669 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
670 AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700671 final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
Jonathan Hartaaa56572015-01-28 21:56:35 -0800672 final NodeId sender = ad.sender();
Jon Halld198b882016-05-18 16:44:40 -0700673 final List<NodeId> peers = ImmutableList.of(sender);
674 Set<K> staleOrMissing = new HashSet<>();
675 Set<K> locallyUnknown = new HashSet<>(ad.digest().keySet());
676
Madan Jampani3d76c942015-06-29 23:37:10 -0700677 items.forEach((key, localValue) -> {
Jon Halld198b882016-05-18 16:44:40 -0700678 locallyUnknown.remove(key);
Madan Jampani3d76c942015-06-29 23:37:10 -0700679 MapValue.Digest remoteValueDigest = ad.digest().get(key);
680 if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800681 // local value is more recent, push to sender
Jon Halld198b882016-05-18 16:44:40 -0700682 queueUpdate(new UpdateEntry<>(key, localValue), peers);
683 } else if (remoteValueDigest != null
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700684 && remoteValueDigest.isNewerThan(localValue.digest())
685 && remoteValueDigest.isTombstone()) {
Jon Halld198b882016-05-18 16:44:40 -0700686 // remote value is more recent and a tombstone: update local value
Madan Jampani483d0a22015-08-19 17:33:00 -0700687 MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
Madan Jampanid13f3b82015-07-01 17:37:50 -0700688 MapValue<V> previousValue = removeInternal(key,
689 Optional.empty(),
Madan Jampani483d0a22015-08-19 17:33:00 -0700690 Optional.of(tombstone));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700691 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700692 externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800693 }
Jon Halld198b882016-05-18 16:44:40 -0700694 } else if (remoteValueDigest.isNewerThan(localValue.digest())) {
695 // Not a tombstone and remote is newer
696 staleOrMissing.add(key);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800697 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700698 });
Jon Halld198b882016-05-18 16:44:40 -0700699 // Keys missing in local map
700 staleOrMissing.addAll(locallyUnknown);
701 // Request updates that we missed out on
702 sendUpdateRequestToPeer(sender, staleOrMissing);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800703 return externalEvents;
704 }
705
Jon Halld198b882016-05-18 16:44:40 -0700706 private void handleUpdateRequests(UpdateRequest<K> request) {
707 final Set<K> keys = request.keys();
708 final NodeId sender = request.sender();
709 final List<NodeId> peers = ImmutableList.of(sender);
710
711 keys.forEach(key ->
712 queueUpdate(new UpdateEntry<>(key, items.get(key)), peers)
713 );
714 }
715
Madan Jampani29f52a32016-04-18 15:20:52 -0700716 private void purgeTombstones() {
717 /*
Jon Halld198b882016-05-18 16:44:40 -0700718 * In order to mitigate the resource exhaustion that can ensue due to an ever-growing set
Madan Jampani29f52a32016-04-18 15:20:52 -0700719 * of tombstones we employ the following heuristic to purge old tombstones periodically.
720 * First, we keep track of the time (local system time) when we were able to have a successful
721 * AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded
722 * as the time before which all tombstones are considered safe to purge.
723 */
Madan Jampania8f919e2016-04-18 16:47:35 -0700724 long currentSafeTombstonePurgeTime = clusterService.getNodes()
725 .stream()
726 .map(ControllerNode::id)
727 .filter(id -> !id.equals(localNodeId))
728 .map(id -> antiEntropyTimes.getOrDefault(id, 0L))
729 .reduce(Math::min)
730 .orElse(0L);
Madan Jampani29f52a32016-04-18 15:20:52 -0700731 if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) {
732 return;
733 }
734 List<Map.Entry<K, MapValue<V>>> tombStonesToDelete = items.entrySet()
735 .stream()
736 .filter(e -> e.getValue().isTombstone())
737 .filter(e -> e.getValue().creationTime() <= currentSafeTombstonePurgeTime)
738 .collect(Collectors.toList());
739 previousTombstonePurgeTime = currentSafeTombstonePurgeTime;
740 tombStonesToDelete.forEach(entry -> items.remove(entry.getKey(), entry.getValue()));
741 }
742
Madan Jampani3d76c942015-06-29 23:37:10 -0700743 private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
744 if (destroyed) {
745 return;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800746 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700747 updates.forEach(update -> {
748 final K key = update.key();
Madan Jampani29f52a32016-04-18 15:20:52 -0700749 final MapValue<V> value = update.value() == null ? null : update.value().copy();
Madan Jampani483d0a22015-08-19 17:33:00 -0700750 if (value == null || value.isTombstone()) {
751 MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700752 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700753 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700754 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700755 } else if (putInternal(key, value)) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700756 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800757 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700758 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800759 }
760
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700761 /**
762 * Bootstraps the map to attempt to get in sync with existing instances of the same map on other nodes in the
763 * cluster. This is necessary to ensure that a read immediately after the map is created doesn't return a null
764 * value.
765 */
Jon Halldabee682016-05-17 11:29:51 -0700766 private void bootstrap() {
Jon Halldabee682016-05-17 11:29:51 -0700767 List<NodeId> activePeers = clusterService.getNodes()
768 .stream()
769 .map(ControllerNode::id)
770 .filter(id -> !localNodeId.equals(id))
771 .filter(id -> clusterService.getState(id).isActive())
772 .collect(Collectors.toList());
773
774 if (activePeers.isEmpty()) {
775 return;
776 }
777
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700778 try {
779 requestBootstrapFromPeers(activePeers)
Jordan Halterman6440b092017-05-24 17:48:08 -0700780 .get(DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700781 } catch (ExecutionException e) {
782 log.debug("Failed to bootstrap ec map {}: {}", mapName, e.getCause());
783 } catch (InterruptedException | TimeoutException e) {
784 log.warn("Failed to bootstrap ec map {}: {}", mapName, e);
785 }
786 }
787
788 /**
789 * Requests all updates from each peer in the provided list of peers.
790 * <p>
791 * The returned future will be completed once at least one peer bootstraps this map or bootstrap requests to all
792 * peers fail.
793 *
794 * @param peers the list of peers from which to request updates
795 * @return a future to be completed once updates have been received from at least one peer
796 */
797 private CompletableFuture<Void> requestBootstrapFromPeers(List<NodeId> peers) {
798 if (peers.isEmpty()) {
799 return CompletableFuture.completedFuture(null);
Jon Halldabee682016-05-17 11:29:51 -0700800 }
801
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700802 CompletableFuture<Void> future = new CompletableFuture<>();
803
804 final int totalPeers = peers.size();
805
806 AtomicBoolean successful = new AtomicBoolean();
807 AtomicInteger totalCount = new AtomicInteger();
808 AtomicReference<Throwable> lastError = new AtomicReference<>();
809
810 // Iterate through all of the peers and send a bootstrap request. On the first peer that returns
811 // a successful bootstrap response, complete the future. Otherwise, if no peers respond with any
812 // successful bootstrap response, the future will be completed with the last exception.
813 for (NodeId peer : peers) {
814 requestBootstrapFromPeer(peer).whenComplete((result, error) -> {
815 if (error == null) {
816 if (successful.compareAndSet(false, true)) {
817 future.complete(null);
818 } else if (totalCount.incrementAndGet() == totalPeers) {
819 Throwable e = lastError.get();
820 if (e != null) {
821 future.completeExceptionally(e);
822 }
823 }
824 } else {
825 if (!successful.get() && totalCount.incrementAndGet() == totalPeers) {
826 future.completeExceptionally(error);
827 } else {
828 lastError.set(error);
829 }
830 }
831 });
Jon Halldabee682016-05-17 11:29:51 -0700832 }
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700833 return future;
834 }
835
836 /**
837 * Requests a bootstrap from the given peer.
838 *
839 * @param peer the peer from which to request updates
840 * @return a future to be completed once the peer has sent bootstrap updates
841 */
842 private CompletableFuture<Void> requestBootstrapFromPeer(NodeId peer) {
843 log.trace("Sending bootstrap request to {}", peer);
844 return clusterCommunicator.<NodeId, Void>sendAndReceive(
845 localNodeId,
846 bootstrapMessageSubject,
847 serializer::encode,
848 serializer::decode,
849 peer)
850 .whenComplete((updates, error) -> {
851 if (error != null) {
852 log.debug("Bootstrap request to {} failed: {}", peer, error.getMessage());
853 }
854 });
855 }
856
857 /**
858 * Handles a bootstrap request from a peer.
859 * <p>
860 * When handling a bootstrap request from a peer, the node sends batches of entries back to the peer and
861 * completes the bootstrap request once all batches have been received and processed.
862 *
863 * @param peer the peer that sent the bootstrap request
864 * @return a future to be completed once updates have been sent to the peer
865 */
866 private CompletableFuture<Void> handleBootstrap(NodeId peer) {
867 log.trace("Received bootstrap request from {}", peer);
868
869 Function<List<UpdateEntry<K, V>>, CompletableFuture<Void>> sendUpdates = updates -> {
870 log.trace("Initializing {} with {} entries", peer, updates.size());
871 return clusterCommunicator.<List<UpdateEntry<K, V>>, Void>sendAndReceive(
872 ImmutableList.copyOf(updates),
873 initializeMessageSubject,
874 serializer::encode,
875 serializer::decode,
876 peer)
877 .whenComplete((result, error) -> {
878 if (error != null) {
879 log.debug("Failed to initialize {}", peer, error);
880 }
881 });
882 };
883
884 List<CompletableFuture<Void>> futures = Lists.newArrayList();
885 List<UpdateEntry<K, V>> updates = Lists.newArrayList();
886 for (Map.Entry<K, MapValue<V>> entry : items.entrySet()) {
887 K key = entry.getKey();
888 MapValue<V> value = entry.getValue();
889 if (value.isAlive()) {
890 updates.add(new UpdateEntry<K, V>(key, value));
891 if (updates.size() == DEFAULT_MAX_EVENTS) {
892 futures.add(sendUpdates.apply(updates));
893 updates = new ArrayList<>();
894 }
895 }
896 }
897
898 if (!updates.isEmpty()) {
899 futures.add(sendUpdates.apply(updates));
900 }
901 return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
Jon Halldabee682016-05-17 11:29:51 -0700902 }
903
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800904 // TODO pull this into the class if this gets pulled out...
905 private static final int DEFAULT_MAX_EVENTS = 1000;
906 private static final int DEFAULT_MAX_IDLE_MS = 10;
907 private static final int DEFAULT_MAX_BATCH_MS = 50;
908 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800909
Madan Jampani3d76c942015-06-29 23:37:10 -0700910 private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800911
912 private final NodeId peer;
913
914 private EventAccumulator(NodeId peer) {
915 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
916 this.peer = peer;
917 }
918
919 @Override
Madan Jampani3d76c942015-06-29 23:37:10 -0700920 public void processItems(List<UpdateEntry<K, V>> items) {
921 Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
922 items.forEach(item -> map.compute(item.key(), (key, existing) ->
Madan Jampani92c64eb2015-07-23 15:37:07 -0700923 item.isNewerThan(existing) ? item : existing));
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700924 communicationExecutor.execute(() -> {
Jordan Haltermane0fae912017-05-18 12:52:51 -0700925 try {
926 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
927 updateMessageSubject,
928 serializer::encode,
929 peer)
930 .whenComplete((result, error) -> {
931 if (error != null) {
932 log.debug("Failed to send to {}", peer, error);
933 }
934 });
935 } catch (Exception e) {
936 log.warn("Failed to send to {}", peer, e);
937 }
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800938 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800939 }
940 }
Jonathan Hart4a29c592015-09-23 17:55:07 -0700941}