blob: 11b446ab0835b149e00f552035ba48c97dc35e6a [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Jonathan Hartdb3af892015-01-26 13:19:07 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Madan Jampanif4c88502016-01-21 12:35:36 -080018import static com.google.common.base.Preconditions.checkNotNull;
19import static com.google.common.base.Preconditions.checkState;
20import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
21import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
22import static org.onlab.util.Tools.groupedThreads;
23import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
24import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
Jonathan Hartdb3af892015-01-26 13:19:07 -080025
Jonathan Hartdb3af892015-01-26 13:19:07 -080026import java.util.Collection;
Madan Jampani3d76c942015-06-29 23:37:10 -070027import java.util.Collections;
Jon Halld198b882016-05-18 16:44:40 -070028import java.util.HashSet;
Jonathan Hartdb3af892015-01-26 13:19:07 -080029import java.util.List;
30import java.util.Map;
Madan Jampanid13f3b82015-07-01 17:37:50 -070031import java.util.Objects;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070032import java.util.Optional;
Jonathan Hartdb3af892015-01-26 13:19:07 -080033import java.util.Set;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080034import java.util.Timer;
Madan Jampanifa242182016-01-22 13:42:54 -080035import java.util.concurrent.CompletableFuture;
Jonathan Hartdb3af892015-01-26 13:19:07 -080036import java.util.concurrent.ExecutorService;
37import java.util.concurrent.Executors;
38import java.util.concurrent.ScheduledExecutorService;
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080039import java.util.concurrent.TimeUnit;
Madan Jampani3d76c942015-06-29 23:37:10 -070040import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani43e9c9c2015-06-26 14:16:46 -070041import java.util.concurrent.atomic.AtomicReference;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080042import java.util.function.BiFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -080043import java.util.stream.Collectors;
44
Madan Jampanif4c88502016-01-21 12:35:36 -080045import org.apache.commons.lang3.tuple.Pair;
46import org.onlab.util.AbstractAccumulator;
47import org.onlab.util.KryoNamespace;
48import org.onlab.util.SlidingWindowCounter;
49import org.onosproject.cluster.ClusterService;
50import org.onosproject.cluster.ControllerNode;
51import org.onosproject.cluster.NodeId;
52import org.onosproject.persistence.PersistenceService;
53import org.onosproject.store.LogicalTimestamp;
54import org.onosproject.store.Timestamp;
55import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
56import org.onosproject.store.cluster.messaging.MessageSubject;
57import org.onosproject.store.serializers.KryoNamespaces;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070058import org.onosproject.store.serializers.StoreSerializer;
Madan Jampanif4c88502016-01-21 12:35:36 -080059import org.onosproject.store.service.EventuallyConsistentMap;
60import org.onosproject.store.service.EventuallyConsistentMapEvent;
61import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampanif4c88502016-01-21 12:35:36 -080062import org.onosproject.store.service.WallClockTimestamp;
63import org.slf4j.Logger;
64import org.slf4j.LoggerFactory;
65
66import com.google.common.collect.Collections2;
67import com.google.common.collect.ImmutableList;
68import com.google.common.collect.ImmutableMap;
69import com.google.common.collect.Lists;
70import com.google.common.collect.Maps;
71import com.google.common.collect.Sets;
Jonathan Hartdb3af892015-01-26 13:19:07 -080072
73/**
74 * Distributed Map implementation which uses optimistic replication and gossip
75 * based techniques to provide an eventually consistent data store.
76 */
77public class EventuallyConsistentMapImpl<K, V>
78 implements EventuallyConsistentMap<K, V> {
79
80 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
81
Madan Jampani3d76c942015-06-29 23:37:10 -070082 private final Map<K, MapValue<V>> items;
Jonathan Hartdb3af892015-01-26 13:19:07 -080083
Jonathan Hartdb3af892015-01-26 13:19:07 -080084 private final ClusterService clusterService;
85 private final ClusterCommunicationService clusterCommunicator;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070086 private final StoreSerializer serializer;
Madan Jampani3d76c942015-06-29 23:37:10 -070087 private final NodeId localNodeId;
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -070088 private final PersistenceService persistenceService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080089
Madan Jampanibcf1a482015-06-24 19:05:56 -070090 private final BiFunction<K, V, Timestamp> timestampProvider;
Jonathan Hartdb3af892015-01-26 13:19:07 -080091
92 private final MessageSubject updateMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080093 private final MessageSubject antiEntropyAdvertisementSubject;
Jon Halld198b882016-05-18 16:44:40 -070094 private final MessageSubject updateRequestSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080095
Jonathan Hartaaa56572015-01-28 21:56:35 -080096 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Madan Jampani3d76c942015-06-29 23:37:10 -070097 = Sets.newCopyOnWriteArraySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -080098
99 private final ExecutorService executor;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800100 private final ScheduledExecutorService backgroundExecutor;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800101 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800102
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700103 private final ExecutorService communicationExecutor;
104 private final Map<NodeId, EventAccumulator> senderPending;
Madan Jampani28726282015-02-19 11:40:23 -0800105
Madan Jampani29f52a32016-04-18 15:20:52 -0700106 private long previousTombstonePurgeTime;
107 private final Map<NodeId, Long> antiEntropyTimes = Maps.newConcurrentMap();
108
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700109 private final String mapName;
110
Jonathan Hartdb3af892015-01-26 13:19:07 -0800111 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -0800112 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800113 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800114
Jonathan Hart4f397e82015-02-04 09:10:41 -0800115 private static final String ERROR_NULL_KEY = "Key cannot be null";
116 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
117
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700118 private final long initialDelaySec = 5;
119 private final boolean lightweightAntiEntropy;
120 private final boolean tombstonesDisabled;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800121
Jonathan Hart233a18a2015-03-02 17:24:58 -0800122 private static final int WINDOW_SIZE = 5;
Jon Halldabee682016-05-17 11:29:51 -0700123 private static final int HIGH_LOAD_THRESHOLD = 2;
Jonathan Hart233a18a2015-03-02 17:24:58 -0800124 private static final int LOAD_WINDOW = 2;
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700125 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
Jonathan Hart233a18a2015-03-02 17:24:58 -0800126
Jonathan Hartca335e92015-03-05 10:34:32 -0800127 private final boolean persistent;
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700128
129 private static final String PERSISTENT_LOCAL_MAP_NAME = "itemsMap";
130
Jonathan Hartca335e92015-03-05 10:34:32 -0800131
/**
 * Creates a new eventually consistent map shared amongst multiple instances.
 * <p>
 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
 * for more description of the parameters expected by the map.
 * </p>
 * <p>
 * All fields are assigned before any cluster subscriber is registered or any
 * background task is scheduled, so that handlers and tasks which run while the
 * constructor is still executing never observe unassigned final fields.
 * </p>
 *
 * @param mapName               a String identifier for the map.
 * @param clusterService        the cluster service
 * @param clusterCommunicator   the cluster communications service
 * @param ns                    a Kryo namespace that can serialize
 *                              both K and V
 * @param timestampProvider     provider of timestamps for K and V
 * @param peerUpdateFunction    function that provides a set of nodes to immediately
 *                              update to when there are writes to the map; when null,
 *                              every cluster node except the local one is used
 * @param eventExecutor         executor to use for processing incoming
 *                              events from peers; when null a fixed pool is created
 * @param communicationExecutor executor to use for sending events to peers;
 *                              when null a fixed pool is created
 * @param backgroundExecutor    executor to use for background anti-entropy
 *                              tasks; when null a single-thread scheduler is created
 * @param tombstonesDisabled    true if this map should not maintain
 *                              tombstones
 * @param antiEntropyPeriod     period that the anti-entropy task should run
 * @param antiEntropyTimeUnit   time unit for anti-entropy period
 * @param convergeFaster        make anti-entropy try to converge faster
 * @param persistent            persist data to disk
 * @param persistenceService    persistence service
 */
EventuallyConsistentMapImpl(String mapName,
                            ClusterService clusterService,
                            ClusterCommunicationService clusterCommunicator,
                            KryoNamespace ns,
                            BiFunction<K, V, Timestamp> timestampProvider,
                            BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
                            ExecutorService eventExecutor,
                            ExecutorService communicationExecutor,
                            ScheduledExecutorService backgroundExecutor,
                            boolean tombstonesDisabled,
                            long antiEntropyPeriod,
                            TimeUnit antiEntropyTimeUnit,
                            boolean convergeFaster,
                            boolean persistent,
                            PersistenceService persistenceService) {
    // mapName must be set first: createSerializer() reads it through name().
    this.mapName = mapName;
    this.serializer = createSerializer(ns);
    this.persistenceService = persistenceService;
    this.persistent = persistent;
    if (persistent) {
        // NOTE(review): every persistent instance uses the same backing-map name
        // (PERSISTENT_LOCAL_MAP_NAME) - confirm that distinct maps cannot collide.
        items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
                .withName(PERSISTENT_LOCAL_MAP_NAME)
                .withSerializer(this.serializer)
                .build();
    } else {
        items = Maps.newConcurrentMap();
    }
    senderPending = Maps.newConcurrentMap();
    destroyedMessage = mapName + ERROR_DESTROYED;

    this.clusterService = clusterService;
    this.clusterCommunicator = clusterCommunicator;
    this.localNodeId = clusterService.getLocalNode().id();

    this.timestampProvider = timestampProvider;

    // Behavioural flags and message subjects are fixed here, ahead of any
    // subscription or scheduling, because those publish "this" to other threads.
    this.tombstonesDisabled = tombstonesDisabled;
    this.lightweightAntiEntropy = !convergeFaster;
    updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
    antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
    updateRequestSubject = new MessageSubject("ecm-" + mapName + "-update-request");

    if (peerUpdateFunction != null) {
        this.peerUpdateFunction = peerUpdateFunction;
    } else {
        this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
                .map(ControllerNode::id)
                .filter(nodeId -> !nodeId.equals(localNodeId))
                .collect(Collectors.toList());
    }

    if (eventExecutor != null) {
        this.executor = eventExecutor;
    } else {
        // should be a normal executor; it's used for receiving messages
        this.executor =
                Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d", log));
    }

    if (communicationExecutor != null) {
        this.communicationExecutor = communicationExecutor;
    } else {
        // sending executor; should be capped
        //TODO this probably doesn't need to be bounded anymore
        this.communicationExecutor =
                newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d", log));
    }

    if (backgroundExecutor != null) {
        this.backgroundExecutor = backgroundExecutor;
    } else {
        this.backgroundExecutor =
                newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d", log));
    }

    // Register message handlers now that all state they may touch is assigned.
    clusterCommunicator.addSubscriber(updateMessageSubject,
                                      serializer::decode,
                                      this::processUpdates,
                                      this.executor);

    clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
                                      serializer::decode,
                                      this::handleAntiEntropyAdvertisement,
                                      serializer::encode,
                                      this.backgroundExecutor);

    clusterCommunicator.addSubscriber(updateRequestSubject,
                                      serializer::decode,
                                      this::handleUpdateRequests,
                                      this.backgroundExecutor);

    // start anti-entropy thread
    this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
                                                initialDelaySec, antiEntropyPeriod,
                                                antiEntropyTimeUnit);

    if (!tombstonesDisabled) {
        previousTombstonePurgeTime = 0;
        // NOTE(review): antiEntropyPeriod is interpreted as seconds here but as
        // antiEntropyTimeUnit for the advertisement task above - confirm intent.
        this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
                                                       initialDelaySec,
                                                       antiEntropyPeriod,
                                                       TimeUnit.SECONDS);
    }

    // Initiate first round of Gossip
    this.bootstrap();
}
269
HIGUCHI Yuta163efb52016-05-18 19:24:19 -0700270 private StoreSerializer createSerializer(KryoNamespace ns) {
271 return StoreSerializer.using(KryoNamespace.newBuilder()
272 .register(ns)
273 // not so robust way to avoid collision with other
274 // user supplied registrations
275 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
276 .register(KryoNamespaces.BASIC)
277 .register(LogicalTimestamp.class)
278 .register(WallClockTimestamp.class)
279 .register(AntiEntropyAdvertisement.class)
280 .register(AntiEntropyResponse.class)
281 .register(UpdateEntry.class)
282 .register(MapValue.class)
283 .register(MapValue.Digest.class)
284 .build(name() + "-ecmap"));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800285 }
286
287 @Override
Madan Jampania090a112016-01-18 16:38:17 -0800288 public String name() {
289 return mapName;
290 }
291
292 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800293 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800294 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700295 // TODO: Maintain a separate counter for tracking live elements in map.
296 return Maps.filterValues(items, MapValue::isAlive).size();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800297 }
298
299 @Override
300 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800301 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700302 return size() == 0;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800303 }
304
305 @Override
306 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800307 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800308 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani3d76c942015-06-29 23:37:10 -0700309 return get(key) != null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800310 }
311
312 @Override
313 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800314 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800315 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani3d76c942015-06-29 23:37:10 -0700316 return items.values()
317 .stream()
318 .filter(MapValue::isAlive)
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700319 .anyMatch(v -> value.equals(v.get()));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800320 }
321
322 @Override
323 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800324 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800325 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800326
Madan Jampani3d76c942015-06-29 23:37:10 -0700327 MapValue<V> value = items.get(key);
328 return (value == null || value.isTombstone()) ? null : value.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800329 }
330
331 @Override
332 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800333 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800334 checkNotNull(key, ERROR_NULL_KEY);
335 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800336
Madan Jampani3d76c942015-06-29 23:37:10 -0700337 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700338 if (putInternal(key, newValue)) {
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700339 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
Madan Jampanicab114c2015-07-23 00:14:19 -0700340 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800341 }
342 }
343
Jonathan Hartdb3af892015-01-26 13:19:07 -0800344 @Override
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700345 public V remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800346 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800347 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700348 return removeAndNotify(key, null);
349 }
350
351 @Override
352 public void remove(K key, V value) {
353 checkState(!destroyed, destroyedMessage);
354 checkNotNull(key, ERROR_NULL_KEY);
355 checkNotNull(value, ERROR_NULL_VALUE);
356 removeAndNotify(key, value);
357 }
358
359 private V removeAndNotify(K key, V value) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700360 Timestamp timestamp = timestampProvider.apply(key, value);
361 Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null
362 ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700363 MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
Madan Jampania0ac4872015-07-02 11:23:49 -0700364 if (previousValue != null) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700365 notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
366 peerUpdateFunction.apply(key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700367 if (previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700368 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampania0ac4872015-07-02 11:23:49 -0700369 }
370 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700371 return previousValue != null ? previousValue.get() : null;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800372 }
373
Madan Jampani483d0a22015-08-19 17:33:00 -0700374 private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700375 checkState(!destroyed, destroyedMessage);
376 checkNotNull(key, ERROR_NULL_KEY);
377 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani483d0a22015-08-19 17:33:00 -0700378 tombstone.ifPresent(v -> checkState(v.isTombstone()));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700379
380 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700381 AtomicBoolean updated = new AtomicBoolean(false);
Madan Jampanid13f3b82015-07-01 17:37:50 -0700382 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
Madan Jampani3d76c942015-06-29 23:37:10 -0700383 items.compute(key, (k, existing) -> {
Madan Jampanid13f3b82015-07-01 17:37:50 -0700384 boolean valueMatches = true;
385 if (value.isPresent() && existing != null && existing.isAlive()) {
386 valueMatches = Objects.equals(value.get(), existing.get());
Madan Jampani3d76c942015-06-29 23:37:10 -0700387 }
Brian O'Connor6325dad2015-07-07 15:36:29 -0700388 if (existing == null) {
Jonathan Hart4a29c592015-09-23 17:55:07 -0700389 log.trace("ECMap Remove: Existing value for key {} is already null", k);
Brian O'Connor6325dad2015-07-07 15:36:29 -0700390 }
Madan Jampani483d0a22015-08-19 17:33:00 -0700391 if (valueMatches) {
392 if (existing == null) {
393 updated.set(tombstone.isPresent());
394 } else {
395 updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
396 }
Madan Jampanid13f3b82015-07-01 17:37:50 -0700397 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700398 if (updated.get()) {
Madan Jampani483d0a22015-08-19 17:33:00 -0700399 previousValue.set(existing);
400 return tombstone.orElse(null);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700401 } else {
402 return existing;
403 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700404 });
Madan Jampanid13f3b82015-07-01 17:37:50 -0700405 return previousValue.get();
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800406 }
407
408 @Override
Madan Jampani4727a112015-07-16 12:12:58 -0700409 public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
410 checkState(!destroyed, destroyedMessage);
411 checkNotNull(key, ERROR_NULL_KEY);
412 checkNotNull(recomputeFunction, "Recompute function cannot be null");
413
414 AtomicBoolean updated = new AtomicBoolean(false);
415 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
Madan Jampanidc012972016-04-25 11:13:26 -0700416 MapValue<V> computedValue = items.compute(serializer.copy(key), (k, mv) -> {
Madan Jampani4727a112015-07-16 12:12:58 -0700417 previousValue.set(mv);
418 V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
419 MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
420 if (mv == null || newValue.isNewerThan(mv)) {
421 updated.set(true);
Madan Jampanidc012972016-04-25 11:13:26 -0700422 // We return a copy to ensure updates to peers can be serialized.
423 // This prevents replica divergence due to serialization failures.
424 return serializer.copy(newValue);
Madan Jampani4727a112015-07-16 12:12:58 -0700425 } else {
426 return mv;
427 }
428 });
429 if (updated.get()) {
430 notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
431 EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
432 V value = computedValue.isTombstone()
433 ? previousValue.get() == null ? null : previousValue.get().get()
434 : computedValue.get();
435 if (value != null) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700436 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
Madan Jampani4727a112015-07-16 12:12:58 -0700437 }
438 }
439 return computedValue.get();
440 }
441
442 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800443 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800444 checkState(!destroyed, destroyedMessage);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800445 m.forEach(this::put);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800446 }
447
448 @Override
449 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800450 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700451 Maps.filterValues(items, MapValue::isAlive)
452 .forEach((k, v) -> remove(k));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800453 }
454
455 @Override
456 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800457 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700458 return Maps.filterValues(items, MapValue::isAlive)
459 .keySet();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800460 }
461
462 @Override
463 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800464 checkState(!destroyed, destroyedMessage);
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700465 return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800466 }
467
468 @Override
469 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800470 checkState(!destroyed, destroyedMessage);
Madan Jampani3d76c942015-06-29 23:37:10 -0700471 return Maps.filterValues(items, MapValue::isAlive)
472 .entrySet()
473 .stream()
474 .map(e -> Pair.of(e.getKey(), e.getValue().get()))
475 .collect(Collectors.toSet());
476 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800477
Madan Jampani3d76c942015-06-29 23:37:10 -0700478 /**
479 * Returns true if newValue was accepted i.e. map is updated.
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700480 *
Madan Jampani3d76c942015-06-29 23:37:10 -0700481 * @param key key
482 * @param newValue proposed new value
483 * @return true if update happened; false if map already contains a more recent value for the key
484 */
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700485 private boolean putInternal(K key, MapValue<V> newValue) {
486 checkState(!destroyed, destroyedMessage);
487 checkNotNull(key, ERROR_NULL_KEY);
488 checkNotNull(newValue, ERROR_NULL_VALUE);
489 checkState(newValue.isAlive());
490 counter.incrementCount();
Madan Jampani3d76c942015-06-29 23:37:10 -0700491 AtomicBoolean updated = new AtomicBoolean(false);
492 items.compute(key, (k, existing) -> {
493 if (existing == null || newValue.isNewerThan(existing)) {
494 updated.set(true);
Madan Jampani3d76c942015-06-29 23:37:10 -0700495 return newValue;
496 }
497 return existing;
498 });
Madan Jampani3d76c942015-06-29 23:37:10 -0700499 return updated.get();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800500 }
501
502 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800503 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800504 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800505
506 listeners.add(checkNotNull(listener));
507 }
508
509 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800510 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800511 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800512
513 listeners.remove(checkNotNull(listener));
514 }
515
516 @Override
Madan Jampanifa242182016-01-22 13:42:54 -0800517 public CompletableFuture<Void> destroy() {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800518 destroyed = true;
519
520 executor.shutdown();
521 backgroundExecutor.shutdown();
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800522 communicationExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800523
Jonathan Hart584d2f32015-01-27 19:46:14 -0800524 listeners.clear();
525
Jonathan Hartdb3af892015-01-26 13:19:07 -0800526 clusterCommunicator.removeSubscriber(updateMessageSubject);
Jon Halld198b882016-05-18 16:44:40 -0700527 clusterCommunicator.removeSubscriber(updateRequestSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800528 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Madan Jampanifa242182016-01-22 13:42:54 -0800529 return CompletableFuture.completedFuture(null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800530 }
531
Jonathan Hartaaa56572015-01-28 21:56:35 -0800532 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700533 listeners.forEach(listener -> listener.event(event));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800534 }
535
Madan Jampani3d76c942015-06-29 23:37:10 -0700536 private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800537 queueUpdate(event, peers);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800538 }
539
Madan Jampani3d76c942015-06-29 23:37:10 -0700540 private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800541 if (peers == null) {
542 // we have no friends :(
543 return;
544 }
545 peers.forEach(node ->
Jonathan Hart9a426f82015-09-03 15:43:13 +0200546 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800547 );
548 }
549
Jonathan Hart233a18a2015-03-02 17:24:58 -0800550 private boolean underHighLoad() {
551 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
552 }
553
Madan Jampani3d76c942015-06-29 23:37:10 -0700554 private void sendAdvertisement() {
555 try {
Thomas Vachuska152f9fd2015-04-02 16:28:13 -0700556 if (underHighLoad() || destroyed) {
Jonathan Hart233a18a2015-03-02 17:24:58 -0800557 return;
558 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700559 pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
560 } catch (Exception e) {
561 // Catch all exceptions to avoid scheduled task being suppressed.
562 log.error("Exception thrown while sending advertisement", e);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800563 }
564 }
565
Madan Jampani3d76c942015-06-29 23:37:10 -0700566 private Optional<NodeId> pickRandomActivePeer() {
567 List<NodeId> activePeers = clusterService.getNodes()
568 .stream()
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700569 .map(ControllerNode::id)
570 .filter(id -> !localNodeId.equals(id))
Thomas Vachuska7a8de842016-03-07 20:56:35 -0800571 .filter(id -> clusterService.getState(id).isActive())
Madan Jampani3d76c942015-06-29 23:37:10 -0700572 .collect(Collectors.toList());
573 Collections.shuffle(activePeers);
574 return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
575 }
576
577 private void sendAdvertisementToPeer(NodeId peer) {
Madan Jampaniee35d552016-04-26 16:07:40 -0700578 long adCreationTime = System.currentTimeMillis();
Madan Jampani29f52a32016-04-18 15:20:52 -0700579 AntiEntropyAdvertisement<K> ad = createAdvertisement();
580 clusterCommunicator.sendAndReceive(ad,
Madan Jampani3d76c942015-06-29 23:37:10 -0700581 antiEntropyAdvertisementSubject,
582 serializer::encode,
Madan Jampani29f52a32016-04-18 15:20:52 -0700583 serializer::decode,
Madan Jampani3d76c942015-06-29 23:37:10 -0700584 peer)
585 .whenComplete((result, error) -> {
586 if (error != null) {
Madan Jampania0ac4872015-07-02 11:23:49 -0700587 log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
Madan Jampani29f52a32016-04-18 15:20:52 -0700588 } else if (result == AntiEntropyResponse.PROCESSED) {
Madan Jampaniee35d552016-04-26 16:07:40 -0700589 antiEntropyTimes.put(peer, adCreationTime);
Madan Jampani3d76c942015-06-29 23:37:10 -0700590 }
591 });
592 }
593
Jon Halld198b882016-05-18 16:44:40 -0700594 private void sendUpdateRequestToPeer(NodeId peer, Set<K> keys) {
595 UpdateRequest<K> request = new UpdateRequest<>(localNodeId, keys);
596 clusterCommunicator.unicast(request,
597 updateRequestSubject,
598 serializer::encode,
599 peer)
600 .whenComplete((result, error) -> {
601 if (error != null) {
602 log.debug("Failed to send update request to {}", peer, error);
603 }
604 });
605 }
606
Jonathan Hartaaa56572015-01-28 21:56:35 -0800607 private AntiEntropyAdvertisement<K> createAdvertisement() {
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700608 return new AntiEntropyAdvertisement<>(localNodeId,
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700609 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800610 }
611
Madan Jampani29f52a32016-04-18 15:20:52 -0700612 private AntiEntropyResponse handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700613 if (destroyed || underHighLoad()) {
Madan Jampani29f52a32016-04-18 15:20:52 -0700614 return AntiEntropyResponse.IGNORED;
Madan Jampani3d76c942015-06-29 23:37:10 -0700615 }
616 try {
Jonathan Hart9a426f82015-09-03 15:43:13 +0200617 if (log.isTraceEnabled()) {
618 log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
Jon Halld198b882016-05-18 16:44:40 -0700619 ad.sender(), mapName, ad.digest().size());
Jonathan Hart9a426f82015-09-03 15:43:13 +0200620 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700621 antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
Madan Jampani3d76c942015-06-29 23:37:10 -0700622 } catch (Exception e) {
623 log.warn("Error handling anti-entropy advertisement", e);
Madan Jampani29f52a32016-04-18 15:20:52 -0700624 return AntiEntropyResponse.FAILED;
HIGUCHI Yuta00c3f572015-02-25 07:33:50 -0800625 }
Madan Jampani29f52a32016-04-18 15:20:52 -0700626 return AntiEntropyResponse.PROCESSED;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800627 }
628
629 /**
Madan Jampani3d76c942015-06-29 23:37:10 -0700630 * Processes anti-entropy ad from peer by taking following actions:
631 * 1. If peer has an old entry, updates peer.
632 * 2. If peer indicates an entry is removed and has a more recent
633 * timestamp than the local entry, update local state.
Jonathan Hartaaa56572015-01-28 21:56:35 -0800634 */
Jonathan Hartaaa56572015-01-28 21:56:35 -0800635 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
636 AntiEntropyAdvertisement<K> ad) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700637 final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
Jonathan Hartaaa56572015-01-28 21:56:35 -0800638 final NodeId sender = ad.sender();
Jon Halld198b882016-05-18 16:44:40 -0700639 final List<NodeId> peers = ImmutableList.of(sender);
640 Set<K> staleOrMissing = new HashSet<>();
641 Set<K> locallyUnknown = new HashSet<>(ad.digest().keySet());
642
Madan Jampani3d76c942015-06-29 23:37:10 -0700643 items.forEach((key, localValue) -> {
Jon Halld198b882016-05-18 16:44:40 -0700644 locallyUnknown.remove(key);
Madan Jampani3d76c942015-06-29 23:37:10 -0700645 MapValue.Digest remoteValueDigest = ad.digest().get(key);
646 if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800647 // local value is more recent, push to sender
Jon Halld198b882016-05-18 16:44:40 -0700648 queueUpdate(new UpdateEntry<>(key, localValue), peers);
649 } else if (remoteValueDigest != null
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700650 && remoteValueDigest.isNewerThan(localValue.digest())
651 && remoteValueDigest.isTombstone()) {
Jon Halld198b882016-05-18 16:44:40 -0700652 // remote value is more recent and a tombstone: update local value
Madan Jampani483d0a22015-08-19 17:33:00 -0700653 MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
Madan Jampanid13f3b82015-07-01 17:37:50 -0700654 MapValue<V> previousValue = removeInternal(key,
655 Optional.empty(),
Madan Jampani483d0a22015-08-19 17:33:00 -0700656 Optional.of(tombstone));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700657 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700658 externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800659 }
Jon Halld198b882016-05-18 16:44:40 -0700660 } else if (remoteValueDigest.isNewerThan(localValue.digest())) {
661 // Not a tombstone and remote is newer
662 staleOrMissing.add(key);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800663 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700664 });
Jon Halld198b882016-05-18 16:44:40 -0700665 // Keys missing in local map
666 staleOrMissing.addAll(locallyUnknown);
667 // Request updates that we missed out on
668 sendUpdateRequestToPeer(sender, staleOrMissing);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800669 return externalEvents;
670 }
671
Jon Halld198b882016-05-18 16:44:40 -0700672 private void handleUpdateRequests(UpdateRequest<K> request) {
673 final Set<K> keys = request.keys();
674 final NodeId sender = request.sender();
675 final List<NodeId> peers = ImmutableList.of(sender);
676
677 keys.forEach(key ->
678 queueUpdate(new UpdateEntry<>(key, items.get(key)), peers)
679 );
680 }
681
Madan Jampani29f52a32016-04-18 15:20:52 -0700682 private void purgeTombstones() {
683 /*
Jon Halld198b882016-05-18 16:44:40 -0700684 * In order to mitigate the resource exhaustion that can ensue due to an ever-growing set
Madan Jampani29f52a32016-04-18 15:20:52 -0700685 * of tombstones we employ the following heuristic to purge old tombstones periodically.
686 * First, we keep track of the time (local system time) when we were able to have a successful
687 * AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded
688 * as the time before which all tombstones are considered safe to purge.
689 */
Madan Jampania8f919e2016-04-18 16:47:35 -0700690 long currentSafeTombstonePurgeTime = clusterService.getNodes()
691 .stream()
692 .map(ControllerNode::id)
693 .filter(id -> !id.equals(localNodeId))
694 .map(id -> antiEntropyTimes.getOrDefault(id, 0L))
695 .reduce(Math::min)
696 .orElse(0L);
Madan Jampani29f52a32016-04-18 15:20:52 -0700697 if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) {
698 return;
699 }
700 List<Map.Entry<K, MapValue<V>>> tombStonesToDelete = items.entrySet()
701 .stream()
702 .filter(e -> e.getValue().isTombstone())
703 .filter(e -> e.getValue().creationTime() <= currentSafeTombstonePurgeTime)
704 .collect(Collectors.toList());
705 previousTombstonePurgeTime = currentSafeTombstonePurgeTime;
706 tombStonesToDelete.forEach(entry -> items.remove(entry.getKey(), entry.getValue()));
707 }
708
Madan Jampani3d76c942015-06-29 23:37:10 -0700709 private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
710 if (destroyed) {
711 return;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800712 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700713 updates.forEach(update -> {
714 final K key = update.key();
Madan Jampani29f52a32016-04-18 15:20:52 -0700715 final MapValue<V> value = update.value() == null ? null : update.value().copy();
Madan Jampani483d0a22015-08-19 17:33:00 -0700716 if (value == null || value.isTombstone()) {
717 MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700718 if (previousValue != null && previousValue.isAlive()) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700719 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
Madan Jampanid13f3b82015-07-01 17:37:50 -0700720 }
Madan Jampani4f1f4cd2015-07-08 23:05:35 -0700721 } else if (putInternal(key, value)) {
Madan Jampanicab114c2015-07-23 00:14:19 -0700722 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get()));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800723 }
Madan Jampani3d76c942015-06-29 23:37:10 -0700724 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800725 }
726
Jon Halldabee682016-05-17 11:29:51 -0700727 private void bootstrap() {
728 /*
729 * Attempt to get in sync with the cluster when a map is created. This is to help avoid a new node
730 * writing to an ECM until it has a view of the map. Depending on how lightweight the map instance
731 * is, this will attempt to advertise to all or some of the peers.
732 */
733 int n = 0;
734 List<NodeId> activePeers = clusterService.getNodes()
735 .stream()
736 .map(ControllerNode::id)
737 .filter(id -> !localNodeId.equals(id))
738 .filter(id -> clusterService.getState(id).isActive())
739 .collect(Collectors.toList());
740
741 if (activePeers.isEmpty()) {
742 return;
743 }
744
745 if (lightweightAntiEntropy) {
746 n = activePeers.size() / 2;
747 } else {
748 n = activePeers.size();
749 }
750
751 for (int i = 0; i < n; i++) {
752 sendAdvertisementToPeer(activePeers.get(i));
753 }
754 }
755
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800756 // TODO pull this into the class if this gets pulled out...
757 private static final int DEFAULT_MAX_EVENTS = 1000;
758 private static final int DEFAULT_MAX_IDLE_MS = 10;
759 private static final int DEFAULT_MAX_BATCH_MS = 50;
760 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
Jonathan Hartdb3af892015-01-26 13:19:07 -0800761
Madan Jampani3d76c942015-06-29 23:37:10 -0700762 private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800763
764 private final NodeId peer;
765
766 private EventAccumulator(NodeId peer) {
767 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
768 this.peer = peer;
769 }
770
771 @Override
Madan Jampani3d76c942015-06-29 23:37:10 -0700772 public void processItems(List<UpdateEntry<K, V>> items) {
773 Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
774 items.forEach(item -> map.compute(item.key(), (key, existing) ->
Madan Jampani92c64eb2015-07-23 15:37:07 -0700775 item.isNewerThan(existing) ? item : existing));
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700776 communicationExecutor.execute(() -> {
Madan Jampani3d76c942015-06-29 23:37:10 -0700777 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
Madan Jampani175e8fd2015-05-20 14:10:45 -0700778 updateMessageSubject,
779 serializer::encode,
780 peer)
781 .whenComplete((result, error) -> {
782 if (error != null) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700783 log.debug("Failed to send to {}", peer, error);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700784 }
785 });
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800786 });
Jonathan Hartdb3af892015-01-26 13:19:07 -0800787 }
788 }
Jonathan Hart4a29c592015-09-23 17:55:07 -0700789}