blob: 79641876a20cbdda574154e6cf441edabfdb503f [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Jonathan Hartdb3af892015-01-26 13:19:07 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Jonathan Hart9dc5f092016-06-17 15:15:17 -070018import com.google.common.collect.Collections2;
19import com.google.common.collect.ImmutableList;
20import com.google.common.collect.ImmutableMap;
21import com.google.common.collect.Lists;
22import com.google.common.collect.Maps;
23import com.google.common.collect.Sets;
Madan Jampanif4c88502016-01-21 12:35:36 -080024import org.apache.commons.lang3.tuple.Pair;
25import org.onlab.util.AbstractAccumulator;
26import org.onlab.util.KryoNamespace;
27import org.onlab.util.SlidingWindowCounter;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.ControllerNode;
30import org.onosproject.cluster.NodeId;
31import org.onosproject.persistence.PersistenceService;
32import org.onosproject.store.LogicalTimestamp;
33import org.onosproject.store.Timestamp;
34import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
35import org.onosproject.store.cluster.messaging.MessageSubject;
36import org.onosproject.store.serializers.KryoNamespaces;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070037import org.onosproject.store.serializers.StoreSerializer;
Madan Jampanif4c88502016-01-21 12:35:36 -080038import org.onosproject.store.service.EventuallyConsistentMap;
39import org.onosproject.store.service.EventuallyConsistentMapEvent;
40import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampanif4c88502016-01-21 12:35:36 -080041import org.onosproject.store.service.WallClockTimestamp;
42import org.slf4j.Logger;
43import org.slf4j.LoggerFactory;
44
Jonathan Hart9dc5f092016-06-17 15:15:17 -070045import java.util.Collection;
46import java.util.Collections;
47import java.util.HashSet;
48import java.util.List;
49import java.util.Map;
50import java.util.Objects;
51import java.util.Optional;
52import java.util.Set;
53import java.util.Timer;
54import java.util.concurrent.CompletableFuture;
55import java.util.concurrent.ExecutorService;
56import java.util.concurrent.Executors;
57import java.util.concurrent.ScheduledExecutorService;
58import java.util.concurrent.TimeUnit;
59import java.util.concurrent.atomic.AtomicBoolean;
60import java.util.concurrent.atomic.AtomicReference;
61import java.util.function.BiFunction;
62import java.util.stream.Collectors;
63
64import static com.google.common.base.Preconditions.checkNotNull;
65import static com.google.common.base.Preconditions.checkState;
66import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
67import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
68import static org.onlab.util.Tools.groupedThreads;
69import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
70import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Jonathan Hartdb3af892015-01-26 13:19:07 -080071
72/**
73 * Distributed Map implementation which uses optimistic replication and gossip
74 * based techniques to provide an eventually consistent data store.
75 */
76public class EventuallyConsistentMapImpl<K, V>
77 implements EventuallyConsistentMap<K, V> {
78
79 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
80
Madan Jampani3d76c942015-06-29 23:37:10 -070081 private final Map<K, MapValue<V>> items;
Jonathan Hartdb3af892015-01-26 13:19:07 -080082
Jonathan Hartdb3af892015-01-26 13:19:07 -080083 private final ClusterService clusterService;
84 private final ClusterCommunicationService clusterCommunicator;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070085 private final StoreSerializer serializer;
Madan Jampani3d76c942015-06-29 23:37:10 -070086 private final NodeId localNodeId;
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -070087 private final PersistenceService persistenceService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080088
Madan Jampanibcf1a482015-06-24 19:05:56 -070089 private final BiFunction<K, V, Timestamp> timestampProvider;
Jonathan Hartdb3af892015-01-26 13:19:07 -080090
91 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080092 private final MessageSubject antiEntropyAdvertisementSubject;
Jon Halld198b882016-05-18 16:44:40 -070093 private final MessageSubject updateRequestSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080094
Jonathan Hartaaa56572015-01-28 21:56:35 -080095 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Madan Jampani3d76c942015-06-29 23:37:10 -070096 = Sets.newCopyOnWriteArraySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -080097
98 private final ExecutorService executor;
Jonathan Hartdb3af892015-01-26 13:19:07 -080099 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800100 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800101
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700102 private final ExecutorService communicationExecutor;
103 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -0800104
Madan Jampani29f52a32016-04-18 15:20:52 -0700105 private long previousTombstonePurgeTime;
106 private final Map<NodeId, Long> antiEntropyTimes = Maps.newConcurrentMap();
107
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700108 private final String mapName;
109
Jonathan Hartdb3af892015-01-26 13:19:07 -0800110 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800111 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800112 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800113
Jonathan Hart4f397e82015-02-04 09:10:41 -0800114 private static final String ERROR_NULL_KEY = "Key cannot be null";
115 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
116
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700117 private final long initialDelaySec = 5;
118 private final boolean lightweightAntiEntropy;
119 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800120
Jonathan Hart233a18a2015-03-02 17:24:58 -0800121 private static final int WINDOW_SIZE = 5;
Jon Halldabee682016-05-17 11:29:51 -0700122 private static final int HIGH_LOAD_THRESHOLD = 2;
Jonathan Hart233a18a2015-03-02 17:24:58 -0800123 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700124 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800125
Jonathan Hartca335e92015-03-05 10:34:32 -0800126 private final boolean persistent;
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700127
Jonathan Hartdb3af892015-01-26 13:19:07 -0800128 /**
129 * Creates a new eventually consistent map shared amongst multiple instances.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800130 * <p>
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700131 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
132 * for more description of the parameters expected by the map.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800133 * </p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800134 *
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700135 * @param mapName a String identifier for the map.
136 * @param clusterService the cluster service
137 * @param clusterCommunicator the cluster communications service
Thomas Vachuskaf5896be2016-05-19 14:30:50 -0700138 * @param ns a Kryo namespace that can serialize
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700139 * both K and V
Madan Jampanibcf1a482015-06-24 19:05:56 -0700140 * @param timestampProvider provider of timestamps for K and V
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700141 * @param peerUpdateFunction function that provides a set of nodes to immediately
142 * update to when there writes to the map
143 * @param eventExecutor executor to use for processing incoming
144 * events from peers
145 * @param communicationExecutor executor to use for sending events to peers
146 * @param backgroundExecutor executor to use for background anti-entropy
147 * tasks
148 * @param tombstonesDisabled true if this map should not maintain
149 * tombstones
150 * @param antiEntropyPeriod period that the anti-entropy task should run
Jonathan Hartca335e92015-03-05 10:34:32 -0800151 * @param antiEntropyTimeUnit time unit for anti-entropy period
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700152 * @param convergeFaster make anti-entropy try to converge faster
Jonathan Hartca335e92015-03-05 10:34:32 -0800153 * @param persistent persist data to disk
Jian Lidfba7392016-01-22 16:46:58 -0800154 * @param persistenceService persistence service
Jonathan Hartdb3af892015-01-26 13:19:07 -0800155 */
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700156 EventuallyConsistentMapImpl(String mapName,
157 ClusterService clusterService,
158 ClusterCommunicationService clusterCommunicator,
HIGUCHI Yuta163efb52016-05-18 19:24:19 -0700159 KryoNamespace ns,
Madan Jampanibcf1a482015-06-24 19:05:56 -0700160 BiFunction<K, V, Timestamp> timestampProvider,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700161 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
162 ExecutorService eventExecutor,
163 ExecutorService communicationExecutor,
164 ScheduledExecutorService backgroundExecutor,
165 boolean tombstonesDisabled,
166 long antiEntropyPeriod,
167 TimeUnit antiEntropyTimeUnit,
Jonathan Hartca335e92015-03-05 10:34:32 -0800168 boolean convergeFaster,
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700169 boolean persistent,
170 PersistenceService persistenceService) {
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700171 this.mapName = mapName;
HIGUCHI Yuta163efb52016-05-18 19:24:19 -0700172 this.serializer = createSerializer(ns);
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700173 this.persistenceService = persistenceService;
174 this.persistent =
175 persistent;
176 if (persistent) {
177 items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
Aaron Kruglikov37210412016-12-06 12:55:57 -0800178 .withName(mapName)
HIGUCHI Yuta163efb52016-05-18 19:24:19 -0700179 .withSerializer(this.serializer)
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700180 .build();
181 } else {
182 items = Maps.newConcurrentMap();
183 }
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800184 senderPending = Maps.newConcurrentMap();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700185 destroyedMessage = mapName + ERROR_DESTROYED;
Madan Jampani28726282015-02-19 11:40:23 -0800186
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700187 this.clusterService = clusterService;
188 this.clusterCommunicator = clusterCommunicator;
Madan Jampani3d76c942015-06-29 23:37:10 -0700189 this.localNodeId = clusterService.getLocalNode().id();
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700190
Madan Jampanibcf1a482015-06-24 19:05:56 -0700191 this.timestampProvider = timestampProvider;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700192
193 if (peerUpdateFunction != null) {
194 this.peerUpdateFunction = peerUpdateFunction;
195 } else {
196 this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
197 .map(ControllerNode::id)
Madan Jampani3d76c942015-06-29 23:37:10 -0700198 .filter(nodeId -> !nodeId.equals(localNodeId))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700199 .collect(Collectors.toList());
200 }
201
202 if (eventExecutor != null) {
203 this.executor = eventExecutor;
204 } else {
205 // should be a normal executor; it's used for receiving messages
206 this.executor =
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700207 Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d", log));
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700208 }
209
210 if (communicationExecutor != null) {
211 this.communicationExecutor = communicationExecutor;
212 } else {
213 // sending executor; should be capped
214 //TODO this probably doesn't need to be bounded anymore
215 this.communicationExecutor =
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700216 newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d", log));
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700217 }
218
Jonathan Hartca335e92015-03-05 10:34:32 -0800219
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700220 if (backgroundExecutor != null) {
221 this.backgroundExecutor = backgroundExecutor;
222 } else {
223 this.backgroundExecutor =
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700224 newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d", log));
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700225 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800226
Jonathan Hartaaa56572015-01-28 21:56:35 -0800227 // start anti-entropy thread
Madan Jampani3d76c942015-06-29 23:37:10 -0700228 this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700229 initialDelaySec, antiEntropyPeriod,
230 antiEntropyTimeUnit);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800231
Jonathan Hartdb3af892015-01-26 13:19:07 -0800232 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
233 clusterCommunicator.addSubscriber(updateMessageSubject,
Madan Jampani3d76c942015-06-29 23:37:10 -0700234 serializer::decode,
235 this::processUpdates,
236 this.executor);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800237
Jonathan Hartaaa56572015-01-28 21:56:35 -0800238 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
239 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
Madan Jampani3d76c942015-06-29 23:37:10 -0700240 serializer::decode,
241 this::handleAntiEntropyAdvertisement,
Madan Jampani29f52a32016-04-18 15:20:52 -0700242 serializer::encode,
Madan Jampani3d76c942015-06-29 23:37:10 -0700243 this.backgroundExecutor);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800244
Jon Halld198b882016-05-18 16:44:40 -0700245 updateRequestSubject = new MessageSubject("ecm-" + mapName + "-update-request");
246 clusterCommunicator.addSubscriber(updateRequestSubject,
247 serializer::decode,
248 this::handleUpdateRequests,
249 this.backgroundExecutor);
250
Madan Jampania8f919e2016-04-18 16:47:35 -0700251 if (!tombstonesDisabled) {
252 previousTombstonePurgeTime = 0;
253 this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
254 initialDelaySec,
255 antiEntropyPeriod,
256 TimeUnit.SECONDS);
257 }
Madan Jampani29f52a32016-04-18 15:20:52 -0700258
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700259 this.tombstonesDisabled = tombstonesDisabled;
260 this.lightweightAntiEntropy = !convergeFaster;
Jon Halldabee682016-05-17 11:29:51 -0700261
262 // Initiate first round of Gossip
263 this.bootstrap();
Madan Jampanie1356282015-03-10 19:05:36 -0700264 }
265
HIGUCHI Yuta163efb52016-05-18 19:24:19 -0700266 private StoreSerializer createSerializer(KryoNamespace ns) {
267 return StoreSerializer.using(KryoNamespace.newBuilder()
268 .register(ns)
269 // not so robust way to avoid collision with other
270 // user supplied registrations
271 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
272 .register(KryoNamespaces.BASIC)
273 .register(LogicalTimestamp.class)
274 .register(WallClockTimestamp.class)
275 .register(AntiEntropyAdvertisement.class)
276 .register(AntiEntropyResponse.class)
277 .register(UpdateEntry.class)
278 .register(MapValue.class)
279 .register(MapValue.Digest.class)
Jonathan Hart9dc5f092016-06-17 15:15:17 -0700280 .register(UpdateRequest.class)
HIGUCHI Yuta163efb52016-05-18 19:24:19 -0700281 .build(name() + "-ecmap"));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800282 }
283
284 @Override
Madan Jampania090a112016-01-18 16:38:17 -0800285 public String name() {
286 return mapName;
287 }
288
289 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800290 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800291 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700292 // TODO: Maintain a separate counter for tracking live elements in map.
293 return Maps.filterValues(items, MapValue::isAlive).size();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800294 }
295
296 @Override
297 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800298 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700299 return size() == 0;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800300 }
301
302 @Override
303 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800304 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800305 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani3d76c942015-06-29 23:37:10 -0700306 return get(key) != null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800307 }
308
309 @Override
310 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800311 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800312 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani3d76c942015-06-29 23:37:10 -0700313 return items.values()
314 .stream()
315 .filter(MapValue::isAlive)
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700316 .anyMatch(v -> value.equals(v.get()));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800317 }
318
319 @Override
320 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800321 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800322 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800323
Madan Jampani3d76c942015-06-29 23:37:10 -0700324 MapValue<V> value = items.get(key);
325 return (value == null || value.isTombstone()) ? null : value.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800326 }
327
328 @Override
329 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800330 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800331 checkNotNull(key, ERROR_NULL_KEY);
332 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800333
Madan Jampani3d76c942015-06-29 23:37:10 -0700334 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700335 if (putInternal(key, newValue)) {
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700336 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
Madan Jampanicab114c2015-07-23 00:14:19 -0700337 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800338 }
339 }
340
Jonathan Hartdb3af892015-01-26 13:19:07 -0800341 @Override
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700342 public V remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800343 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800344 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700345 return removeAndNotify(key, null);
346 }
347
348 @Override
349 public void remove(K key, V value) {
350 checkState(!destroyed, destroyedMessage);
351 checkNotNull(key, ERROR_NULL_KEY);
352 checkNotNull(value, ERROR_NULL_VALUE);
353 removeAndNotify(key, value);
354 }
355
356 private V removeAndNotify(K key, V value) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700357 Timestamp timestamp = timestampProvider.apply(key, value);
358 Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null
359 ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700360 MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
Madan Jampania0ac4872015-07-02 11:23:49 -0700361 if (previousValue != null) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700362 notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
363 peerUpdateFunction.apply(key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700364 if (previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700365 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700366 }
367 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700368 return previousValue != null ? previousValue.get() : null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800369 }
370
Madan Jampani483d0a22015-08-19 17:33:00 -0700371 private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700372 checkState(!destroyed, destroyedMessage);
373 checkNotNull(key, ERROR_NULL_KEY);
374 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani483d0a22015-08-19 17:33:00 -0700375 tombstone.ifPresent(v -> checkState(v.isTombstone()));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700376
377 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700378 AtomicBoolean updated = new AtomicBoolean(false);
Madan Jampanid13f3b82015-07-01 17:37:50 -0700379 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
Madan Jampani3d76c942015-06-29 23:37:10 -0700380 items.compute(key, (k, existing) -> {
Madan Jampanid13f3b82015-07-01 17:37:50 -0700381 boolean valueMatches = true;
382 if (value.isPresent() && existing != null && existing.isAlive()) {
383 valueMatches = Objects.equals(value.get(), existing.get());
Madan Jampani3d76c942015-06-29 23:37:10 -0700384 }
Brian O'Connor6325dad2015-07-07 15:36:29 -0700385 if (existing == null) {
Jonathan Hart4a29c592015-09-23 17:55:07 -0700386 log.trace("ECMap Remove: Existing value for key {} is already null", k);
Brian O'Connor6325dad2015-07-07 15:36:29 -0700387 }
Madan Jampani483d0a22015-08-19 17:33:00 -0700388 if (valueMatches) {
389 if (existing == null) {
390 updated.set(tombstone.isPresent());
391 } else {
392 updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
393 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700394 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700395 if (updated.get()) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700396 previousValue.set(existing);
397 return tombstone.orElse(null);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700398 } else {
399 return existing;
400 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700401 });
Madan Jampanid13f3b82015-07-01 17:37:50 -0700402 return previousValue.get();
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800403 }
404
405 @Override
Madan Jampani4727a112015-07-16 12:12:58 -0700406 public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
407 checkState(!destroyed, destroyedMessage);
408 checkNotNull(key, ERROR_NULL_KEY);
409 checkNotNull(recomputeFunction, "Recompute function cannot be null");
410
411 AtomicBoolean updated = new AtomicBoolean(false);
412 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
Madan Jampanidc012972016-04-25 11:13:26 -0700413 MapValue<V> computedValue = items.compute(serializer.copy(key), (k, mv) -> {
Madan Jampani4727a112015-07-16 12:12:58 -0700414 previousValue.set(mv);
415 V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
Brian O'Connor05acd642016-08-03 00:42:35 -0700416 if (mv != null && Objects.equals(newRawValue, mv.get())) {
417 // value was not updated
418 return mv;
419 }
Madan Jampani4727a112015-07-16 12:12:58 -0700420 MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
421 if (mv == null || newValue.isNewerThan(mv)) {
422 updated.set(true);
Madan Jampanidc012972016-04-25 11:13:26 -0700423 // We return a copy to ensure updates to peers can be serialized.
424 // This prevents replica divergence due to serialization failures.
425 return serializer.copy(newValue);
Madan Jampani4727a112015-07-16 12:12:58 -0700426 } else {
427 return mv;
428 }
429 });
430 if (updated.get()) {
431 notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
432 EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
433 V value = computedValue.isTombstone()
434 ? previousValue.get() == null ? null : previousValue.get().get()
435 : computedValue.get();
436 if (value != null) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700437 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
Madan Jampani4727a112015-07-16 12:12:58 -0700438 }
439 }
440 return computedValue.get();
441 }
442
443 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800444 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800445 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800446 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800447 }
448
449 @Override
450 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800451 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700452 Maps.filterValues(items, MapValue::isAlive)
453 .forEach((k, v) -> remove(k));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800454 }
455
456 @Override
457 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800458 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700459 return Maps.filterValues(items, MapValue::isAlive)
460 .keySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800461 }
462
463 @Override
464 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800465 checkState(!destroyed, destroyedMessage);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700466 return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800467 }
468
469 @Override
470 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800471 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700472 return Maps.filterValues(items, MapValue::isAlive)
473 .entrySet()
474 .stream()
475 .map(e -> Pair.of(e.getKey(), e.getValue().get()))
476 .collect(Collectors.toSet());
477 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800478
Madan Jampani3d76c942015-06-29 23:37:10 -0700479 /**
480 * Returns true if newValue was accepted i.e. map is updated.
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700481 *
Madan Jampani3d76c942015-06-29 23:37:10 -0700482 * @param key key
483 * @param newValue proposed new value
484 * @return true if update happened; false if map already contains a more recent value for the key
485 */
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700486 private boolean putInternal(K key, MapValue<V> newValue) {
487 checkState(!destroyed, destroyedMessage);
488 checkNotNull(key, ERROR_NULL_KEY);
489 checkNotNull(newValue, ERROR_NULL_VALUE);
490 checkState(newValue.isAlive());
491 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700492 AtomicBoolean updated = new AtomicBoolean(false);
493 items.compute(key, (k, existing) -> {
494 if (existing == null || newValue.isNewerThan(existing)) {
495 updated.set(true);
Madan Jampani3d76c942015-06-29 23:37:10 -0700496 return newValue;
497 }
498 return existing;
499 });
Madan Jampani3d76c942015-06-29 23:37:10 -0700500 return updated.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800501 }
502
503 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800504 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800505 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800506
507 listeners.add(checkNotNull(listener));
508 }
509
510 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800511 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800512 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800513
514 listeners.remove(checkNotNull(listener));
515 }
516
517 @Override
Madan Jampanifa242182016-01-22 13:42:54 -0800518 public CompletableFuture<Void> destroy() {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800519 destroyed = true;
520
521 executor.shutdown();
522 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800523 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800524
Jonathan Hart584d2f32015-01-27 19:46:14 -0800525 listeners.clear();
526
Jonathan Hartdb3af892015-01-26 13:19:07 -0800527 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jon Halld198b882016-05-18 16:44:40 -0700528 clusterCommunicator.removeSubscriber(updateRequestSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800529 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Madan Jampanifa242182016-01-22 13:42:54 -0800530 return CompletableFuture.completedFuture(null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800531 }
532
Jonathan Hartaaa56572015-01-28 21:56:35 -0800533 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700534 listeners.forEach(listener -> listener.event(event));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800535 }
536
Madan Jampani3d76c942015-06-29 23:37:10 -0700537 private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800538 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800539 }
540
Madan Jampani3d76c942015-06-29 23:37:10 -0700541 private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800542 if (peers == null) {
543 // we have no friends :(
544 return;
545 }
546 peers.forEach(node ->
Jonathan Hart9a426f82015-09-03 15:43:13 +0200547 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800548 );
549 }
550
Jonathan Hart233a18a2015-03-02 17:24:58 -0800551 private boolean underHighLoad() {
552 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
553 }
554
Madan Jampani3d76c942015-06-29 23:37:10 -0700555 private void sendAdvertisement() {
556 try {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700557 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800558 return;
559 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700560 pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
561 } catch (Exception e) {
562 // Catch all exceptions to avoid scheduled task being suppressed.
563 log.error("Exception thrown while sending advertisement", e);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800564 }
565 }
566
Madan Jampani3d76c942015-06-29 23:37:10 -0700567 private Optional<NodeId> pickRandomActivePeer() {
568 List<NodeId> activePeers = clusterService.getNodes()
569 .stream()
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700570 .map(ControllerNode::id)
571 .filter(id -> !localNodeId.equals(id))
Thomas Vachuska7a8de842016-03-07 20:56:35 -0800572 .filter(id -> clusterService.getState(id).isActive())
Madan Jampani3d76c942015-06-29 23:37:10 -0700573 .collect(Collectors.toList());
574 Collections.shuffle(activePeers);
575 return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
576 }
577
578 private void sendAdvertisementToPeer(NodeId peer) {
Madan Jampaniee35d552016-04-26 16:07:40 -0700579 long adCreationTime = System.currentTimeMillis();
Madan Jampani29f52a32016-04-18 15:20:52 -0700580 AntiEntropyAdvertisement<K> ad = createAdvertisement();
581 clusterCommunicator.sendAndReceive(ad,
Madan Jampani3d76c942015-06-29 23:37:10 -0700582 antiEntropyAdvertisementSubject,
583 serializer::encode,
Madan Jampani29f52a32016-04-18 15:20:52 -0700584 serializer::decode,
Madan Jampani3d76c942015-06-29 23:37:10 -0700585 peer)
586 .whenComplete((result, error) -> {
587 if (error != null) {
Jonathan Harte255cc42016-09-12 14:50:24 -0700588 log.debug("Failed to send anti-entropy advertisement to {}: {}",
589 peer, error.getMessage());
Madan Jampani29f52a32016-04-18 15:20:52 -0700590 } else if (result == AntiEntropyResponse.PROCESSED) {
Madan Jampaniee35d552016-04-26 16:07:40 -0700591 antiEntropyTimes.put(peer, adCreationTime);
Madan Jampani3d76c942015-06-29 23:37:10 -0700592 }
593 });
594 }
595
Jon Halld198b882016-05-18 16:44:40 -0700596 private void sendUpdateRequestToPeer(NodeId peer, Set<K> keys) {
597 UpdateRequest<K> request = new UpdateRequest<>(localNodeId, keys);
598 clusterCommunicator.unicast(request,
599 updateRequestSubject,
600 serializer::encode,
601 peer)
602 .whenComplete((result, error) -> {
603 if (error != null) {
Jonathan Harte255cc42016-09-12 14:50:24 -0700604 log.debug("Failed to send update request to {}: {}",
605 peer, error.getMessage());
Jon Halld198b882016-05-18 16:44:40 -0700606 }
607 });
608 }
609
Jonathan Hartaaa56572015-01-28 21:56:35 -0800610 private AntiEntropyAdvertisement<K> createAdvertisement() {
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700611 return new AntiEntropyAdvertisement<>(localNodeId,
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700612 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800613 }
614
Madan Jampani29f52a32016-04-18 15:20:52 -0700615 private AntiEntropyResponse handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700616 if (destroyed || underHighLoad()) {
Madan Jampani29f52a32016-04-18 15:20:52 -0700617 return AntiEntropyResponse.IGNORED;
Madan Jampani3d76c942015-06-29 23:37:10 -0700618 }
619 try {
Jonathan Hart9a426f82015-09-03 15:43:13 +0200620 if (log.isTraceEnabled()) {
621 log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
Jon Halld198b882016-05-18 16:44:40 -0700622 ad.sender(), mapName, ad.digest().size());
Jonathan Hart9a426f82015-09-03 15:43:13 +0200623 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700624 antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
Madan Jampani3d76c942015-06-29 23:37:10 -0700625 } catch (Exception e) {
626 log.warn("Error handling anti-entropy advertisement", e);
Madan Jampani29f52a32016-04-18 15:20:52 -0700627 return AntiEntropyResponse.FAILED;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800628 }
Madan Jampani29f52a32016-04-18 15:20:52 -0700629 return AntiEntropyResponse.PROCESSED;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800630 }
631
632 /**
Madan Jampani3d76c942015-06-29 23:37:10 -0700633 * Processes anti-entropy ad from peer by taking following actions:
634 * 1. If peer has an old entry, updates peer.
635 * 2. If peer indicates an entry is removed and has a more recent
636 * timestamp than the local entry, update local state.
Jonathan Hartaaa56572015-01-28 21:56:35 -0800637 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800638 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
639 AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700640 final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
Jonathan Hartaaa56572015-01-28 21:56:35 -0800641 final NodeId sender = ad.sender();
Jon Halld198b882016-05-18 16:44:40 -0700642 final List<NodeId> peers = ImmutableList.of(sender);
643 Set<K> staleOrMissing = new HashSet<>();
644 Set<K> locallyUnknown = new HashSet<>(ad.digest().keySet());
645
Madan Jampani3d76c942015-06-29 23:37:10 -0700646 items.forEach((key, localValue) -> {
Jon Halld198b882016-05-18 16:44:40 -0700647 locallyUnknown.remove(key);
Madan Jampani3d76c942015-06-29 23:37:10 -0700648 MapValue.Digest remoteValueDigest = ad.digest().get(key);
649 if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800650 // local value is more recent, push to sender
Jon Halld198b882016-05-18 16:44:40 -0700651 queueUpdate(new UpdateEntry<>(key, localValue), peers);
652 } else if (remoteValueDigest != null
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700653 && remoteValueDigest.isNewerThan(localValue.digest())
654 && remoteValueDigest.isTombstone()) {
Jon Halld198b882016-05-18 16:44:40 -0700655 // remote value is more recent and a tombstone: update local value
Madan Jampani483d0a22015-08-19 17:33:00 -0700656 MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
Madan Jampanid13f3b82015-07-01 17:37:50 -0700657 MapValue<V> previousValue = removeInternal(key,
658 Optional.empty(),
Madan Jampani483d0a22015-08-19 17:33:00 -0700659 Optional.of(tombstone));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700660 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700661 externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800662 }
Jon Halld198b882016-05-18 16:44:40 -0700663 } else if (remoteValueDigest.isNewerThan(localValue.digest())) {
664 // Not a tombstone and remote is newer
665 staleOrMissing.add(key);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800666 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700667 });
Jon Halld198b882016-05-18 16:44:40 -0700668 // Keys missing in local map
669 staleOrMissing.addAll(locallyUnknown);
670 // Request updates that we missed out on
671 sendUpdateRequestToPeer(sender, staleOrMissing);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800672 return externalEvents;
673 }
674
Jon Halld198b882016-05-18 16:44:40 -0700675 private void handleUpdateRequests(UpdateRequest<K> request) {
676 final Set<K> keys = request.keys();
677 final NodeId sender = request.sender();
678 final List<NodeId> peers = ImmutableList.of(sender);
679
680 keys.forEach(key ->
681 queueUpdate(new UpdateEntry<>(key, items.get(key)), peers)
682 );
683 }
684
Madan Jampani29f52a32016-04-18 15:20:52 -0700685 private void purgeTombstones() {
686 /*
Jon Halld198b882016-05-18 16:44:40 -0700687 * In order to mitigate the resource exhaustion that can ensue due to an ever-growing set
Madan Jampani29f52a32016-04-18 15:20:52 -0700688 * of tombstones we employ the following heuristic to purge old tombstones periodically.
689 * First, we keep track of the time (local system time) when we were able to have a successful
690 * AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded
691 * as the time before which all tombstones are considered safe to purge.
692 */
Madan Jampania8f919e2016-04-18 16:47:35 -0700693 long currentSafeTombstonePurgeTime = clusterService.getNodes()
694 .stream()
695 .map(ControllerNode::id)
696 .filter(id -> !id.equals(localNodeId))
697 .map(id -> antiEntropyTimes.getOrDefault(id, 0L))
698 .reduce(Math::min)
699 .orElse(0L);
Madan Jampani29f52a32016-04-18 15:20:52 -0700700 if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) {
701 return;
702 }
703 List<Map.Entry<K, MapValue<V>>> tombStonesToDelete = items.entrySet()
704 .stream()
705 .filter(e -> e.getValue().isTombstone())
706 .filter(e -> e.getValue().creationTime() <= currentSafeTombstonePurgeTime)
707 .collect(Collectors.toList());
708 previousTombstonePurgeTime = currentSafeTombstonePurgeTime;
709 tombStonesToDelete.forEach(entry -> items.remove(entry.getKey(), entry.getValue()));
710 }
711
Madan Jampani3d76c942015-06-29 23:37:10 -0700712 private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
713 if (destroyed) {
714 return;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800715 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700716 updates.forEach(update -> {
717 final K key = update.key();
Madan Jampani29f52a32016-04-18 15:20:52 -0700718 final MapValue<V> value = update.value() == null ? null : update.value().copy();
Madan Jampani483d0a22015-08-19 17:33:00 -0700719 if (value == null || value.isTombstone()) {
720 MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700721 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700722 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700723 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700724 } else if (putInternal(key, value)) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700725 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800726 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700727 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800728 }
729
Jon Halldabee682016-05-17 11:29:51 -0700730 private void bootstrap() {
731 /*
732 * Attempt to get in sync with the cluster when a map is created. This is to help avoid a new node
733 * writing to an ECM until it has a view of the map. Depending on how lightweight the map instance
734 * is, this will attempt to advertise to all or some of the peers.
735 */
736 int n = 0;
737 List<NodeId> activePeers = clusterService.getNodes()
738 .stream()
739 .map(ControllerNode::id)
740 .filter(id -> !localNodeId.equals(id))
741 .filter(id -> clusterService.getState(id).isActive())
742 .collect(Collectors.toList());
743
744 if (activePeers.isEmpty()) {
745 return;
746 }
747
748 if (lightweightAntiEntropy) {
749 n = activePeers.size() / 2;
750 } else {
751 n = activePeers.size();
752 }
753
754 for (int i = 0; i < n; i++) {
755 sendAdvertisementToPeer(activePeers.get(i));
756 }
757 }
758
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800759 // TODO pull this into the class if this gets pulled out...
760 private static final int DEFAULT_MAX_EVENTS = 1000;
761 private static final int DEFAULT_MAX_IDLE_MS = 10;
762 private static final int DEFAULT_MAX_BATCH_MS = 50;
763 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800764
Madan Jampani3d76c942015-06-29 23:37:10 -0700765 private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800766
767 private final NodeId peer;
768
769 private EventAccumulator(NodeId peer) {
770 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
771 this.peer = peer;
772 }
773
774 @Override
Madan Jampani3d76c942015-06-29 23:37:10 -0700775 public void processItems(List<UpdateEntry<K, V>> items) {
776 Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
777 items.forEach(item -> map.compute(item.key(), (key, existing) ->
Madan Jampani92c64eb2015-07-23 15:37:07 -0700778 item.isNewerThan(existing) ? item : existing));
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700779 communicationExecutor.execute(() -> {
Madan Jampani3d76c942015-06-29 23:37:10 -0700780 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700781 updateMessageSubject,
782 serializer::encode,
783 peer)
784 .whenComplete((result, error) -> {
785 if (error != null) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700786 log.debug("Failed to send to {}", peer, error);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700787 }
788 });
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800789 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800790 }
791 }
Jonathan Hart4a29c592015-09-23 17:55:07 -0700792}