blob: 66e85bd733393f596edb2af5543e0efeb99fe8ed [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.impl;
17
Jonathan Hartaaa56572015-01-28 21:56:35 -080018import org.apache.commons.lang3.RandomUtils;
Jonathan Hartf9108232015-02-02 16:37:35 -080019import org.apache.commons.lang3.tuple.Pair;
Jonathan Hartdb3af892015-01-26 13:19:07 -080020import org.onlab.util.KryoNamespace;
21import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080022import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080023import org.onosproject.cluster.NodeId;
24import org.onosproject.store.Timestamp;
25import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
26import org.onosproject.store.cluster.messaging.ClusterMessage;
27import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
28import org.onosproject.store.cluster.messaging.MessageSubject;
29import org.onosproject.store.serializers.KryoSerializer;
30import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
33import java.io.IOException;
34import java.util.ArrayList;
35import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080036import java.util.HashMap;
37import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080038import java.util.List;
39import java.util.Map;
40import java.util.Set;
41import java.util.concurrent.ConcurrentHashMap;
42import java.util.concurrent.CopyOnWriteArraySet;
43import java.util.concurrent.ExecutorService;
44import java.util.concurrent.Executors;
45import java.util.concurrent.ScheduledExecutorService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080046import java.util.concurrent.TimeUnit;
Jonathan Hartdb3af892015-01-26 13:19:07 -080047import java.util.stream.Collectors;
48
49import static com.google.common.base.Preconditions.checkNotNull;
50import static com.google.common.base.Preconditions.checkState;
51import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
52import static org.onlab.util.Tools.minPriority;
53import static org.onlab.util.Tools.namedThreads;
54
55/**
56 * Distributed Map implementation which uses optimistic replication and gossip
57 * based techniques to provide an eventually consistent data store.
58 */
59public class EventuallyConsistentMapImpl<K, V>
60 implements EventuallyConsistentMap<K, V> {
61
62 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
63
64 private final Map<K, Timestamped<V>> items;
65 private final Map<K, Timestamp> removedItems;
66
67 private final String mapName;
68 private final ClusterService clusterService;
69 private final ClusterCommunicationService clusterCommunicator;
70 private final KryoSerializer serializer;
71
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080072 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080073
74 private final MessageSubject updateMessageSubject;
75 private final MessageSubject removeMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080076 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080077
Jonathan Hartaaa56572015-01-28 21:56:35 -080078 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080079 = new CopyOnWriteArraySet<>();
80
81 private final ExecutorService executor;
82
83 private final ScheduledExecutorService backgroundExecutor;
84
85 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -080086 private static final String ERROR_DESTROYED = " map is already destroyed";
Jonathan Hartdb3af892015-01-26 13:19:07 -080087
Jonathan Hart4f397e82015-02-04 09:10:41 -080088 private static final String ERROR_NULL_KEY = "Key cannot be null";
89 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
90
Jonathan Hartdb3af892015-01-26 13:19:07 -080091 // TODO: Make these anti-entropy params configurable
92 private long initialDelaySec = 5;
93 private long periodSec = 5;
94
95 /**
96 * Creates a new eventually consistent map shared amongst multiple instances.
97 *
98 * Each map is identified by a string map name. EventuallyConsistentMapImpl
99 * objects in different JVMs that use the same map name will form a
100 * distributed map across JVMs (provided the cluster service is aware of
101 * both nodes).
102 *
103 * The client is expected to provide an
104 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
105 * will be stored in this map have been registered (including referenced
106 * classes). This serializer will be used to serialize both K and V for
107 * inter-node notifications.
108 *
109 * The client must provide an {@link org.onosproject.store.impl.ClockService}
110 * which can generate timestamps for a given key. The clock service is free
111 * to generate timestamps however it wishes, however these timestamps will
112 * be used to serialize updates to the map so they must be strict enough
113 * to ensure updates are properly ordered for the use case (i.e. in some
114 * cases wallclock time will suffice, whereas in other cases logical time
115 * will be necessary).
116 *
117 * @param mapName a String identifier for the map.
118 * @param clusterService the cluster service
119 * @param clusterCommunicator the cluster communications service
120 * @param serializerBuilder a Kryo namespace builder that can serialize
121 * both K and V
122 * @param clockService a clock service able to generate timestamps
123 * for K
124 */
125 public EventuallyConsistentMapImpl(String mapName,
126 ClusterService clusterService,
127 ClusterCommunicationService clusterCommunicator,
128 KryoNamespace.Builder serializerBuilder,
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800129 ClockService<K, V> clockService) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800130
131 this.mapName = checkNotNull(mapName);
132 this.clusterService = checkNotNull(clusterService);
133 this.clusterCommunicator = checkNotNull(clusterCommunicator);
134
135 serializer = createSerializer(checkNotNull(serializerBuilder));
136
137 this.clockService = checkNotNull(clockService);
138
139 items = new ConcurrentHashMap<>();
140 removedItems = new ConcurrentHashMap<>();
141
142 executor = Executors
143 .newCachedThreadPool(namedThreads("onos-ecm-" + mapName + "-fg-%d"));
144
145 backgroundExecutor =
146 newSingleThreadScheduledExecutor(minPriority(
147 namedThreads("onos-ecm-" + mapName + "-bg-%d")));
148
Jonathan Hartaaa56572015-01-28 21:56:35 -0800149 // start anti-entropy thread
150 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
151 initialDelaySec, periodSec,
152 TimeUnit.SECONDS);
153
Jonathan Hartdb3af892015-01-26 13:19:07 -0800154 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
155 clusterCommunicator.addSubscriber(updateMessageSubject,
156 new InternalPutEventListener());
157 removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
158 clusterCommunicator.addSubscriber(removeMessageSubject,
159 new InternalRemoveEventListener());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800160 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
161 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
162 new InternalAntiEntropyListener());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800163 }
164
165 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
166 return new KryoSerializer() {
167 @Override
168 protected void setupKryoPool() {
169 // Add the map's internal helper classes to the user-supplied serializer
170 serializerPool = builder
171 .register(WallClockTimestamp.class)
172 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800173 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800174 .register(ArrayList.class)
175 .register(InternalPutEvent.class)
176 .register(InternalRemoveEvent.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800177 .register(AntiEntropyAdvertisement.class)
178 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800179 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800180 }
181 };
182 }
183
184 @Override
185 public int size() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800186 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800187 return items.size();
188 }
189
190 @Override
191 public boolean isEmpty() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800192 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800193 return items.isEmpty();
194 }
195
196 @Override
197 public boolean containsKey(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800198 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800199 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800200 return items.containsKey(key);
201 }
202
203 @Override
204 public boolean containsValue(V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800205 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800206 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800207
208 return items.values().stream()
209 .anyMatch(timestamped -> timestamped.value().equals(value));
210 }
211
212 @Override
213 public V get(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800214 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800215 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800216
217 Timestamped<V> value = items.get(key);
218 if (value != null) {
219 return value.value();
220 }
221 return null;
222 }
223
224 @Override
225 public void put(K key, V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800226 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800227 checkNotNull(key, ERROR_NULL_KEY);
228 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800229
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800230 Timestamp timestamp = clockService.getTimestamp(key, value);
231
Jonathan Hartdb3af892015-01-26 13:19:07 -0800232 if (putInternal(key, value, timestamp)) {
233 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
234 EventuallyConsistentMapEvent<K, V> externalEvent
235 = new EventuallyConsistentMapEvent<>(
236 EventuallyConsistentMapEvent.Type.PUT, key, value);
237 notifyListeners(externalEvent);
238 }
239 }
240
241 private boolean putInternal(K key, V value, Timestamp timestamp) {
242 synchronized (this) {
243 Timestamp removed = removedItems.get(key);
244 if (removed != null && removed.compareTo(timestamp) > 0) {
Jonathan Hart07e58be2015-02-12 09:57:16 -0800245 log.debug("ecmap - removed was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800246 return false;
247 }
248
249 Timestamped<V> existing = items.get(key);
250 if (existing != null && existing.isNewer(timestamp)) {
Jonathan Hart07e58be2015-02-12 09:57:16 -0800251 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800252 return false;
253 } else {
254 items.put(key, new Timestamped<>(value, timestamp));
255 removedItems.remove(key);
256 return true;
257 }
258 }
259 }
260
261 @Override
262 public void remove(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800263 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800264 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800265
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800266 // TODO prevent calls here if value is important for timestamp
267 Timestamp timestamp = clockService.getTimestamp(key, null);
268
Jonathan Hartdb3af892015-01-26 13:19:07 -0800269 if (removeInternal(key, timestamp)) {
270 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
271 EventuallyConsistentMapEvent<K, V> externalEvent
272 = new EventuallyConsistentMapEvent<>(
273 EventuallyConsistentMapEvent.Type.REMOVE, key, null);
274 notifyListeners(externalEvent);
275 }
276 }
277
278 private boolean removeInternal(K key, Timestamp timestamp) {
279 synchronized (this) {
280 if (items.get(key) != null && items.get(key).isNewer(timestamp)) {
281 return false;
282 }
283
284 items.remove(key);
285 removedItems.put(key, timestamp);
286 return true;
287 }
288 }
289
290 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800291 public void remove(K key, V value) {
292 checkState(!destroyed, mapName + ERROR_DESTROYED);
293 checkNotNull(key, ERROR_NULL_KEY);
294 checkNotNull(value, ERROR_NULL_VALUE);
295
296 Timestamp timestamp = clockService.getTimestamp(key, value);
297
298 if (removeInternal(key, timestamp)) {
299 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
300 EventuallyConsistentMapEvent<K, V> externalEvent
301 = new EventuallyConsistentMapEvent<>(
302 EventuallyConsistentMapEvent.Type.REMOVE, key, value);
303 notifyListeners(externalEvent);
304 }
305 }
306
307 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800308 public void putAll(Map<? extends K, ? extends V> m) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800309 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800310
311 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
312
313 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
314 K key = entry.getKey();
315 V value = entry.getValue();
Jonathan Hart4f397e82015-02-04 09:10:41 -0800316
317 checkNotNull(key, ERROR_NULL_KEY);
318 checkNotNull(value, ERROR_NULL_VALUE);
319
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800320 Timestamp timestamp = clockService.getTimestamp(key, value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800321
322 if (putInternal(key, value, timestamp)) {
323 updates.add(new PutEntry<>(key, value, timestamp));
324 }
325 }
326
Jonathan Hart584d2f32015-01-27 19:46:14 -0800327 if (!updates.isEmpty()) {
328 notifyPeers(new InternalPutEvent<>(updates));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800329
Jonathan Hart584d2f32015-01-27 19:46:14 -0800330 for (PutEntry<K, V> entry : updates) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800331 EventuallyConsistentMapEvent<K, V> externalEvent =
332 new EventuallyConsistentMapEvent<>(
Jonathan Hart584d2f32015-01-27 19:46:14 -0800333 EventuallyConsistentMapEvent.Type.PUT, entry.key(),
334 entry.value());
335 notifyListeners(externalEvent);
336 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800337 }
338 }
339
340 @Override
341 public void clear() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800342 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800343
344 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
345
346 for (K key : items.keySet()) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800347 // TODO also this is not applicable if value is important for timestamp?
348 Timestamp timestamp = clockService.getTimestamp(key, null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800349
350 if (removeInternal(key, timestamp)) {
351 removed.add(new RemoveEntry<>(key, timestamp));
352 }
353 }
354
Jonathan Hart584d2f32015-01-27 19:46:14 -0800355 if (!removed.isEmpty()) {
356 notifyPeers(new InternalRemoveEvent<>(removed));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800357
Jonathan Hart584d2f32015-01-27 19:46:14 -0800358 for (RemoveEntry<K> entry : removed) {
359 EventuallyConsistentMapEvent<K, V> externalEvent
360 = new EventuallyConsistentMapEvent<>(
361 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
362 null);
363 notifyListeners(externalEvent);
364 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800365 }
366 }
367
368 @Override
369 public Set<K> keySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800370 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800371
372 return items.keySet();
373 }
374
375 @Override
376 public Collection<V> values() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800377 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800378
379 return items.values().stream()
380 .map(Timestamped::value)
381 .collect(Collectors.toList());
382 }
383
384 @Override
385 public Set<Map.Entry<K, V>> entrySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800386 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800387
388 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800389 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800390 .collect(Collectors.toSet());
391 }
392
393 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800394 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
395 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800396
397 listeners.add(checkNotNull(listener));
398 }
399
400 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800401 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
402 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800403
404 listeners.remove(checkNotNull(listener));
405 }
406
407 @Override
408 public void destroy() {
409 destroyed = true;
410
411 executor.shutdown();
412 backgroundExecutor.shutdown();
413
Jonathan Hart584d2f32015-01-27 19:46:14 -0800414 listeners.clear();
415
Jonathan Hartdb3af892015-01-26 13:19:07 -0800416 clusterCommunicator.removeSubscriber(updateMessageSubject);
417 clusterCommunicator.removeSubscriber(removeMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800418 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800419 }
420
Jonathan Hartaaa56572015-01-28 21:56:35 -0800421 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
422 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800423 listener.event(event);
424 }
425 }
426
427 private void notifyPeers(InternalPutEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800428 broadcastMessage(updateMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800429 }
430
431 private void notifyPeers(InternalRemoveEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800432 broadcastMessage(removeMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800433 }
434
Jonathan Hart7d656f42015-01-27 14:07:23 -0800435 private void broadcastMessage(MessageSubject subject, Object event) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800436 ClusterMessage message = new ClusterMessage(
437 clusterService.getLocalNode().id(),
438 subject,
439 serializer.encode(event));
440 clusterCommunicator.broadcast(message);
441 }
442
443 private void unicastMessage(NodeId peer,
444 MessageSubject subject,
445 Object event) throws IOException {
446 ClusterMessage message = new ClusterMessage(
447 clusterService.getLocalNode().id(),
448 subject,
449 serializer.encode(event));
450 clusterCommunicator.unicast(message, peer);
451 }
452
Jonathan Hartaaa56572015-01-28 21:56:35 -0800453 private final class SendAdvertisementTask implements Runnable {
454 @Override
455 public void run() {
456 if (Thread.currentThread().isInterrupted()) {
457 log.info("Interrupted, quitting");
458 return;
459 }
460
461 try {
462 final NodeId self = clusterService.getLocalNode().id();
463 Set<ControllerNode> nodes = clusterService.getNodes();
464
465 List<NodeId> nodeIds = nodes.stream()
466 .map(node -> node.id())
467 .collect(Collectors.toList());
468
469 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
470 log.trace("No other peers in the cluster.");
471 return;
472 }
473
474 NodeId peer;
475 do {
476 int idx = RandomUtils.nextInt(0, nodeIds.size());
477 peer = nodeIds.get(idx);
478 } while (peer.equals(self));
479
480 if (Thread.currentThread().isInterrupted()) {
481 log.info("Interrupted, quitting");
482 return;
483 }
484
485 AntiEntropyAdvertisement<K> ad = createAdvertisement();
486
487 try {
488 unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
489 } catch (IOException e) {
490 log.debug("Failed to send anti-entropy advertisement to {}", peer);
491 }
492 } catch (Exception e) {
493 // Catch all exceptions to avoid scheduled task being suppressed.
494 log.error("Exception thrown while sending advertisement", e);
495 }
496 }
497 }
498
499 private AntiEntropyAdvertisement<K> createAdvertisement() {
500 final NodeId self = clusterService.getLocalNode().id();
501
502 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
503
504 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
505
506 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
507
508 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
509 }
510
511 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
512 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
513
514 synchronized (this) {
515 final NodeId sender = ad.sender();
516
517 externalEvents = antiEntropyCheckLocalItems(ad);
518
519 antiEntropyCheckLocalRemoved(ad);
520
521 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
522
523 // if remote ad has something unknown, actively sync
524 for (K key : ad.timestamps().keySet()) {
525 if (!items.containsKey(key)) {
526 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
527 try {
528 unicastMessage(sender, antiEntropyAdvertisementSubject,
529 myAd);
530 break;
531 } catch (IOException e) {
532 log.debug(
533 "Failed to send reactive anti-entropy advertisement to {}",
534 sender);
535 }
536 }
537 }
538 } // synchronized (this)
539
540 externalEvents.forEach(this::notifyListeners);
541 }
542
543 /**
544 * Checks if any of the remote's live items or tombstones are out of date
545 * according to our local live item list, or if our live items are out of
546 * date according to the remote's tombstone list.
547 * If the local copy is more recent, it will be pushed to the remote. If the
548 * remote has a more recent remove, we apply that to the local state.
549 *
550 * @param ad remote anti-entropy advertisement
551 * @return list of external events relating to local operations performed
552 */
553 // Guarded by synchronized (this)
554 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
555 AntiEntropyAdvertisement<K> ad) {
556 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
557 = new LinkedList<>();
558 final NodeId sender = ad.sender();
559
560 final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
561
562 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
563 K key = item.getKey();
564 Timestamped<V> localValue = item.getValue();
565
566 Timestamp remoteTimestamp = ad.timestamps().get(key);
567 if (remoteTimestamp == null) {
568 remoteTimestamp = ad.tombstones().get(key);
569 }
570 if (remoteTimestamp == null || localValue
571 .isNewer(remoteTimestamp)) {
572 // local value is more recent, push to sender
573 updatesToSend
574 .add(new PutEntry<>(key, localValue.value(),
575 localValue.timestamp()));
576 }
577
578 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
579 if (remoteDeadTimestamp != null &&
580 remoteDeadTimestamp.compareTo(localValue.timestamp()) > 0) {
581 // sender has a more recent remove
582 if (removeInternal(key, remoteDeadTimestamp)) {
583 externalEvents.add(new EventuallyConsistentMapEvent<>(
584 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
585 }
586 }
587 }
588
589 // Send all updates to the peer at once
590 if (!updatesToSend.isEmpty()) {
591 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800592 unicastMessage(sender, updateMessageSubject,
593 new InternalPutEvent<>(updatesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800594 } catch (IOException e) {
595 log.warn("Failed to send advertisement response", e);
596 }
597 }
598
599 return externalEvents;
600 }
601
602 /**
603 * Checks if any items in the remote live list are out of date according
604 * to our tombstone list. If we find we have a more up to date tombstone,
605 * we'll send it to the remote.
606 *
607 * @param ad remote anti-entropy advertisement
608 */
609 // Guarded by synchronized (this)
610 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
611 final NodeId sender = ad.sender();
612
613 final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
614
615 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
616 K key = dead.getKey();
617 Timestamp localDeadTimestamp = dead.getValue();
618
619 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
620 if (remoteLiveTimestamp != null
621 && localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
622 // sender has zombie, push remove
623 removesToSend
624 .add(new RemoveEntry<>(key, localDeadTimestamp));
625 }
626 }
627
628 // Send all removes to the peer at once
629 if (!removesToSend.isEmpty()) {
630 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800631 unicastMessage(sender, removeMessageSubject,
632 new InternalRemoveEvent<>(removesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800633 } catch (IOException e) {
634 log.warn("Failed to send advertisement response", e);
635 }
636 }
637 }
638
639 /**
640 * Checks if any of the local live items are out of date according to the
641 * remote's tombstone advertisements. If we find a local item is out of date,
642 * we'll apply the remove operation to the local state.
643 *
644 * @param ad remote anti-entropy advertisement
645 * @return list of external events relating to local operations performed
646 */
647 // Guarded by synchronized (this)
648 private List<EventuallyConsistentMapEvent<K, V>>
649 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
650 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
651 = new LinkedList<>();
652
653 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
654 K key = remoteDead.getKey();
655 Timestamp remoteDeadTimestamp = remoteDead.getValue();
656
657 Timestamped<V> local = items.get(key);
658 Timestamp localDead = removedItems.get(key);
659 if (local != null
660 && remoteDeadTimestamp.compareTo(local.timestamp()) > 0) {
661 // remove our version
662 if (removeInternal(key, remoteDeadTimestamp)) {
663 externalEvents.add(new EventuallyConsistentMapEvent<>(
664 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
665 }
666 } else if (localDead != null &&
667 remoteDeadTimestamp.compareTo(localDead) > 0) {
668 // If we both had the item as removed, but their timestamp is
669 // newer, update ours to the newer value
670 removedItems.put(key, remoteDeadTimestamp);
671 }
672 }
673
674 return externalEvents;
675 }
676
677 private final class InternalAntiEntropyListener
678 implements ClusterMessageHandler {
679
680 @Override
681 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800682 log.trace("Received anti-entropy advertisement from peer: {}",
683 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800684 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
685 backgroundExecutor.submit(() -> {
686 try {
687 handleAntiEntropyAdvertisement(advertisement);
688 } catch (Exception e) {
689 log.warn("Exception thrown handling advertisements", e);
690 }
691 });
692 }
693 }
694
Jonathan Hartdb3af892015-01-26 13:19:07 -0800695 private final class InternalPutEventListener implements
696 ClusterMessageHandler {
697 @Override
698 public void handle(ClusterMessage message) {
699 log.debug("Received put event from peer: {}", message.sender());
700 InternalPutEvent<K, V> event = serializer.decode(message.payload());
701
702 executor.submit(() -> {
703 try {
704 for (PutEntry<K, V> entry : event.entries()) {
705 K key = entry.key();
706 V value = entry.value();
707 Timestamp timestamp = entry.timestamp();
708
709 if (putInternal(key, value, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800710 EventuallyConsistentMapEvent<K, V> externalEvent =
Jonathan Hartdb3af892015-01-26 13:19:07 -0800711 new EventuallyConsistentMapEvent<>(
712 EventuallyConsistentMapEvent.Type.PUT, key,
713 value);
714 notifyListeners(externalEvent);
715 }
716 }
717 } catch (Exception e) {
718 log.warn("Exception thrown handling put", e);
719 }
720 });
721 }
722 }
723
724 private final class InternalRemoveEventListener implements
725 ClusterMessageHandler {
726 @Override
727 public void handle(ClusterMessage message) {
728 log.debug("Received remove event from peer: {}", message.sender());
729 InternalRemoveEvent<K> event = serializer.decode(message.payload());
730
731 executor.submit(() -> {
732 try {
733 for (RemoveEntry<K> entry : event.entries()) {
734 K key = entry.key();
735 Timestamp timestamp = entry.timestamp();
736
737 if (removeInternal(key, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800738 EventuallyConsistentMapEvent<K, V> externalEvent
739 = new EventuallyConsistentMapEvent<>(
Jonathan Hartdb3af892015-01-26 13:19:07 -0800740 EventuallyConsistentMapEvent.Type.REMOVE,
741 key, null);
742 notifyListeners(externalEvent);
743 }
744 }
745 } catch (Exception e) {
746 log.warn("Exception thrown handling remove", e);
747 }
748 });
749 }
750 }
751
Jonathan Hartdb3af892015-01-26 13:19:07 -0800752}