blob: 0e6a590791071ab80b75adfdb7199ecbccdfb28c [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.impl;
17
Jonathan Hartaaa56572015-01-28 21:56:35 -080018import org.apache.commons.lang3.RandomUtils;
Jonathan Hartf9108232015-02-02 16:37:35 -080019import org.apache.commons.lang3.tuple.Pair;
Jonathan Hartdb3af892015-01-26 13:19:07 -080020import org.onlab.util.KryoNamespace;
21import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080022import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080023import org.onosproject.cluster.NodeId;
24import org.onosproject.store.Timestamp;
25import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
26import org.onosproject.store.cluster.messaging.ClusterMessage;
27import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
28import org.onosproject.store.cluster.messaging.MessageSubject;
29import org.onosproject.store.serializers.KryoSerializer;
30import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
33import java.io.IOException;
34import java.util.ArrayList;
35import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080036import java.util.HashMap;
37import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080038import java.util.List;
39import java.util.Map;
40import java.util.Set;
41import java.util.concurrent.ConcurrentHashMap;
42import java.util.concurrent.CopyOnWriteArraySet;
43import java.util.concurrent.ExecutorService;
44import java.util.concurrent.Executors;
45import java.util.concurrent.ScheduledExecutorService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080046import java.util.concurrent.TimeUnit;
Jonathan Hartdb3af892015-01-26 13:19:07 -080047import java.util.stream.Collectors;
48
49import static com.google.common.base.Preconditions.checkNotNull;
50import static com.google.common.base.Preconditions.checkState;
51import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
52import static org.onlab.util.Tools.minPriority;
53import static org.onlab.util.Tools.namedThreads;
54
55/**
56 * Distributed Map implementation which uses optimistic replication and gossip
57 * based techniques to provide an eventually consistent data store.
58 */
59public class EventuallyConsistentMapImpl<K, V>
60 implements EventuallyConsistentMap<K, V> {
61
62 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
63
64 private final Map<K, Timestamped<V>> items;
65 private final Map<K, Timestamp> removedItems;
66
67 private final String mapName;
68 private final ClusterService clusterService;
69 private final ClusterCommunicationService clusterCommunicator;
70 private final KryoSerializer serializer;
71
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080072 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080073
74 private final MessageSubject updateMessageSubject;
75 private final MessageSubject removeMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080076 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080077
Jonathan Hartaaa56572015-01-28 21:56:35 -080078 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080079 = new CopyOnWriteArraySet<>();
80
81 private final ExecutorService executor;
82
83 private final ScheduledExecutorService backgroundExecutor;
84
85 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -080086 private static final String ERROR_DESTROYED = " map is already destroyed";
Jonathan Hartdb3af892015-01-26 13:19:07 -080087
Jonathan Hart4f397e82015-02-04 09:10:41 -080088 private static final String ERROR_NULL_KEY = "Key cannot be null";
89 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
90
Jonathan Hartdb3af892015-01-26 13:19:07 -080091 // TODO: Make these anti-entropy params configurable
92 private long initialDelaySec = 5;
93 private long periodSec = 5;
94
95 /**
96 * Creates a new eventually consistent map shared amongst multiple instances.
97 *
98 * Each map is identified by a string map name. EventuallyConsistentMapImpl
99 * objects in different JVMs that use the same map name will form a
100 * distributed map across JVMs (provided the cluster service is aware of
101 * both nodes).
102 *
103 * The client is expected to provide an
104 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
105 * will be stored in this map have been registered (including referenced
106 * classes). This serializer will be used to serialize both K and V for
107 * inter-node notifications.
108 *
109 * The client must provide an {@link org.onosproject.store.impl.ClockService}
110 * which can generate timestamps for a given key. The clock service is free
111 * to generate timestamps however it wishes, however these timestamps will
112 * be used to serialize updates to the map so they must be strict enough
113 * to ensure updates are properly ordered for the use case (i.e. in some
114 * cases wallclock time will suffice, whereas in other cases logical time
115 * will be necessary).
116 *
117 * @param mapName a String identifier for the map.
118 * @param clusterService the cluster service
119 * @param clusterCommunicator the cluster communications service
120 * @param serializerBuilder a Kryo namespace builder that can serialize
121 * both K and V
122 * @param clockService a clock service able to generate timestamps
123 * for K
124 */
125 public EventuallyConsistentMapImpl(String mapName,
126 ClusterService clusterService,
127 ClusterCommunicationService clusterCommunicator,
128 KryoNamespace.Builder serializerBuilder,
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800129 ClockService<K, V> clockService) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800130
131 this.mapName = checkNotNull(mapName);
132 this.clusterService = checkNotNull(clusterService);
133 this.clusterCommunicator = checkNotNull(clusterCommunicator);
134
135 serializer = createSerializer(checkNotNull(serializerBuilder));
136
137 this.clockService = checkNotNull(clockService);
138
139 items = new ConcurrentHashMap<>();
140 removedItems = new ConcurrentHashMap<>();
141
142 executor = Executors
143 .newCachedThreadPool(namedThreads("onos-ecm-" + mapName + "-fg-%d"));
144
145 backgroundExecutor =
146 newSingleThreadScheduledExecutor(minPriority(
147 namedThreads("onos-ecm-" + mapName + "-bg-%d")));
148
Jonathan Hartaaa56572015-01-28 21:56:35 -0800149 // start anti-entropy thread
150 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
151 initialDelaySec, periodSec,
152 TimeUnit.SECONDS);
153
Jonathan Hartdb3af892015-01-26 13:19:07 -0800154 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
155 clusterCommunicator.addSubscriber(updateMessageSubject,
156 new InternalPutEventListener());
157 removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
158 clusterCommunicator.addSubscriber(removeMessageSubject,
159 new InternalRemoveEventListener());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800160 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
161 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
162 new InternalAntiEntropyListener());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800163 }
164
165 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
166 return new KryoSerializer() {
167 @Override
168 protected void setupKryoPool() {
169 // Add the map's internal helper classes to the user-supplied serializer
170 serializerPool = builder
171 .register(WallClockTimestamp.class)
172 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800173 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800174 .register(ArrayList.class)
175 .register(InternalPutEvent.class)
176 .register(InternalRemoveEvent.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800177 .register(AntiEntropyAdvertisement.class)
178 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800179 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800180 }
181 };
182 }
183
184 @Override
185 public int size() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800186 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800187 return items.size();
188 }
189
190 @Override
191 public boolean isEmpty() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800192 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800193 return items.isEmpty();
194 }
195
196 @Override
197 public boolean containsKey(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800198 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800199 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800200 return items.containsKey(key);
201 }
202
203 @Override
204 public boolean containsValue(V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800205 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800206 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800207
208 return items.values().stream()
209 .anyMatch(timestamped -> timestamped.value().equals(value));
210 }
211
212 @Override
213 public V get(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800214 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800215 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800216
217 Timestamped<V> value = items.get(key);
218 if (value != null) {
219 return value.value();
220 }
221 return null;
222 }
223
224 @Override
225 public void put(K key, V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800226 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800227 checkNotNull(key, ERROR_NULL_KEY);
228 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800229
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800230 Timestamp timestamp = clockService.getTimestamp(key, value);
231
Jonathan Hartdb3af892015-01-26 13:19:07 -0800232 if (putInternal(key, value, timestamp)) {
233 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
234 EventuallyConsistentMapEvent<K, V> externalEvent
235 = new EventuallyConsistentMapEvent<>(
236 EventuallyConsistentMapEvent.Type.PUT, key, value);
237 notifyListeners(externalEvent);
238 }
239 }
240
241 private boolean putInternal(K key, V value, Timestamp timestamp) {
242 synchronized (this) {
243 Timestamp removed = removedItems.get(key);
244 if (removed != null && removed.compareTo(timestamp) > 0) {
245 return false;
246 }
247
248 Timestamped<V> existing = items.get(key);
249 if (existing != null && existing.isNewer(timestamp)) {
250 return false;
251 } else {
252 items.put(key, new Timestamped<>(value, timestamp));
253 removedItems.remove(key);
254 return true;
255 }
256 }
257 }
258
259 @Override
260 public void remove(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800261 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800262 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800263
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800264 // TODO prevent calls here if value is important for timestamp
265 Timestamp timestamp = clockService.getTimestamp(key, null);
266
Jonathan Hartdb3af892015-01-26 13:19:07 -0800267 if (removeInternal(key, timestamp)) {
268 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
269 EventuallyConsistentMapEvent<K, V> externalEvent
270 = new EventuallyConsistentMapEvent<>(
271 EventuallyConsistentMapEvent.Type.REMOVE, key, null);
272 notifyListeners(externalEvent);
273 }
274 }
275
276 private boolean removeInternal(K key, Timestamp timestamp) {
277 synchronized (this) {
278 if (items.get(key) != null && items.get(key).isNewer(timestamp)) {
279 return false;
280 }
281
282 items.remove(key);
283 removedItems.put(key, timestamp);
284 return true;
285 }
286 }
287
288 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800289 public void remove(K key, V value) {
290 checkState(!destroyed, mapName + ERROR_DESTROYED);
291 checkNotNull(key, ERROR_NULL_KEY);
292 checkNotNull(value, ERROR_NULL_VALUE);
293
294 Timestamp timestamp = clockService.getTimestamp(key, value);
295
296 if (removeInternal(key, timestamp)) {
297 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
298 EventuallyConsistentMapEvent<K, V> externalEvent
299 = new EventuallyConsistentMapEvent<>(
300 EventuallyConsistentMapEvent.Type.REMOVE, key, value);
301 notifyListeners(externalEvent);
302 }
303 }
304
305 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800306 public void putAll(Map<? extends K, ? extends V> m) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800307 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800308
309 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
310
311 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
312 K key = entry.getKey();
313 V value = entry.getValue();
Jonathan Hart4f397e82015-02-04 09:10:41 -0800314
315 checkNotNull(key, ERROR_NULL_KEY);
316 checkNotNull(value, ERROR_NULL_VALUE);
317
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800318 Timestamp timestamp = clockService.getTimestamp(key, value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800319
320 if (putInternal(key, value, timestamp)) {
321 updates.add(new PutEntry<>(key, value, timestamp));
322 }
323 }
324
Jonathan Hart584d2f32015-01-27 19:46:14 -0800325 if (!updates.isEmpty()) {
326 notifyPeers(new InternalPutEvent<>(updates));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800327
Jonathan Hart584d2f32015-01-27 19:46:14 -0800328 for (PutEntry<K, V> entry : updates) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800329 EventuallyConsistentMapEvent<K, V> externalEvent =
330 new EventuallyConsistentMapEvent<>(
Jonathan Hart584d2f32015-01-27 19:46:14 -0800331 EventuallyConsistentMapEvent.Type.PUT, entry.key(),
332 entry.value());
333 notifyListeners(externalEvent);
334 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800335 }
336 }
337
338 @Override
339 public void clear() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800340 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800341
342 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
343
344 for (K key : items.keySet()) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800345 // TODO also this is not applicable if value is important for timestamp?
346 Timestamp timestamp = clockService.getTimestamp(key, null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800347
348 if (removeInternal(key, timestamp)) {
349 removed.add(new RemoveEntry<>(key, timestamp));
350 }
351 }
352
Jonathan Hart584d2f32015-01-27 19:46:14 -0800353 if (!removed.isEmpty()) {
354 notifyPeers(new InternalRemoveEvent<>(removed));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800355
Jonathan Hart584d2f32015-01-27 19:46:14 -0800356 for (RemoveEntry<K> entry : removed) {
357 EventuallyConsistentMapEvent<K, V> externalEvent
358 = new EventuallyConsistentMapEvent<>(
359 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
360 null);
361 notifyListeners(externalEvent);
362 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800363 }
364 }
365
366 @Override
367 public Set<K> keySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800368 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800369
370 return items.keySet();
371 }
372
373 @Override
374 public Collection<V> values() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800375 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800376
377 return items.values().stream()
378 .map(Timestamped::value)
379 .collect(Collectors.toList());
380 }
381
382 @Override
383 public Set<Map.Entry<K, V>> entrySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800384 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800385
386 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800387 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800388 .collect(Collectors.toSet());
389 }
390
391 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800392 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
393 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800394
395 listeners.add(checkNotNull(listener));
396 }
397
398 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800399 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
400 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800401
402 listeners.remove(checkNotNull(listener));
403 }
404
405 @Override
406 public void destroy() {
407 destroyed = true;
408
409 executor.shutdown();
410 backgroundExecutor.shutdown();
411
Jonathan Hart584d2f32015-01-27 19:46:14 -0800412 listeners.clear();
413
Jonathan Hartdb3af892015-01-26 13:19:07 -0800414 clusterCommunicator.removeSubscriber(updateMessageSubject);
415 clusterCommunicator.removeSubscriber(removeMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800416 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800417 }
418
Jonathan Hartaaa56572015-01-28 21:56:35 -0800419 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
420 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800421 listener.event(event);
422 }
423 }
424
425 private void notifyPeers(InternalPutEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800426 broadcastMessage(updateMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800427 }
428
429 private void notifyPeers(InternalRemoveEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800430 broadcastMessage(removeMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800431 }
432
Jonathan Hart7d656f42015-01-27 14:07:23 -0800433 private void broadcastMessage(MessageSubject subject, Object event) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800434 ClusterMessage message = new ClusterMessage(
435 clusterService.getLocalNode().id(),
436 subject,
437 serializer.encode(event));
438 clusterCommunicator.broadcast(message);
439 }
440
441 private void unicastMessage(NodeId peer,
442 MessageSubject subject,
443 Object event) throws IOException {
444 ClusterMessage message = new ClusterMessage(
445 clusterService.getLocalNode().id(),
446 subject,
447 serializer.encode(event));
448 clusterCommunicator.unicast(message, peer);
449 }
450
Jonathan Hartaaa56572015-01-28 21:56:35 -0800451 private final class SendAdvertisementTask implements Runnable {
452 @Override
453 public void run() {
454 if (Thread.currentThread().isInterrupted()) {
455 log.info("Interrupted, quitting");
456 return;
457 }
458
459 try {
460 final NodeId self = clusterService.getLocalNode().id();
461 Set<ControllerNode> nodes = clusterService.getNodes();
462
463 List<NodeId> nodeIds = nodes.stream()
464 .map(node -> node.id())
465 .collect(Collectors.toList());
466
467 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
468 log.trace("No other peers in the cluster.");
469 return;
470 }
471
472 NodeId peer;
473 do {
474 int idx = RandomUtils.nextInt(0, nodeIds.size());
475 peer = nodeIds.get(idx);
476 } while (peer.equals(self));
477
478 if (Thread.currentThread().isInterrupted()) {
479 log.info("Interrupted, quitting");
480 return;
481 }
482
483 AntiEntropyAdvertisement<K> ad = createAdvertisement();
484
485 try {
486 unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
487 } catch (IOException e) {
488 log.debug("Failed to send anti-entropy advertisement to {}", peer);
489 }
490 } catch (Exception e) {
491 // Catch all exceptions to avoid scheduled task being suppressed.
492 log.error("Exception thrown while sending advertisement", e);
493 }
494 }
495 }
496
497 private AntiEntropyAdvertisement<K> createAdvertisement() {
498 final NodeId self = clusterService.getLocalNode().id();
499
500 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
501
502 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
503
504 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
505
506 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
507 }
508
509 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
510 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
511
512 synchronized (this) {
513 final NodeId sender = ad.sender();
514
515 externalEvents = antiEntropyCheckLocalItems(ad);
516
517 antiEntropyCheckLocalRemoved(ad);
518
519 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
520
521 // if remote ad has something unknown, actively sync
522 for (K key : ad.timestamps().keySet()) {
523 if (!items.containsKey(key)) {
524 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
525 try {
526 unicastMessage(sender, antiEntropyAdvertisementSubject,
527 myAd);
528 break;
529 } catch (IOException e) {
530 log.debug(
531 "Failed to send reactive anti-entropy advertisement to {}",
532 sender);
533 }
534 }
535 }
536 } // synchronized (this)
537
538 externalEvents.forEach(this::notifyListeners);
539 }
540
541 /**
542 * Checks if any of the remote's live items or tombstones are out of date
543 * according to our local live item list, or if our live items are out of
544 * date according to the remote's tombstone list.
545 * If the local copy is more recent, it will be pushed to the remote. If the
546 * remote has a more recent remove, we apply that to the local state.
547 *
548 * @param ad remote anti-entropy advertisement
549 * @return list of external events relating to local operations performed
550 */
551 // Guarded by synchronized (this)
552 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
553 AntiEntropyAdvertisement<K> ad) {
554 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
555 = new LinkedList<>();
556 final NodeId sender = ad.sender();
557
558 final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
559
560 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
561 K key = item.getKey();
562 Timestamped<V> localValue = item.getValue();
563
564 Timestamp remoteTimestamp = ad.timestamps().get(key);
565 if (remoteTimestamp == null) {
566 remoteTimestamp = ad.tombstones().get(key);
567 }
568 if (remoteTimestamp == null || localValue
569 .isNewer(remoteTimestamp)) {
570 // local value is more recent, push to sender
571 updatesToSend
572 .add(new PutEntry<>(key, localValue.value(),
573 localValue.timestamp()));
574 }
575
576 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
577 if (remoteDeadTimestamp != null &&
578 remoteDeadTimestamp.compareTo(localValue.timestamp()) > 0) {
579 // sender has a more recent remove
580 if (removeInternal(key, remoteDeadTimestamp)) {
581 externalEvents.add(new EventuallyConsistentMapEvent<>(
582 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
583 }
584 }
585 }
586
587 // Send all updates to the peer at once
588 if (!updatesToSend.isEmpty()) {
589 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800590 unicastMessage(sender, updateMessageSubject,
591 new InternalPutEvent<>(updatesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800592 } catch (IOException e) {
593 log.warn("Failed to send advertisement response", e);
594 }
595 }
596
597 return externalEvents;
598 }
599
600 /**
601 * Checks if any items in the remote live list are out of date according
602 * to our tombstone list. If we find we have a more up to date tombstone,
603 * we'll send it to the remote.
604 *
605 * @param ad remote anti-entropy advertisement
606 */
607 // Guarded by synchronized (this)
608 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
609 final NodeId sender = ad.sender();
610
611 final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
612
613 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
614 K key = dead.getKey();
615 Timestamp localDeadTimestamp = dead.getValue();
616
617 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
618 if (remoteLiveTimestamp != null
619 && localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
620 // sender has zombie, push remove
621 removesToSend
622 .add(new RemoveEntry<>(key, localDeadTimestamp));
623 }
624 }
625
626 // Send all removes to the peer at once
627 if (!removesToSend.isEmpty()) {
628 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800629 unicastMessage(sender, removeMessageSubject,
630 new InternalRemoveEvent<>(removesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800631 } catch (IOException e) {
632 log.warn("Failed to send advertisement response", e);
633 }
634 }
635 }
636
637 /**
638 * Checks if any of the local live items are out of date according to the
639 * remote's tombstone advertisements. If we find a local item is out of date,
640 * we'll apply the remove operation to the local state.
641 *
642 * @param ad remote anti-entropy advertisement
643 * @return list of external events relating to local operations performed
644 */
645 // Guarded by synchronized (this)
646 private List<EventuallyConsistentMapEvent<K, V>>
647 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
648 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
649 = new LinkedList<>();
650
651 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
652 K key = remoteDead.getKey();
653 Timestamp remoteDeadTimestamp = remoteDead.getValue();
654
655 Timestamped<V> local = items.get(key);
656 Timestamp localDead = removedItems.get(key);
657 if (local != null
658 && remoteDeadTimestamp.compareTo(local.timestamp()) > 0) {
659 // remove our version
660 if (removeInternal(key, remoteDeadTimestamp)) {
661 externalEvents.add(new EventuallyConsistentMapEvent<>(
662 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
663 }
664 } else if (localDead != null &&
665 remoteDeadTimestamp.compareTo(localDead) > 0) {
666 // If we both had the item as removed, but their timestamp is
667 // newer, update ours to the newer value
668 removedItems.put(key, remoteDeadTimestamp);
669 }
670 }
671
672 return externalEvents;
673 }
674
675 private final class InternalAntiEntropyListener
676 implements ClusterMessageHandler {
677
678 @Override
679 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800680 log.trace("Received anti-entropy advertisement from peer: {}",
681 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800682 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
683 backgroundExecutor.submit(() -> {
684 try {
685 handleAntiEntropyAdvertisement(advertisement);
686 } catch (Exception e) {
687 log.warn("Exception thrown handling advertisements", e);
688 }
689 });
690 }
691 }
692
Jonathan Hartdb3af892015-01-26 13:19:07 -0800693 private final class InternalPutEventListener implements
694 ClusterMessageHandler {
695 @Override
696 public void handle(ClusterMessage message) {
697 log.debug("Received put event from peer: {}", message.sender());
698 InternalPutEvent<K, V> event = serializer.decode(message.payload());
699
700 executor.submit(() -> {
701 try {
702 for (PutEntry<K, V> entry : event.entries()) {
703 K key = entry.key();
704 V value = entry.value();
705 Timestamp timestamp = entry.timestamp();
706
707 if (putInternal(key, value, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800708 EventuallyConsistentMapEvent<K, V> externalEvent =
Jonathan Hartdb3af892015-01-26 13:19:07 -0800709 new EventuallyConsistentMapEvent<>(
710 EventuallyConsistentMapEvent.Type.PUT, key,
711 value);
712 notifyListeners(externalEvent);
713 }
714 }
715 } catch (Exception e) {
716 log.warn("Exception thrown handling put", e);
717 }
718 });
719 }
720 }
721
722 private final class InternalRemoveEventListener implements
723 ClusterMessageHandler {
724 @Override
725 public void handle(ClusterMessage message) {
726 log.debug("Received remove event from peer: {}", message.sender());
727 InternalRemoveEvent<K> event = serializer.decode(message.payload());
728
729 executor.submit(() -> {
730 try {
731 for (RemoveEntry<K> entry : event.entries()) {
732 K key = entry.key();
733 Timestamp timestamp = entry.timestamp();
734
735 if (removeInternal(key, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800736 EventuallyConsistentMapEvent<K, V> externalEvent
737 = new EventuallyConsistentMapEvent<>(
Jonathan Hartdb3af892015-01-26 13:19:07 -0800738 EventuallyConsistentMapEvent.Type.REMOVE,
739 key, null);
740 notifyListeners(externalEvent);
741 }
742 }
743 } catch (Exception e) {
744 log.warn("Exception thrown handling remove", e);
745 }
746 });
747 }
748 }
749
Jonathan Hartdb3af892015-01-26 13:19:07 -0800750}