blob: 2d267c33059da71987aff1d5eb9cb24441415967 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.impl;
17
Jonathan Hartaaa56572015-01-28 21:56:35 -080018import org.apache.commons.lang3.RandomUtils;
Jonathan Hartf9108232015-02-02 16:37:35 -080019import org.apache.commons.lang3.tuple.Pair;
Jonathan Hartdb3af892015-01-26 13:19:07 -080020import org.onlab.util.KryoNamespace;
21import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080022import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080023import org.onosproject.cluster.NodeId;
24import org.onosproject.store.Timestamp;
25import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
26import org.onosproject.store.cluster.messaging.ClusterMessage;
27import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
28import org.onosproject.store.cluster.messaging.MessageSubject;
29import org.onosproject.store.serializers.KryoSerializer;
30import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
33import java.io.IOException;
34import java.util.ArrayList;
35import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080036import java.util.HashMap;
37import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080038import java.util.List;
39import java.util.Map;
40import java.util.Set;
41import java.util.concurrent.ConcurrentHashMap;
42import java.util.concurrent.CopyOnWriteArraySet;
43import java.util.concurrent.ExecutorService;
44import java.util.concurrent.Executors;
45import java.util.concurrent.ScheduledExecutorService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080046import java.util.concurrent.TimeUnit;
Jonathan Hartdb3af892015-01-26 13:19:07 -080047import java.util.stream.Collectors;
48
49import static com.google.common.base.Preconditions.checkNotNull;
50import static com.google.common.base.Preconditions.checkState;
51import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
52import static org.onlab.util.Tools.minPriority;
53import static org.onlab.util.Tools.namedThreads;
54
55/**
56 * Distributed Map implementation which uses optimistic replication and gossip
57 * based techniques to provide an eventually consistent data store.
58 */
59public class EventuallyConsistentMapImpl<K, V>
60 implements EventuallyConsistentMap<K, V> {
61
62 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
63
64 private final Map<K, Timestamped<V>> items;
65 private final Map<K, Timestamp> removedItems;
66
67 private final String mapName;
68 private final ClusterService clusterService;
69 private final ClusterCommunicationService clusterCommunicator;
70 private final KryoSerializer serializer;
71
72 private final ClockService<K> clockService;
73
74 private final MessageSubject updateMessageSubject;
75 private final MessageSubject removeMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080076 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080077
Jonathan Hartaaa56572015-01-28 21:56:35 -080078 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080079 = new CopyOnWriteArraySet<>();
80
81 private final ExecutorService executor;
82
83 private final ScheduledExecutorService backgroundExecutor;
84
85 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -080086 private static final String ERROR_DESTROYED = " map is already destroyed";
Jonathan Hartdb3af892015-01-26 13:19:07 -080087
88 // TODO: Make these anti-entropy params configurable
89 private long initialDelaySec = 5;
90 private long periodSec = 5;
91
92 /**
93 * Creates a new eventually consistent map shared amongst multiple instances.
94 *
95 * Each map is identified by a string map name. EventuallyConsistentMapImpl
96 * objects in different JVMs that use the same map name will form a
97 * distributed map across JVMs (provided the cluster service is aware of
98 * both nodes).
99 *
100 * The client is expected to provide an
101 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
102 * will be stored in this map have been registered (including referenced
103 * classes). This serializer will be used to serialize both K and V for
104 * inter-node notifications.
105 *
106 * The client must provide an {@link org.onosproject.store.impl.ClockService}
107 * which can generate timestamps for a given key. The clock service is free
108 * to generate timestamps however it wishes, however these timestamps will
109 * be used to serialize updates to the map so they must be strict enough
110 * to ensure updates are properly ordered for the use case (i.e. in some
111 * cases wallclock time will suffice, whereas in other cases logical time
112 * will be necessary).
113 *
114 * @param mapName a String identifier for the map.
115 * @param clusterService the cluster service
116 * @param clusterCommunicator the cluster communications service
117 * @param serializerBuilder a Kryo namespace builder that can serialize
118 * both K and V
119 * @param clockService a clock service able to generate timestamps
120 * for K
121 */
122 public EventuallyConsistentMapImpl(String mapName,
123 ClusterService clusterService,
124 ClusterCommunicationService clusterCommunicator,
125 KryoNamespace.Builder serializerBuilder,
126 ClockService<K> clockService) {
127
128 this.mapName = checkNotNull(mapName);
129 this.clusterService = checkNotNull(clusterService);
130 this.clusterCommunicator = checkNotNull(clusterCommunicator);
131
132 serializer = createSerializer(checkNotNull(serializerBuilder));
133
134 this.clockService = checkNotNull(clockService);
135
136 items = new ConcurrentHashMap<>();
137 removedItems = new ConcurrentHashMap<>();
138
139 executor = Executors
140 .newCachedThreadPool(namedThreads("onos-ecm-" + mapName + "-fg-%d"));
141
142 backgroundExecutor =
143 newSingleThreadScheduledExecutor(minPriority(
144 namedThreads("onos-ecm-" + mapName + "-bg-%d")));
145
Jonathan Hartaaa56572015-01-28 21:56:35 -0800146 // start anti-entropy thread
147 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
148 initialDelaySec, periodSec,
149 TimeUnit.SECONDS);
150
Jonathan Hartdb3af892015-01-26 13:19:07 -0800151 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
152 clusterCommunicator.addSubscriber(updateMessageSubject,
153 new InternalPutEventListener());
154 removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
155 clusterCommunicator.addSubscriber(removeMessageSubject,
156 new InternalRemoveEventListener());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800157 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
158 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
159 new InternalAntiEntropyListener());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800160 }
161
162 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
163 return new KryoSerializer() {
164 @Override
165 protected void setupKryoPool() {
166 // Add the map's internal helper classes to the user-supplied serializer
167 serializerPool = builder
168 .register(WallClockTimestamp.class)
169 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800170 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800171 .register(ArrayList.class)
172 .register(InternalPutEvent.class)
173 .register(InternalRemoveEvent.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800174 .register(AntiEntropyAdvertisement.class)
175 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800176 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800177 }
178 };
179 }
180
181 @Override
182 public int size() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800183 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800184 return items.size();
185 }
186
187 @Override
188 public boolean isEmpty() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800189 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800190 return items.isEmpty();
191 }
192
193 @Override
194 public boolean containsKey(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800195 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800196 return items.containsKey(key);
197 }
198
199 @Override
200 public boolean containsValue(V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800201 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800202
203 return items.values().stream()
204 .anyMatch(timestamped -> timestamped.value().equals(value));
205 }
206
207 @Override
208 public V get(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800209 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800210
211 Timestamped<V> value = items.get(key);
212 if (value != null) {
213 return value.value();
214 }
215 return null;
216 }
217
218 @Override
219 public void put(K key, V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800220 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800221
222 Timestamp timestamp = clockService.getTimestamp(key);
223 if (putInternal(key, value, timestamp)) {
224 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
225 EventuallyConsistentMapEvent<K, V> externalEvent
226 = new EventuallyConsistentMapEvent<>(
227 EventuallyConsistentMapEvent.Type.PUT, key, value);
228 notifyListeners(externalEvent);
229 }
230 }
231
232 private boolean putInternal(K key, V value, Timestamp timestamp) {
233 synchronized (this) {
234 Timestamp removed = removedItems.get(key);
235 if (removed != null && removed.compareTo(timestamp) > 0) {
236 return false;
237 }
238
239 Timestamped<V> existing = items.get(key);
240 if (existing != null && existing.isNewer(timestamp)) {
241 return false;
242 } else {
243 items.put(key, new Timestamped<>(value, timestamp));
244 removedItems.remove(key);
245 return true;
246 }
247 }
248 }
249
250 @Override
251 public void remove(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800252 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800253
254 Timestamp timestamp = clockService.getTimestamp(key);
255 if (removeInternal(key, timestamp)) {
256 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
257 EventuallyConsistentMapEvent<K, V> externalEvent
258 = new EventuallyConsistentMapEvent<>(
259 EventuallyConsistentMapEvent.Type.REMOVE, key, null);
260 notifyListeners(externalEvent);
261 }
262 }
263
264 private boolean removeInternal(K key, Timestamp timestamp) {
265 synchronized (this) {
266 if (items.get(key) != null && items.get(key).isNewer(timestamp)) {
267 return false;
268 }
269
270 items.remove(key);
271 removedItems.put(key, timestamp);
272 return true;
273 }
274 }
275
276 @Override
277 public void putAll(Map<? extends K, ? extends V> m) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800278 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800279
280 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
281
282 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
283 K key = entry.getKey();
284 V value = entry.getValue();
285 Timestamp timestamp = clockService.getTimestamp(entry.getKey());
286
287 if (putInternal(key, value, timestamp)) {
288 updates.add(new PutEntry<>(key, value, timestamp));
289 }
290 }
291
Jonathan Hart584d2f32015-01-27 19:46:14 -0800292 if (!updates.isEmpty()) {
293 notifyPeers(new InternalPutEvent<>(updates));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800294
Jonathan Hart584d2f32015-01-27 19:46:14 -0800295 for (PutEntry<K, V> entry : updates) {
296 EventuallyConsistentMapEvent<K, V> externalEvent = new EventuallyConsistentMapEvent<>(
297 EventuallyConsistentMapEvent.Type.PUT, entry.key(),
298 entry.value());
299 notifyListeners(externalEvent);
300 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800301 }
302 }
303
304 @Override
305 public void clear() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800306 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800307
308 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
309
310 for (K key : items.keySet()) {
311 Timestamp timestamp = clockService.getTimestamp(key);
312
313 if (removeInternal(key, timestamp)) {
314 removed.add(new RemoveEntry<>(key, timestamp));
315 }
316 }
317
Jonathan Hart584d2f32015-01-27 19:46:14 -0800318 if (!removed.isEmpty()) {
319 notifyPeers(new InternalRemoveEvent<>(removed));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800320
Jonathan Hart584d2f32015-01-27 19:46:14 -0800321 for (RemoveEntry<K> entry : removed) {
322 EventuallyConsistentMapEvent<K, V> externalEvent
323 = new EventuallyConsistentMapEvent<>(
324 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
325 null);
326 notifyListeners(externalEvent);
327 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800328 }
329 }
330
331 @Override
332 public Set<K> keySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800333 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800334
335 return items.keySet();
336 }
337
338 @Override
339 public Collection<V> values() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800340 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800341
342 return items.values().stream()
343 .map(Timestamped::value)
344 .collect(Collectors.toList());
345 }
346
347 @Override
348 public Set<Map.Entry<K, V>> entrySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800349 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800350
351 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800352 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800353 .collect(Collectors.toSet());
354 }
355
356 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800357 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
358 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800359
360 listeners.add(checkNotNull(listener));
361 }
362
363 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800364 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
365 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800366
367 listeners.remove(checkNotNull(listener));
368 }
369
370 @Override
371 public void destroy() {
372 destroyed = true;
373
374 executor.shutdown();
375 backgroundExecutor.shutdown();
376
Jonathan Hart584d2f32015-01-27 19:46:14 -0800377 listeners.clear();
378
Jonathan Hartdb3af892015-01-26 13:19:07 -0800379 clusterCommunicator.removeSubscriber(updateMessageSubject);
380 clusterCommunicator.removeSubscriber(removeMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800381 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800382 }
383
Jonathan Hartaaa56572015-01-28 21:56:35 -0800384 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
385 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800386 listener.event(event);
387 }
388 }
389
390 private void notifyPeers(InternalPutEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800391 broadcastMessage(updateMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800392 }
393
394 private void notifyPeers(InternalRemoveEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800395 broadcastMessage(removeMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800396 }
397
Jonathan Hart7d656f42015-01-27 14:07:23 -0800398 private void broadcastMessage(MessageSubject subject, Object event) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800399 ClusterMessage message = new ClusterMessage(
400 clusterService.getLocalNode().id(),
401 subject,
402 serializer.encode(event));
403 clusterCommunicator.broadcast(message);
404 }
405
406 private void unicastMessage(NodeId peer,
407 MessageSubject subject,
408 Object event) throws IOException {
409 ClusterMessage message = new ClusterMessage(
410 clusterService.getLocalNode().id(),
411 subject,
412 serializer.encode(event));
413 clusterCommunicator.unicast(message, peer);
414 }
415
Jonathan Hartaaa56572015-01-28 21:56:35 -0800416 private final class SendAdvertisementTask implements Runnable {
417 @Override
418 public void run() {
419 if (Thread.currentThread().isInterrupted()) {
420 log.info("Interrupted, quitting");
421 return;
422 }
423
424 try {
425 final NodeId self = clusterService.getLocalNode().id();
426 Set<ControllerNode> nodes = clusterService.getNodes();
427
428 List<NodeId> nodeIds = nodes.stream()
429 .map(node -> node.id())
430 .collect(Collectors.toList());
431
432 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
433 log.trace("No other peers in the cluster.");
434 return;
435 }
436
437 NodeId peer;
438 do {
439 int idx = RandomUtils.nextInt(0, nodeIds.size());
440 peer = nodeIds.get(idx);
441 } while (peer.equals(self));
442
443 if (Thread.currentThread().isInterrupted()) {
444 log.info("Interrupted, quitting");
445 return;
446 }
447
448 AntiEntropyAdvertisement<K> ad = createAdvertisement();
449
450 try {
451 unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
452 } catch (IOException e) {
453 log.debug("Failed to send anti-entropy advertisement to {}", peer);
454 }
455 } catch (Exception e) {
456 // Catch all exceptions to avoid scheduled task being suppressed.
457 log.error("Exception thrown while sending advertisement", e);
458 }
459 }
460 }
461
462 private AntiEntropyAdvertisement<K> createAdvertisement() {
463 final NodeId self = clusterService.getLocalNode().id();
464
465 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
466
467 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
468
469 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
470
471 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
472 }
473
474 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
475 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
476
477 synchronized (this) {
478 final NodeId sender = ad.sender();
479
480 externalEvents = antiEntropyCheckLocalItems(ad);
481
482 antiEntropyCheckLocalRemoved(ad);
483
484 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
485
486 // if remote ad has something unknown, actively sync
487 for (K key : ad.timestamps().keySet()) {
488 if (!items.containsKey(key)) {
489 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
490 try {
491 unicastMessage(sender, antiEntropyAdvertisementSubject,
492 myAd);
493 break;
494 } catch (IOException e) {
495 log.debug(
496 "Failed to send reactive anti-entropy advertisement to {}",
497 sender);
498 }
499 }
500 }
501 } // synchronized (this)
502
503 externalEvents.forEach(this::notifyListeners);
504 }
505
506 /**
507 * Checks if any of the remote's live items or tombstones are out of date
508 * according to our local live item list, or if our live items are out of
509 * date according to the remote's tombstone list.
510 * If the local copy is more recent, it will be pushed to the remote. If the
511 * remote has a more recent remove, we apply that to the local state.
512 *
513 * @param ad remote anti-entropy advertisement
514 * @return list of external events relating to local operations performed
515 */
516 // Guarded by synchronized (this)
517 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
518 AntiEntropyAdvertisement<K> ad) {
519 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
520 = new LinkedList<>();
521 final NodeId sender = ad.sender();
522
523 final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
524
525 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
526 K key = item.getKey();
527 Timestamped<V> localValue = item.getValue();
528
529 Timestamp remoteTimestamp = ad.timestamps().get(key);
530 if (remoteTimestamp == null) {
531 remoteTimestamp = ad.tombstones().get(key);
532 }
533 if (remoteTimestamp == null || localValue
534 .isNewer(remoteTimestamp)) {
535 // local value is more recent, push to sender
536 updatesToSend
537 .add(new PutEntry<>(key, localValue.value(),
538 localValue.timestamp()));
539 }
540
541 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
542 if (remoteDeadTimestamp != null &&
543 remoteDeadTimestamp.compareTo(localValue.timestamp()) > 0) {
544 // sender has a more recent remove
545 if (removeInternal(key, remoteDeadTimestamp)) {
546 externalEvents.add(new EventuallyConsistentMapEvent<>(
547 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
548 }
549 }
550 }
551
552 // Send all updates to the peer at once
553 if (!updatesToSend.isEmpty()) {
554 try {
555 unicastMessage(sender, updateMessageSubject, new InternalPutEvent<>(updatesToSend));
556 } catch (IOException e) {
557 log.warn("Failed to send advertisement response", e);
558 }
559 }
560
561 return externalEvents;
562 }
563
564 /**
565 * Checks if any items in the remote live list are out of date according
566 * to our tombstone list. If we find we have a more up to date tombstone,
567 * we'll send it to the remote.
568 *
569 * @param ad remote anti-entropy advertisement
570 */
571 // Guarded by synchronized (this)
572 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
573 final NodeId sender = ad.sender();
574
575 final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
576
577 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
578 K key = dead.getKey();
579 Timestamp localDeadTimestamp = dead.getValue();
580
581 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
582 if (remoteLiveTimestamp != null
583 && localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
584 // sender has zombie, push remove
585 removesToSend
586 .add(new RemoveEntry<>(key, localDeadTimestamp));
587 }
588 }
589
590 // Send all removes to the peer at once
591 if (!removesToSend.isEmpty()) {
592 try {
593 unicastMessage(sender, removeMessageSubject, new InternalRemoveEvent<>(removesToSend));
594 } catch (IOException e) {
595 log.warn("Failed to send advertisement response", e);
596 }
597 }
598 }
599
600 /**
601 * Checks if any of the local live items are out of date according to the
602 * remote's tombstone advertisements. If we find a local item is out of date,
603 * we'll apply the remove operation to the local state.
604 *
605 * @param ad remote anti-entropy advertisement
606 * @return list of external events relating to local operations performed
607 */
608 // Guarded by synchronized (this)
609 private List<EventuallyConsistentMapEvent<K, V>>
610 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
611 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
612 = new LinkedList<>();
613
614 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
615 K key = remoteDead.getKey();
616 Timestamp remoteDeadTimestamp = remoteDead.getValue();
617
618 Timestamped<V> local = items.get(key);
619 Timestamp localDead = removedItems.get(key);
620 if (local != null
621 && remoteDeadTimestamp.compareTo(local.timestamp()) > 0) {
622 // remove our version
623 if (removeInternal(key, remoteDeadTimestamp)) {
624 externalEvents.add(new EventuallyConsistentMapEvent<>(
625 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
626 }
627 } else if (localDead != null &&
628 remoteDeadTimestamp.compareTo(localDead) > 0) {
629 // If we both had the item as removed, but their timestamp is
630 // newer, update ours to the newer value
631 removedItems.put(key, remoteDeadTimestamp);
632 }
633 }
634
635 return externalEvents;
636 }
637
638 private final class InternalAntiEntropyListener
639 implements ClusterMessageHandler {
640
641 @Override
642 public void handle(ClusterMessage message) {
643 log.trace("Received anti-entropy advertisement from peer: {}", message.sender());
644 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
645 backgroundExecutor.submit(() -> {
646 try {
647 handleAntiEntropyAdvertisement(advertisement);
648 } catch (Exception e) {
649 log.warn("Exception thrown handling advertisements", e);
650 }
651 });
652 }
653 }
654
Jonathan Hartdb3af892015-01-26 13:19:07 -0800655 private final class InternalPutEventListener implements
656 ClusterMessageHandler {
657 @Override
658 public void handle(ClusterMessage message) {
659 log.debug("Received put event from peer: {}", message.sender());
660 InternalPutEvent<K, V> event = serializer.decode(message.payload());
661
662 executor.submit(() -> {
663 try {
664 for (PutEntry<K, V> entry : event.entries()) {
665 K key = entry.key();
666 V value = entry.value();
667 Timestamp timestamp = entry.timestamp();
668
669 if (putInternal(key, value, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800670 EventuallyConsistentMapEvent<K, V> externalEvent =
Jonathan Hartdb3af892015-01-26 13:19:07 -0800671 new EventuallyConsistentMapEvent<>(
672 EventuallyConsistentMapEvent.Type.PUT, key,
673 value);
674 notifyListeners(externalEvent);
675 }
676 }
677 } catch (Exception e) {
678 log.warn("Exception thrown handling put", e);
679 }
680 });
681 }
682 }
683
684 private final class InternalRemoveEventListener implements
685 ClusterMessageHandler {
686 @Override
687 public void handle(ClusterMessage message) {
688 log.debug("Received remove event from peer: {}", message.sender());
689 InternalRemoveEvent<K> event = serializer.decode(message.payload());
690
691 executor.submit(() -> {
692 try {
693 for (RemoveEntry<K> entry : event.entries()) {
694 K key = entry.key();
695 Timestamp timestamp = entry.timestamp();
696
697 if (removeInternal(key, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800698 EventuallyConsistentMapEvent<K, V> externalEvent
699 = new EventuallyConsistentMapEvent<>(
Jonathan Hartdb3af892015-01-26 13:19:07 -0800700 EventuallyConsistentMapEvent.Type.REMOVE,
701 key, null);
702 notifyListeners(externalEvent);
703 }
704 }
705 } catch (Exception e) {
706 log.warn("Exception thrown handling remove", e);
707 }
708 });
709 }
710 }
711
Jonathan Hartdb3af892015-01-26 13:19:07 -0800712}