blob: 450f1c2439e52c4dd271679f40f9a41aab8435f7 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.impl;
17
18import com.google.common.base.MoreObjects;
Jonathan Hart584d2f32015-01-27 19:46:14 -080019import com.google.common.collect.ImmutableList;
Jonathan Hartaaa56572015-01-28 21:56:35 -080020import org.apache.commons.lang3.RandomUtils;
Jonathan Hartdb3af892015-01-26 13:19:07 -080021import org.onlab.util.KryoNamespace;
22import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080023import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080024import org.onosproject.cluster.NodeId;
25import org.onosproject.store.Timestamp;
26import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
27import org.onosproject.store.cluster.messaging.ClusterMessage;
28import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
29import org.onosproject.store.cluster.messaging.MessageSubject;
30import org.onosproject.store.serializers.KryoSerializer;
31import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
34import java.io.IOException;
35import java.util.ArrayList;
36import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080037import java.util.HashMap;
38import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080039import java.util.List;
40import java.util.Map;
Jonathan Hart584d2f32015-01-27 19:46:14 -080041import java.util.Objects;
Jonathan Hartdb3af892015-01-26 13:19:07 -080042import java.util.Set;
43import java.util.concurrent.ConcurrentHashMap;
44import java.util.concurrent.CopyOnWriteArraySet;
45import java.util.concurrent.ExecutorService;
46import java.util.concurrent.Executors;
47import java.util.concurrent.ScheduledExecutorService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080048import java.util.concurrent.TimeUnit;
Jonathan Hartdb3af892015-01-26 13:19:07 -080049import java.util.stream.Collectors;
50
51import static com.google.common.base.Preconditions.checkNotNull;
52import static com.google.common.base.Preconditions.checkState;
53import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
54import static org.onlab.util.Tools.minPriority;
55import static org.onlab.util.Tools.namedThreads;
56
/**
 * Distributed Map implementation which uses optimistic replication and
 * gossip-based techniques to provide an eventually consistent data store.
 */
61public class EventuallyConsistentMapImpl<K, V>
62 implements EventuallyConsistentMap<K, V> {
63
64 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
65
66 private final Map<K, Timestamped<V>> items;
67 private final Map<K, Timestamp> removedItems;
68
69 private final String mapName;
70 private final ClusterService clusterService;
71 private final ClusterCommunicationService clusterCommunicator;
72 private final KryoSerializer serializer;
73
74 private final ClockService<K> clockService;
75
76 private final MessageSubject updateMessageSubject;
77 private final MessageSubject removeMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080078 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080079
Jonathan Hartaaa56572015-01-28 21:56:35 -080080 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080081 = new CopyOnWriteArraySet<>();
82
83 private final ExecutorService executor;
84
85 private final ScheduledExecutorService backgroundExecutor;
86
87 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -080088 private static final String ERROR_DESTROYED = " map is already destroyed";
Jonathan Hartdb3af892015-01-26 13:19:07 -080089
90 // TODO: Make these anti-entropy params configurable
91 private long initialDelaySec = 5;
92 private long periodSec = 5;
93
/**
 * Creates a new eventually consistent map shared amongst multiple instances.
 * <p>
 * Each map is identified by a string map name. EventuallyConsistentMapImpl
 * objects in different JVMs that use the same map name will form a
 * distributed map across JVMs (provided the cluster service is aware of
 * both nodes).
 * <p>
 * The client is expected to provide an
 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
 * will be stored in this map have been registered (including referenced
 * classes). This serializer will be used to serialize both K and V for
 * inter-node notifications.
 * <p>
 * The client must provide an {@link org.onosproject.store.impl.ClockService}
 * which can generate timestamps for a given key. The clock service is free
 * to generate timestamps however it wishes, however these timestamps will
 * be used to serialize updates to the map so they must be strict enough
 * to ensure updates are properly ordered for the use case (i.e. in some
 * cases wallclock time will suffice, whereas in other cases logical time
 * will be necessary).
 * <p>
 * All fields are assigned before any message handler is registered or the
 * anti-entropy task is scheduled, so neither can observe a partially
 * initialised map.
 *
 * @param mapName             a String identifier for the map.
 * @param clusterService      the cluster service
 * @param clusterCommunicator the cluster communications service
 * @param serializerBuilder   a Kryo namespace builder that can serialize
 *                            both K and V
 * @param clockService        a clock service able to generate timestamps
 *                            for K
 * @throws NullPointerException if any argument is null
 */
public EventuallyConsistentMapImpl(String mapName,
                                   ClusterService clusterService,
                                   ClusterCommunicationService clusterCommunicator,
                                   KryoNamespace.Builder serializerBuilder,
                                   ClockService<K> clockService) {

    this.mapName = checkNotNull(mapName);
    this.clusterService = checkNotNull(clusterService);
    this.clusterCommunicator = checkNotNull(clusterCommunicator);

    serializer = createSerializer(checkNotNull(serializerBuilder));

    this.clockService = checkNotNull(clockService);

    items = new ConcurrentHashMap<>();
    removedItems = new ConcurrentHashMap<>();

    // Assign every message subject up front: the handlers and the background
    // task created below read these fields.
    updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
    removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
    antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");

    executor = Executors
            .newCachedThreadPool(namedThreads("onos-ecm-" + mapName + "-fg-%d"));

    backgroundExecutor =
            newSingleThreadScheduledExecutor(minPriority(
                    namedThreads("onos-ecm-" + mapName + "-bg-%d")));

    // Only now, with all state in place, start receiving messages from peers
    clusterCommunicator.addSubscriber(updateMessageSubject,
                                      new InternalPutEventListener());
    clusterCommunicator.addSubscriber(removeMessageSubject,
                                      new InternalRemoveEventListener());
    clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
                                      new InternalAntiEntropyListener());

    // start anti-entropy thread (last, so the task never sees unset fields)
    backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                                           initialDelaySec, periodSec,
                                           TimeUnit.SECONDS);
}
163
164 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
165 return new KryoSerializer() {
166 @Override
167 protected void setupKryoPool() {
168 // Add the map's internal helper classes to the user-supplied serializer
169 serializerPool = builder
170 .register(WallClockTimestamp.class)
171 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800172 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800173 .register(ArrayList.class)
174 .register(InternalPutEvent.class)
175 .register(InternalRemoveEvent.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800176 .register(AntiEntropyAdvertisement.class)
177 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800178 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800179 }
180 };
181 }
182
183 @Override
184 public int size() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800185 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800186 return items.size();
187 }
188
189 @Override
190 public boolean isEmpty() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800191 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800192 return items.isEmpty();
193 }
194
195 @Override
196 public boolean containsKey(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800197 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800198 return items.containsKey(key);
199 }
200
201 @Override
202 public boolean containsValue(V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800203 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800204
205 return items.values().stream()
206 .anyMatch(timestamped -> timestamped.value().equals(value));
207 }
208
209 @Override
210 public V get(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800211 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800212
213 Timestamped<V> value = items.get(key);
214 if (value != null) {
215 return value.value();
216 }
217 return null;
218 }
219
220 @Override
221 public void put(K key, V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800222 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800223
224 Timestamp timestamp = clockService.getTimestamp(key);
225 if (putInternal(key, value, timestamp)) {
226 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
227 EventuallyConsistentMapEvent<K, V> externalEvent
228 = new EventuallyConsistentMapEvent<>(
229 EventuallyConsistentMapEvent.Type.PUT, key, value);
230 notifyListeners(externalEvent);
231 }
232 }
233
234 private boolean putInternal(K key, V value, Timestamp timestamp) {
235 synchronized (this) {
236 Timestamp removed = removedItems.get(key);
237 if (removed != null && removed.compareTo(timestamp) > 0) {
238 return false;
239 }
240
241 Timestamped<V> existing = items.get(key);
242 if (existing != null && existing.isNewer(timestamp)) {
243 return false;
244 } else {
245 items.put(key, new Timestamped<>(value, timestamp));
246 removedItems.remove(key);
247 return true;
248 }
249 }
250 }
251
252 @Override
253 public void remove(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800254 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800255
256 Timestamp timestamp = clockService.getTimestamp(key);
257 if (removeInternal(key, timestamp)) {
258 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
259 EventuallyConsistentMapEvent<K, V> externalEvent
260 = new EventuallyConsistentMapEvent<>(
261 EventuallyConsistentMapEvent.Type.REMOVE, key, null);
262 notifyListeners(externalEvent);
263 }
264 }
265
266 private boolean removeInternal(K key, Timestamp timestamp) {
267 synchronized (this) {
268 if (items.get(key) != null && items.get(key).isNewer(timestamp)) {
269 return false;
270 }
271
272 items.remove(key);
273 removedItems.put(key, timestamp);
274 return true;
275 }
276 }
277
278 @Override
279 public void putAll(Map<? extends K, ? extends V> m) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800280 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800281
282 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
283
284 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
285 K key = entry.getKey();
286 V value = entry.getValue();
287 Timestamp timestamp = clockService.getTimestamp(entry.getKey());
288
289 if (putInternal(key, value, timestamp)) {
290 updates.add(new PutEntry<>(key, value, timestamp));
291 }
292 }
293
Jonathan Hart584d2f32015-01-27 19:46:14 -0800294 if (!updates.isEmpty()) {
295 notifyPeers(new InternalPutEvent<>(updates));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800296
Jonathan Hart584d2f32015-01-27 19:46:14 -0800297 for (PutEntry<K, V> entry : updates) {
298 EventuallyConsistentMapEvent<K, V> externalEvent = new EventuallyConsistentMapEvent<>(
299 EventuallyConsistentMapEvent.Type.PUT, entry.key(),
300 entry.value());
301 notifyListeners(externalEvent);
302 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800303 }
304 }
305
306 @Override
307 public void clear() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800308 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800309
310 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
311
312 for (K key : items.keySet()) {
313 Timestamp timestamp = clockService.getTimestamp(key);
314
315 if (removeInternal(key, timestamp)) {
316 removed.add(new RemoveEntry<>(key, timestamp));
317 }
318 }
319
Jonathan Hart584d2f32015-01-27 19:46:14 -0800320 if (!removed.isEmpty()) {
321 notifyPeers(new InternalRemoveEvent<>(removed));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800322
Jonathan Hart584d2f32015-01-27 19:46:14 -0800323 for (RemoveEntry<K> entry : removed) {
324 EventuallyConsistentMapEvent<K, V> externalEvent
325 = new EventuallyConsistentMapEvent<>(
326 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
327 null);
328 notifyListeners(externalEvent);
329 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800330 }
331 }
332
333 @Override
334 public Set<K> keySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800335 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800336
337 return items.keySet();
338 }
339
340 @Override
341 public Collection<V> values() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800342 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800343
344 return items.values().stream()
345 .map(Timestamped::value)
346 .collect(Collectors.toList());
347 }
348
349 @Override
350 public Set<Map.Entry<K, V>> entrySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800351 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800352
353 return items.entrySet().stream()
354 .map(e -> new Entry(e.getKey(), e.getValue().value()))
355 .collect(Collectors.toSet());
356 }
357
358 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800359 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
360 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800361
362 listeners.add(checkNotNull(listener));
363 }
364
365 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800366 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
367 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800368
369 listeners.remove(checkNotNull(listener));
370 }
371
372 @Override
373 public void destroy() {
374 destroyed = true;
375
376 executor.shutdown();
377 backgroundExecutor.shutdown();
378
Jonathan Hart584d2f32015-01-27 19:46:14 -0800379 listeners.clear();
380
Jonathan Hartdb3af892015-01-26 13:19:07 -0800381 clusterCommunicator.removeSubscriber(updateMessageSubject);
382 clusterCommunicator.removeSubscriber(removeMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800383 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800384 }
385
Jonathan Hartaaa56572015-01-28 21:56:35 -0800386 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
387 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800388 listener.event(event);
389 }
390 }
391
392 private void notifyPeers(InternalPutEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800393 broadcastMessage(updateMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800394 }
395
396 private void notifyPeers(InternalRemoveEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800397 broadcastMessage(removeMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800398 }
399
Jonathan Hart7d656f42015-01-27 14:07:23 -0800400 private void broadcastMessage(MessageSubject subject, Object event) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800401 ClusterMessage message = new ClusterMessage(
402 clusterService.getLocalNode().id(),
403 subject,
404 serializer.encode(event));
405 clusterCommunicator.broadcast(message);
406 }
407
408 private void unicastMessage(NodeId peer,
409 MessageSubject subject,
410 Object event) throws IOException {
411 ClusterMessage message = new ClusterMessage(
412 clusterService.getLocalNode().id(),
413 subject,
414 serializer.encode(event));
415 clusterCommunicator.unicast(message, peer);
416 }
417
418 private final class Entry implements Map.Entry<K, V> {
419
420 private final K key;
421 private final V value;
422
423 public Entry(K key, V value) {
424 this.key = key;
425 this.value = value;
426 }
427
428 @Override
429 public K getKey() {
430 return key;
431 }
432
433 @Override
434 public V getValue() {
435 return value;
436 }
437
438 @Override
439 public V setValue(V value) {
440 throw new UnsupportedOperationException();
441 }
Jonathan Hart584d2f32015-01-27 19:46:14 -0800442
443 @Override
444 public boolean equals(Object o) {
445 if (!(o instanceof Map.Entry)) {
446 return false;
447 }
448
449 Map.Entry that = (Map.Entry) o;
450
451 return Objects.equals(this.key, that.getKey()) &&
452 Objects.equals(this.value, that.getValue());
453 }
454
455 @Override
456 public int hashCode() {
457 return Objects.hash(key, value);
458 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800459 }
460
Jonathan Hartaaa56572015-01-28 21:56:35 -0800461 private final class SendAdvertisementTask implements Runnable {
462 @Override
463 public void run() {
464 if (Thread.currentThread().isInterrupted()) {
465 log.info("Interrupted, quitting");
466 return;
467 }
468
469 try {
470 final NodeId self = clusterService.getLocalNode().id();
471 Set<ControllerNode> nodes = clusterService.getNodes();
472
473 List<NodeId> nodeIds = nodes.stream()
474 .map(node -> node.id())
475 .collect(Collectors.toList());
476
477 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
478 log.trace("No other peers in the cluster.");
479 return;
480 }
481
482 NodeId peer;
483 do {
484 int idx = RandomUtils.nextInt(0, nodeIds.size());
485 peer = nodeIds.get(idx);
486 } while (peer.equals(self));
487
488 if (Thread.currentThread().isInterrupted()) {
489 log.info("Interrupted, quitting");
490 return;
491 }
492
493 AntiEntropyAdvertisement<K> ad = createAdvertisement();
494
495 try {
496 unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
497 } catch (IOException e) {
498 log.debug("Failed to send anti-entropy advertisement to {}", peer);
499 }
500 } catch (Exception e) {
501 // Catch all exceptions to avoid scheduled task being suppressed.
502 log.error("Exception thrown while sending advertisement", e);
503 }
504 }
505 }
506
507 private AntiEntropyAdvertisement<K> createAdvertisement() {
508 final NodeId self = clusterService.getLocalNode().id();
509
510 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
511
512 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
513
514 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
515
516 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
517 }
518
519 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
520 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
521
522 synchronized (this) {
523 final NodeId sender = ad.sender();
524
525 externalEvents = antiEntropyCheckLocalItems(ad);
526
527 antiEntropyCheckLocalRemoved(ad);
528
529 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
530
531 // if remote ad has something unknown, actively sync
532 for (K key : ad.timestamps().keySet()) {
533 if (!items.containsKey(key)) {
534 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
535 try {
536 unicastMessage(sender, antiEntropyAdvertisementSubject,
537 myAd);
538 break;
539 } catch (IOException e) {
540 log.debug(
541 "Failed to send reactive anti-entropy advertisement to {}",
542 sender);
543 }
544 }
545 }
546 } // synchronized (this)
547
548 externalEvents.forEach(this::notifyListeners);
549 }
550
551 /**
552 * Checks if any of the remote's live items or tombstones are out of date
553 * according to our local live item list, or if our live items are out of
554 * date according to the remote's tombstone list.
555 * If the local copy is more recent, it will be pushed to the remote. If the
556 * remote has a more recent remove, we apply that to the local state.
557 *
558 * @param ad remote anti-entropy advertisement
559 * @return list of external events relating to local operations performed
560 */
561 // Guarded by synchronized (this)
562 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
563 AntiEntropyAdvertisement<K> ad) {
564 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
565 = new LinkedList<>();
566 final NodeId sender = ad.sender();
567
568 final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
569
570 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
571 K key = item.getKey();
572 Timestamped<V> localValue = item.getValue();
573
574 Timestamp remoteTimestamp = ad.timestamps().get(key);
575 if (remoteTimestamp == null) {
576 remoteTimestamp = ad.tombstones().get(key);
577 }
578 if (remoteTimestamp == null || localValue
579 .isNewer(remoteTimestamp)) {
580 // local value is more recent, push to sender
581 updatesToSend
582 .add(new PutEntry<>(key, localValue.value(),
583 localValue.timestamp()));
584 }
585
586 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
587 if (remoteDeadTimestamp != null &&
588 remoteDeadTimestamp.compareTo(localValue.timestamp()) > 0) {
589 // sender has a more recent remove
590 if (removeInternal(key, remoteDeadTimestamp)) {
591 externalEvents.add(new EventuallyConsistentMapEvent<>(
592 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
593 }
594 }
595 }
596
597 // Send all updates to the peer at once
598 if (!updatesToSend.isEmpty()) {
599 try {
600 unicastMessage(sender, updateMessageSubject, new InternalPutEvent<>(updatesToSend));
601 } catch (IOException e) {
602 log.warn("Failed to send advertisement response", e);
603 }
604 }
605
606 return externalEvents;
607 }
608
609 /**
610 * Checks if any items in the remote live list are out of date according
611 * to our tombstone list. If we find we have a more up to date tombstone,
612 * we'll send it to the remote.
613 *
614 * @param ad remote anti-entropy advertisement
615 */
616 // Guarded by synchronized (this)
617 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
618 final NodeId sender = ad.sender();
619
620 final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
621
622 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
623 K key = dead.getKey();
624 Timestamp localDeadTimestamp = dead.getValue();
625
626 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
627 if (remoteLiveTimestamp != null
628 && localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
629 // sender has zombie, push remove
630 removesToSend
631 .add(new RemoveEntry<>(key, localDeadTimestamp));
632 }
633 }
634
635 // Send all removes to the peer at once
636 if (!removesToSend.isEmpty()) {
637 try {
638 unicastMessage(sender, removeMessageSubject, new InternalRemoveEvent<>(removesToSend));
639 } catch (IOException e) {
640 log.warn("Failed to send advertisement response", e);
641 }
642 }
643 }
644
645 /**
646 * Checks if any of the local live items are out of date according to the
647 * remote's tombstone advertisements. If we find a local item is out of date,
648 * we'll apply the remove operation to the local state.
649 *
650 * @param ad remote anti-entropy advertisement
651 * @return list of external events relating to local operations performed
652 */
653 // Guarded by synchronized (this)
654 private List<EventuallyConsistentMapEvent<K, V>>
655 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
656 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
657 = new LinkedList<>();
658
659 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
660 K key = remoteDead.getKey();
661 Timestamp remoteDeadTimestamp = remoteDead.getValue();
662
663 Timestamped<V> local = items.get(key);
664 Timestamp localDead = removedItems.get(key);
665 if (local != null
666 && remoteDeadTimestamp.compareTo(local.timestamp()) > 0) {
667 // remove our version
668 if (removeInternal(key, remoteDeadTimestamp)) {
669 externalEvents.add(new EventuallyConsistentMapEvent<>(
670 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
671 }
672 } else if (localDead != null &&
673 remoteDeadTimestamp.compareTo(localDead) > 0) {
674 // If we both had the item as removed, but their timestamp is
675 // newer, update ours to the newer value
676 removedItems.put(key, remoteDeadTimestamp);
677 }
678 }
679
680 return externalEvents;
681 }
682
683 private final class InternalAntiEntropyListener
684 implements ClusterMessageHandler {
685
686 @Override
687 public void handle(ClusterMessage message) {
688 log.trace("Received anti-entropy advertisement from peer: {}", message.sender());
689 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
690 backgroundExecutor.submit(() -> {
691 try {
692 handleAntiEntropyAdvertisement(advertisement);
693 } catch (Exception e) {
694 log.warn("Exception thrown handling advertisements", e);
695 }
696 });
697 }
698 }
699
Jonathan Hartdb3af892015-01-26 13:19:07 -0800700 private final class InternalPutEventListener implements
701 ClusterMessageHandler {
702 @Override
703 public void handle(ClusterMessage message) {
704 log.debug("Received put event from peer: {}", message.sender());
705 InternalPutEvent<K, V> event = serializer.decode(message.payload());
706
707 executor.submit(() -> {
708 try {
709 for (PutEntry<K, V> entry : event.entries()) {
710 K key = entry.key();
711 V value = entry.value();
712 Timestamp timestamp = entry.timestamp();
713
714 if (putInternal(key, value, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800715 EventuallyConsistentMapEvent<K, V> externalEvent =
Jonathan Hartdb3af892015-01-26 13:19:07 -0800716 new EventuallyConsistentMapEvent<>(
717 EventuallyConsistentMapEvent.Type.PUT, key,
718 value);
719 notifyListeners(externalEvent);
720 }
721 }
722 } catch (Exception e) {
723 log.warn("Exception thrown handling put", e);
724 }
725 });
726 }
727 }
728
729 private final class InternalRemoveEventListener implements
730 ClusterMessageHandler {
731 @Override
732 public void handle(ClusterMessage message) {
733 log.debug("Received remove event from peer: {}", message.sender());
734 InternalRemoveEvent<K> event = serializer.decode(message.payload());
735
736 executor.submit(() -> {
737 try {
738 for (RemoveEntry<K> entry : event.entries()) {
739 K key = entry.key();
740 Timestamp timestamp = entry.timestamp();
741
742 if (removeInternal(key, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800743 EventuallyConsistentMapEvent<K, V> externalEvent
744 = new EventuallyConsistentMapEvent<>(
Jonathan Hartdb3af892015-01-26 13:19:07 -0800745 EventuallyConsistentMapEvent.Type.REMOVE,
746 key, null);
747 notifyListeners(externalEvent);
748 }
749 }
750 } catch (Exception e) {
751 log.warn("Exception thrown handling remove", e);
752 }
753 });
754 }
755 }
756
Jonathan Hart584d2f32015-01-27 19:46:14 -0800757 static final class InternalPutEvent<K, V> {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800758 private final List<PutEntry<K, V>> entries;
759
760 public InternalPutEvent(K key, V value, Timestamp timestamp) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800761 entries = ImmutableList.of(new PutEntry<>(key, value, timestamp));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800762 }
763
764 public InternalPutEvent(List<PutEntry<K, V>> entries) {
765 this.entries = checkNotNull(entries);
766 }
767
768 // Needed for serialization.
769 @SuppressWarnings("unused")
770 private InternalPutEvent() {
771 entries = null;
772 }
773
774 public List<PutEntry<K, V>> entries() {
775 return entries;
776 }
777 }
778
Jonathan Hart584d2f32015-01-27 19:46:14 -0800779 static final class PutEntry<K, V> {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800780 private final K key;
781 private final V value;
782 private final Timestamp timestamp;
783
784 public PutEntry(K key, V value, Timestamp timestamp) {
785 this.key = checkNotNull(key);
786 this.value = checkNotNull(value);
787 this.timestamp = checkNotNull(timestamp);
788 }
789
790 // Needed for serialization.
791 @SuppressWarnings("unused")
792 private PutEntry() {
793 this.key = null;
794 this.value = null;
795 this.timestamp = null;
796 }
797
798 public K key() {
799 return key;
800 }
801
802 public V value() {
803 return value;
804 }
805
806 public Timestamp timestamp() {
807 return timestamp;
808 }
809
810 public String toString() {
811 return MoreObjects.toStringHelper(getClass())
812 .add("key", key)
813 .add("value", value)
814 .add("timestamp", timestamp)
815 .toString();
816 }
817 }
818
Jonathan Hart584d2f32015-01-27 19:46:14 -0800819 static final class InternalRemoveEvent<K> {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800820 private final List<RemoveEntry<K>> entries;
821
822 public InternalRemoveEvent(K key, Timestamp timestamp) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800823 entries = ImmutableList.of(new RemoveEntry<>(key, timestamp));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800824 }
825
826 public InternalRemoveEvent(List<RemoveEntry<K>> entries) {
827 this.entries = checkNotNull(entries);
828 }
829
830 // Needed for serialization.
831 @SuppressWarnings("unused")
832 private InternalRemoveEvent() {
833 entries = null;
834 }
835
836 public List<RemoveEntry<K>> entries() {
837 return entries;
838 }
839 }
840
Jonathan Hart584d2f32015-01-27 19:46:14 -0800841 static final class RemoveEntry<K> {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800842 private final K key;
843 private final Timestamp timestamp;
844
845 public RemoveEntry(K key, Timestamp timestamp) {
846 this.key = checkNotNull(key);
847 this.timestamp = checkNotNull(timestamp);
848 }
849
850 // Needed for serialization.
851 @SuppressWarnings("unused")
852 private RemoveEntry() {
853 this.key = null;
854 this.timestamp = null;
855 }
856
857 public K key() {
858 return key;
859 }
860
861 public Timestamp timestamp() {
862 return timestamp;
863 }
864 }
865}