blob: 00b2004fc56420e825a03b785775fae21b93b7cd [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.impl;
17
18import com.google.common.base.MoreObjects;
Jonathan Hartaaa56572015-01-28 21:56:35 -080019import org.apache.commons.lang3.RandomUtils;
Jonathan Hartdb3af892015-01-26 13:19:07 -080020import org.onlab.util.KryoNamespace;
21import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080022import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080023import org.onosproject.cluster.NodeId;
24import org.onosproject.store.Timestamp;
25import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
26import org.onosproject.store.cluster.messaging.ClusterMessage;
27import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
28import org.onosproject.store.cluster.messaging.MessageSubject;
29import org.onosproject.store.serializers.KryoSerializer;
30import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
33import java.io.IOException;
34import java.util.ArrayList;
35import java.util.Collection;
36import java.util.Collections;
Jonathan Hartaaa56572015-01-28 21:56:35 -080037import java.util.HashMap;
38import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080039import java.util.List;
40import java.util.Map;
41import java.util.Set;
42import java.util.concurrent.ConcurrentHashMap;
43import java.util.concurrent.CopyOnWriteArraySet;
44import java.util.concurrent.ExecutorService;
45import java.util.concurrent.Executors;
46import java.util.concurrent.ScheduledExecutorService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080047import java.util.concurrent.TimeUnit;
Jonathan Hartdb3af892015-01-26 13:19:07 -080048import java.util.stream.Collectors;
49
50import static com.google.common.base.Preconditions.checkNotNull;
51import static com.google.common.base.Preconditions.checkState;
52import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
53import static org.onlab.util.Tools.minPriority;
54import static org.onlab.util.Tools.namedThreads;
55
56/**
57 * Distributed Map implementation which uses optimistic replication and gossip
58 * based techniques to provide an eventually consistent data store.
59 */
60public class EventuallyConsistentMapImpl<K, V>
61 implements EventuallyConsistentMap<K, V> {
62
    private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);

    // Live entries: each key maps to its current value together with the
    // timestamp of the write that produced it.
    private final Map<K, Timestamped<V>> items;
    // Tombstones: each removed key maps to the timestamp of the remove that
    // was applied locally. An entry is dropped again when the key is re-put.
    private final Map<K, Timestamp> removedItems;

    private final String mapName;
    private final ClusterService clusterService;
    private final ClusterCommunicationService clusterCommunicator;
    private final KryoSerializer serializer;

    // Supplies the timestamp attached to every local put/remove.
    private final ClockService<K> clockService;

    // Message subjects, all derived from the map name, used for peer
    // put notifications, remove notifications and anti-entropy adverts.
    private final MessageSubject updateMessageSubject;
    private final MessageSubject removeMessageSubject;
    private final MessageSubject antiEntropyAdvertisementSubject;

    private final Set<EventuallyConsistentMapListener<K, V>> listeners
            = new CopyOnWriteArraySet<>();

    // Applies put/remove events received from peers.
    private final ExecutorService executor;

    // Sends periodic anti-entropy advertisements and processes received ones.
    private final ScheduledExecutorService backgroundExecutor;

    // Set once by destroy(); public operations refuse to run afterwards.
    private volatile boolean destroyed = false;
    private static final String ERROR_DESTROYED = " map is already destroyed";

    // TODO: Make these anti-entropy params configurable
    // Both values are interpreted in seconds when the task is scheduled.
    private long initialDelaySec = 5;
    private long periodSec = 5;
92
93 /**
94 * Creates a new eventually consistent map shared amongst multiple instances.
95 *
96 * Each map is identified by a string map name. EventuallyConsistentMapImpl
97 * objects in different JVMs that use the same map name will form a
98 * distributed map across JVMs (provided the cluster service is aware of
99 * both nodes).
100 *
101 * The client is expected to provide an
102 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
103 * will be stored in this map have been registered (including referenced
104 * classes). This serializer will be used to serialize both K and V for
105 * inter-node notifications.
106 *
107 * The client must provide an {@link org.onosproject.store.impl.ClockService}
108 * which can generate timestamps for a given key. The clock service is free
109 * to generate timestamps however it wishes, however these timestamps will
110 * be used to serialize updates to the map so they must be strict enough
111 * to ensure updates are properly ordered for the use case (i.e. in some
112 * cases wallclock time will suffice, whereas in other cases logical time
113 * will be necessary).
114 *
115 * @param mapName a String identifier for the map.
116 * @param clusterService the cluster service
117 * @param clusterCommunicator the cluster communications service
118 * @param serializerBuilder a Kryo namespace builder that can serialize
119 * both K and V
120 * @param clockService a clock service able to generate timestamps
121 * for K
122 */
123 public EventuallyConsistentMapImpl(String mapName,
124 ClusterService clusterService,
125 ClusterCommunicationService clusterCommunicator,
126 KryoNamespace.Builder serializerBuilder,
127 ClockService<K> clockService) {
128
129 this.mapName = checkNotNull(mapName);
130 this.clusterService = checkNotNull(clusterService);
131 this.clusterCommunicator = checkNotNull(clusterCommunicator);
132
133 serializer = createSerializer(checkNotNull(serializerBuilder));
134
135 this.clockService = checkNotNull(clockService);
136
137 items = new ConcurrentHashMap<>();
138 removedItems = new ConcurrentHashMap<>();
139
140 executor = Executors
141 .newCachedThreadPool(namedThreads("onos-ecm-" + mapName + "-fg-%d"));
142
143 backgroundExecutor =
144 newSingleThreadScheduledExecutor(minPriority(
145 namedThreads("onos-ecm-" + mapName + "-bg-%d")));
146
Jonathan Hartaaa56572015-01-28 21:56:35 -0800147 // start anti-entropy thread
148 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
149 initialDelaySec, periodSec,
150 TimeUnit.SECONDS);
151
Jonathan Hartdb3af892015-01-26 13:19:07 -0800152 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
153 clusterCommunicator.addSubscriber(updateMessageSubject,
154 new InternalPutEventListener());
155 removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
156 clusterCommunicator.addSubscriber(removeMessageSubject,
157 new InternalRemoveEventListener());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800158 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
159 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
160 new InternalAntiEntropyListener());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800161 }
162
    /**
     * Creates the serializer used for every message this map exchanges with
     * its peers.
     *
     * The returned serializer builds its pool from the caller-supplied
     * namespace builder, extended with the helper classes this map puts on
     * the wire.
     *
     * @param builder namespace builder supplied by the client of the map
     * @return serializer backed by the extended namespace
     */
    private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
        return new KryoSerializer() {
            @Override
            protected void setupKryoPool() {
                // Add the map's internal helper classes to the user-supplied serializer
                // NOTE(review): registration order presumably has to match on
                // every node for messages to decode - confirm before reordering.
                serializerPool = builder
                        .register(WallClockTimestamp.class)
                        .register(PutEntry.class)
                        .register(RemoveEntry.class)
                        .register(ArrayList.class)
                        .register(InternalPutEvent.class)
                        .register(InternalRemoveEvent.class)
                        .register(AntiEntropyAdvertisement.class)
                        .register(HashMap.class)
                        .build();
            }
        };
    }
181
182 @Override
183 public int size() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800184 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800185 return items.size();
186 }
187
188 @Override
189 public boolean isEmpty() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800190 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800191 return items.isEmpty();
192 }
193
194 @Override
195 public boolean containsKey(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800196 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800197 return items.containsKey(key);
198 }
199
200 @Override
201 public boolean containsValue(V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800202 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800203
204 return items.values().stream()
205 .anyMatch(timestamped -> timestamped.value().equals(value));
206 }
207
208 @Override
209 public V get(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800210 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800211
212 Timestamped<V> value = items.get(key);
213 if (value != null) {
214 return value.value();
215 }
216 return null;
217 }
218
219 @Override
220 public void put(K key, V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800221 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800222
223 Timestamp timestamp = clockService.getTimestamp(key);
224 if (putInternal(key, value, timestamp)) {
225 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
226 EventuallyConsistentMapEvent<K, V> externalEvent
227 = new EventuallyConsistentMapEvent<>(
228 EventuallyConsistentMapEvent.Type.PUT, key, value);
229 notifyListeners(externalEvent);
230 }
231 }
232
233 private boolean putInternal(K key, V value, Timestamp timestamp) {
234 synchronized (this) {
235 Timestamp removed = removedItems.get(key);
236 if (removed != null && removed.compareTo(timestamp) > 0) {
237 return false;
238 }
239
240 Timestamped<V> existing = items.get(key);
241 if (existing != null && existing.isNewer(timestamp)) {
242 return false;
243 } else {
244 items.put(key, new Timestamped<>(value, timestamp));
245 removedItems.remove(key);
246 return true;
247 }
248 }
249 }
250
251 @Override
252 public void remove(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800253 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800254
255 Timestamp timestamp = clockService.getTimestamp(key);
256 if (removeInternal(key, timestamp)) {
257 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
258 EventuallyConsistentMapEvent<K, V> externalEvent
259 = new EventuallyConsistentMapEvent<>(
260 EventuallyConsistentMapEvent.Type.REMOVE, key, null);
261 notifyListeners(externalEvent);
262 }
263 }
264
265 private boolean removeInternal(K key, Timestamp timestamp) {
266 synchronized (this) {
267 if (items.get(key) != null && items.get(key).isNewer(timestamp)) {
268 return false;
269 }
270
271 items.remove(key);
272 removedItems.put(key, timestamp);
273 return true;
274 }
275 }
276
277 @Override
278 public void putAll(Map<? extends K, ? extends V> m) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800279 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800280
281 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
282
283 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
284 K key = entry.getKey();
285 V value = entry.getValue();
286 Timestamp timestamp = clockService.getTimestamp(entry.getKey());
287
288 if (putInternal(key, value, timestamp)) {
289 updates.add(new PutEntry<>(key, value, timestamp));
290 }
291 }
292
293 notifyPeers(new InternalPutEvent<>(updates));
294
295 for (PutEntry<K, V> entry : updates) {
296 EventuallyConsistentMapEvent<K, V> externalEvent =
297 new EventuallyConsistentMapEvent<>(
298 EventuallyConsistentMapEvent.Type.PUT, entry.key(), entry.value());
299 notifyListeners(externalEvent);
300 }
301 }
302
303 @Override
304 public void clear() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800305 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800306
307 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
308
309 for (K key : items.keySet()) {
310 Timestamp timestamp = clockService.getTimestamp(key);
311
312 if (removeInternal(key, timestamp)) {
313 removed.add(new RemoveEntry<>(key, timestamp));
314 }
315 }
316
317 notifyPeers(new InternalRemoveEvent<>(removed));
318
319 for (RemoveEntry<K> entry : removed) {
320 EventuallyConsistentMapEvent<K, V> externalEvent =
321 new EventuallyConsistentMapEvent<>(
322 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(), null);
323 notifyListeners(externalEvent);
324 }
325 }
326
327 @Override
328 public Set<K> keySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800329 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800330
331 return items.keySet();
332 }
333
334 @Override
335 public Collection<V> values() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800336 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800337
338 return items.values().stream()
339 .map(Timestamped::value)
340 .collect(Collectors.toList());
341 }
342
343 @Override
344 public Set<Map.Entry<K, V>> entrySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800345 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800346
347 return items.entrySet().stream()
348 .map(e -> new Entry(e.getKey(), e.getValue().value()))
349 .collect(Collectors.toSet());
350 }
351
352 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800353 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
354 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800355
356 listeners.add(checkNotNull(listener));
357 }
358
359 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800360 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
361 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800362
363 listeners.remove(checkNotNull(listener));
364 }
365
366 @Override
367 public void destroy() {
368 destroyed = true;
369
370 executor.shutdown();
371 backgroundExecutor.shutdown();
372
373 clusterCommunicator.removeSubscriber(updateMessageSubject);
374 clusterCommunicator.removeSubscriber(removeMessageSubject);
375 }
376
Jonathan Hartaaa56572015-01-28 21:56:35 -0800377 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
378 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800379 listener.event(event);
380 }
381 }
382
383 private void notifyPeers(InternalPutEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800384 broadcastMessage(updateMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800385 }
386
387 private void notifyPeers(InternalRemoveEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800388 broadcastMessage(removeMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800389 }
390
Jonathan Hart7d656f42015-01-27 14:07:23 -0800391 private void broadcastMessage(MessageSubject subject, Object event) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800392 ClusterMessage message = new ClusterMessage(
393 clusterService.getLocalNode().id(),
394 subject,
395 serializer.encode(event));
396 clusterCommunicator.broadcast(message);
397 }
398
399 private void unicastMessage(NodeId peer,
400 MessageSubject subject,
401 Object event) throws IOException {
402 ClusterMessage message = new ClusterMessage(
403 clusterService.getLocalNode().id(),
404 subject,
405 serializer.encode(event));
406 clusterCommunicator.unicast(message, peer);
407 }
408
409 private final class Entry implements Map.Entry<K, V> {
410
411 private final K key;
412 private final V value;
413
414 public Entry(K key, V value) {
415 this.key = key;
416 this.value = value;
417 }
418
419 @Override
420 public K getKey() {
421 return key;
422 }
423
424 @Override
425 public V getValue() {
426 return value;
427 }
428
429 @Override
430 public V setValue(V value) {
431 throw new UnsupportedOperationException();
432 }
433 }
434
Jonathan Hartaaa56572015-01-28 21:56:35 -0800435 private final class SendAdvertisementTask implements Runnable {
436 @Override
437 public void run() {
438 if (Thread.currentThread().isInterrupted()) {
439 log.info("Interrupted, quitting");
440 return;
441 }
442
443 try {
444 final NodeId self = clusterService.getLocalNode().id();
445 Set<ControllerNode> nodes = clusterService.getNodes();
446
447 List<NodeId> nodeIds = nodes.stream()
448 .map(node -> node.id())
449 .collect(Collectors.toList());
450
451 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
452 log.trace("No other peers in the cluster.");
453 return;
454 }
455
456 NodeId peer;
457 do {
458 int idx = RandomUtils.nextInt(0, nodeIds.size());
459 peer = nodeIds.get(idx);
460 } while (peer.equals(self));
461
462 if (Thread.currentThread().isInterrupted()) {
463 log.info("Interrupted, quitting");
464 return;
465 }
466
467 AntiEntropyAdvertisement<K> ad = createAdvertisement();
468
469 try {
470 unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
471 } catch (IOException e) {
472 log.debug("Failed to send anti-entropy advertisement to {}", peer);
473 }
474 } catch (Exception e) {
475 // Catch all exceptions to avoid scheduled task being suppressed.
476 log.error("Exception thrown while sending advertisement", e);
477 }
478 }
479 }
480
481 private AntiEntropyAdvertisement<K> createAdvertisement() {
482 final NodeId self = clusterService.getLocalNode().id();
483
484 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
485
486 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
487
488 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
489
490 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
491 }
492
493 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
494 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
495
496 synchronized (this) {
497 final NodeId sender = ad.sender();
498
499 externalEvents = antiEntropyCheckLocalItems(ad);
500
501 antiEntropyCheckLocalRemoved(ad);
502
503 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
504
505 // if remote ad has something unknown, actively sync
506 for (K key : ad.timestamps().keySet()) {
507 if (!items.containsKey(key)) {
508 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
509 try {
510 unicastMessage(sender, antiEntropyAdvertisementSubject,
511 myAd);
512 break;
513 } catch (IOException e) {
514 log.debug(
515 "Failed to send reactive anti-entropy advertisement to {}",
516 sender);
517 }
518 }
519 }
520 } // synchronized (this)
521
522 externalEvents.forEach(this::notifyListeners);
523 }
524
525 /**
526 * Checks if any of the remote's live items or tombstones are out of date
527 * according to our local live item list, or if our live items are out of
528 * date according to the remote's tombstone list.
529 * If the local copy is more recent, it will be pushed to the remote. If the
530 * remote has a more recent remove, we apply that to the local state.
531 *
532 * @param ad remote anti-entropy advertisement
533 * @return list of external events relating to local operations performed
534 */
535 // Guarded by synchronized (this)
536 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
537 AntiEntropyAdvertisement<K> ad) {
538 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
539 = new LinkedList<>();
540 final NodeId sender = ad.sender();
541
542 final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
543
544 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
545 K key = item.getKey();
546 Timestamped<V> localValue = item.getValue();
547
548 Timestamp remoteTimestamp = ad.timestamps().get(key);
549 if (remoteTimestamp == null) {
550 remoteTimestamp = ad.tombstones().get(key);
551 }
552 if (remoteTimestamp == null || localValue
553 .isNewer(remoteTimestamp)) {
554 // local value is more recent, push to sender
555 updatesToSend
556 .add(new PutEntry<>(key, localValue.value(),
557 localValue.timestamp()));
558 }
559
560 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
561 if (remoteDeadTimestamp != null &&
562 remoteDeadTimestamp.compareTo(localValue.timestamp()) > 0) {
563 // sender has a more recent remove
564 if (removeInternal(key, remoteDeadTimestamp)) {
565 externalEvents.add(new EventuallyConsistentMapEvent<>(
566 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
567 }
568 }
569 }
570
571 // Send all updates to the peer at once
572 if (!updatesToSend.isEmpty()) {
573 try {
574 unicastMessage(sender, updateMessageSubject, new InternalPutEvent<>(updatesToSend));
575 } catch (IOException e) {
576 log.warn("Failed to send advertisement response", e);
577 }
578 }
579
580 return externalEvents;
581 }
582
583 /**
584 * Checks if any items in the remote live list are out of date according
585 * to our tombstone list. If we find we have a more up to date tombstone,
586 * we'll send it to the remote.
587 *
588 * @param ad remote anti-entropy advertisement
589 */
590 // Guarded by synchronized (this)
591 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
592 final NodeId sender = ad.sender();
593
594 final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
595
596 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
597 K key = dead.getKey();
598 Timestamp localDeadTimestamp = dead.getValue();
599
600 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
601 if (remoteLiveTimestamp != null
602 && localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
603 // sender has zombie, push remove
604 removesToSend
605 .add(new RemoveEntry<>(key, localDeadTimestamp));
606 }
607 }
608
609 // Send all removes to the peer at once
610 if (!removesToSend.isEmpty()) {
611 try {
612 unicastMessage(sender, removeMessageSubject, new InternalRemoveEvent<>(removesToSend));
613 } catch (IOException e) {
614 log.warn("Failed to send advertisement response", e);
615 }
616 }
617 }
618
619 /**
620 * Checks if any of the local live items are out of date according to the
621 * remote's tombstone advertisements. If we find a local item is out of date,
622 * we'll apply the remove operation to the local state.
623 *
624 * @param ad remote anti-entropy advertisement
625 * @return list of external events relating to local operations performed
626 */
627 // Guarded by synchronized (this)
628 private List<EventuallyConsistentMapEvent<K, V>>
629 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
630 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
631 = new LinkedList<>();
632
633 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
634 K key = remoteDead.getKey();
635 Timestamp remoteDeadTimestamp = remoteDead.getValue();
636
637 Timestamped<V> local = items.get(key);
638 Timestamp localDead = removedItems.get(key);
639 if (local != null
640 && remoteDeadTimestamp.compareTo(local.timestamp()) > 0) {
641 // remove our version
642 if (removeInternal(key, remoteDeadTimestamp)) {
643 externalEvents.add(new EventuallyConsistentMapEvent<>(
644 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
645 }
646 } else if (localDead != null &&
647 remoteDeadTimestamp.compareTo(localDead) > 0) {
648 // If we both had the item as removed, but their timestamp is
649 // newer, update ours to the newer value
650 removedItems.put(key, remoteDeadTimestamp);
651 }
652 }
653
654 return externalEvents;
655 }
656
657 private final class InternalAntiEntropyListener
658 implements ClusterMessageHandler {
659
660 @Override
661 public void handle(ClusterMessage message) {
662 log.trace("Received anti-entropy advertisement from peer: {}", message.sender());
663 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
664 backgroundExecutor.submit(() -> {
665 try {
666 handleAntiEntropyAdvertisement(advertisement);
667 } catch (Exception e) {
668 log.warn("Exception thrown handling advertisements", e);
669 }
670 });
671 }
672 }
673
Jonathan Hartdb3af892015-01-26 13:19:07 -0800674 private final class InternalPutEventListener implements
675 ClusterMessageHandler {
676 @Override
677 public void handle(ClusterMessage message) {
678 log.debug("Received put event from peer: {}", message.sender());
679 InternalPutEvent<K, V> event = serializer.decode(message.payload());
680
681 executor.submit(() -> {
682 try {
683 for (PutEntry<K, V> entry : event.entries()) {
684 K key = entry.key();
685 V value = entry.value();
686 Timestamp timestamp = entry.timestamp();
687
688 if (putInternal(key, value, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800689 EventuallyConsistentMapEvent<K, V> externalEvent =
Jonathan Hartdb3af892015-01-26 13:19:07 -0800690 new EventuallyConsistentMapEvent<>(
691 EventuallyConsistentMapEvent.Type.PUT, key,
692 value);
693 notifyListeners(externalEvent);
694 }
695 }
696 } catch (Exception e) {
697 log.warn("Exception thrown handling put", e);
698 }
699 });
700 }
701 }
702
703 private final class InternalRemoveEventListener implements
704 ClusterMessageHandler {
705 @Override
706 public void handle(ClusterMessage message) {
707 log.debug("Received remove event from peer: {}", message.sender());
708 InternalRemoveEvent<K> event = serializer.decode(message.payload());
709
710 executor.submit(() -> {
711 try {
712 for (RemoveEntry<K> entry : event.entries()) {
713 K key = entry.key();
714 Timestamp timestamp = entry.timestamp();
715
716 if (removeInternal(key, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800717 EventuallyConsistentMapEvent<K, V> externalEvent
718 = new EventuallyConsistentMapEvent<>(
Jonathan Hartdb3af892015-01-26 13:19:07 -0800719 EventuallyConsistentMapEvent.Type.REMOVE,
720 key, null);
721 notifyListeners(externalEvent);
722 }
723 }
724 } catch (Exception e) {
725 log.warn("Exception thrown handling remove", e);
726 }
727 });
728 }
729 }
730
731 private static final class InternalPutEvent<K, V> {
732 private final List<PutEntry<K, V>> entries;
733
734 public InternalPutEvent(K key, V value, Timestamp timestamp) {
735 entries = Collections
736 .singletonList(new PutEntry<>(key, value, timestamp));
737 }
738
739 public InternalPutEvent(List<PutEntry<K, V>> entries) {
740 this.entries = checkNotNull(entries);
741 }
742
743 // Needed for serialization.
744 @SuppressWarnings("unused")
745 private InternalPutEvent() {
746 entries = null;
747 }
748
749 public List<PutEntry<K, V>> entries() {
750 return entries;
751 }
752 }
753
754 private static final class PutEntry<K, V> {
755 private final K key;
756 private final V value;
757 private final Timestamp timestamp;
758
759 public PutEntry(K key, V value, Timestamp timestamp) {
760 this.key = checkNotNull(key);
761 this.value = checkNotNull(value);
762 this.timestamp = checkNotNull(timestamp);
763 }
764
765 // Needed for serialization.
766 @SuppressWarnings("unused")
767 private PutEntry() {
768 this.key = null;
769 this.value = null;
770 this.timestamp = null;
771 }
772
773 public K key() {
774 return key;
775 }
776
777 public V value() {
778 return value;
779 }
780
781 public Timestamp timestamp() {
782 return timestamp;
783 }
784
785 public String toString() {
786 return MoreObjects.toStringHelper(getClass())
787 .add("key", key)
788 .add("value", value)
789 .add("timestamp", timestamp)
790 .toString();
791 }
792 }
793
794 private static final class InternalRemoveEvent<K> {
795 private final List<RemoveEntry<K>> entries;
796
797 public InternalRemoveEvent(K key, Timestamp timestamp) {
798 entries = Collections.singletonList(
799 new RemoveEntry<>(key, timestamp));
800 }
801
802 public InternalRemoveEvent(List<RemoveEntry<K>> entries) {
803 this.entries = checkNotNull(entries);
804 }
805
806 // Needed for serialization.
807 @SuppressWarnings("unused")
808 private InternalRemoveEvent() {
809 entries = null;
810 }
811
812 public List<RemoveEntry<K>> entries() {
813 return entries;
814 }
815 }
816
817 private static final class RemoveEntry<K> {
818 private final K key;
819 private final Timestamp timestamp;
820
821 public RemoveEntry(K key, Timestamp timestamp) {
822 this.key = checkNotNull(key);
823 this.timestamp = checkNotNull(timestamp);
824 }
825
826 // Needed for serialization.
827 @SuppressWarnings("unused")
828 private RemoveEntry() {
829 this.key = null;
830 this.timestamp = null;
831 }
832
833 public K key() {
834 return key;
835 }
836
837 public Timestamp timestamp() {
838 return timestamp;
839 }
840 }
841}