blob: 7f4241411f427f931143e62ea1278a99c4ac2eeb [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Jonathan Hartaaa56572015-01-28 21:56:35 -080018import org.apache.commons.lang3.RandomUtils;
Jonathan Hartf9108232015-02-02 16:37:35 -080019import org.apache.commons.lang3.tuple.Pair;
Jonathan Hartdb3af892015-01-26 13:19:07 -080020import org.onlab.util.KryoNamespace;
21import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080022import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080023import org.onosproject.cluster.NodeId;
24import org.onosproject.store.Timestamp;
25import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
26import org.onosproject.store.cluster.messaging.ClusterMessage;
27import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
28import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080029import org.onosproject.store.impl.ClockService;
30import org.onosproject.store.impl.Timestamped;
31import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080032import org.onosproject.store.serializers.KryoSerializer;
33import org.slf4j.Logger;
34import org.slf4j.LoggerFactory;
35
36import java.io.IOException;
37import java.util.ArrayList;
38import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080039import java.util.HashMap;
40import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080041import java.util.List;
42import java.util.Map;
43import java.util.Set;
44import java.util.concurrent.ConcurrentHashMap;
45import java.util.concurrent.CopyOnWriteArraySet;
46import java.util.concurrent.ExecutorService;
47import java.util.concurrent.Executors;
48import java.util.concurrent.ScheduledExecutorService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080049import java.util.concurrent.TimeUnit;
Jonathan Hartdb3af892015-01-26 13:19:07 -080050import java.util.stream.Collectors;
51
52import static com.google.common.base.Preconditions.checkNotNull;
53import static com.google.common.base.Preconditions.checkState;
54import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080055import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080056import static org.onlab.util.Tools.minPriority;
Jonathan Hartdb3af892015-01-26 13:19:07 -080057
58/**
59 * Distributed Map implementation which uses optimistic replication and gossip
60 * based techniques to provide an eventually consistent data store.
61 */
62public class EventuallyConsistentMapImpl<K, V>
63 implements EventuallyConsistentMap<K, V> {
64
65 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
66
67 private final Map<K, Timestamped<V>> items;
68 private final Map<K, Timestamp> removedItems;
69
70 private final String mapName;
71 private final ClusterService clusterService;
72 private final ClusterCommunicationService clusterCommunicator;
73 private final KryoSerializer serializer;
74
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080075 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080076
77 private final MessageSubject updateMessageSubject;
78 private final MessageSubject removeMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080079 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080080
Jonathan Hartaaa56572015-01-28 21:56:35 -080081 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080082 = new CopyOnWriteArraySet<>();
83
84 private final ExecutorService executor;
85
86 private final ScheduledExecutorService backgroundExecutor;
87
88 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -080089 private static final String ERROR_DESTROYED = " map is already destroyed";
Jonathan Hartdb3af892015-01-26 13:19:07 -080090
Jonathan Hart4f397e82015-02-04 09:10:41 -080091 private static final String ERROR_NULL_KEY = "Key cannot be null";
92 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
93
Jonathan Hartdb3af892015-01-26 13:19:07 -080094 // TODO: Make these anti-entropy params configurable
95 private long initialDelaySec = 5;
96 private long periodSec = 5;
97
98 /**
99 * Creates a new eventually consistent map shared amongst multiple instances.
100 *
101 * Each map is identified by a string map name. EventuallyConsistentMapImpl
102 * objects in different JVMs that use the same map name will form a
103 * distributed map across JVMs (provided the cluster service is aware of
104 * both nodes).
105 *
106 * The client is expected to provide an
107 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
108 * will be stored in this map have been registered (including referenced
109 * classes). This serializer will be used to serialize both K and V for
110 * inter-node notifications.
111 *
112 * The client must provide an {@link org.onosproject.store.impl.ClockService}
113 * which can generate timestamps for a given key. The clock service is free
114 * to generate timestamps however it wishes, however these timestamps will
115 * be used to serialize updates to the map so they must be strict enough
116 * to ensure updates are properly ordered for the use case (i.e. in some
117 * cases wallclock time will suffice, whereas in other cases logical time
118 * will be necessary).
119 *
120 * @param mapName a String identifier for the map.
121 * @param clusterService the cluster service
122 * @param clusterCommunicator the cluster communications service
123 * @param serializerBuilder a Kryo namespace builder that can serialize
124 * both K and V
125 * @param clockService a clock service able to generate timestamps
126 * for K
127 */
128 public EventuallyConsistentMapImpl(String mapName,
129 ClusterService clusterService,
130 ClusterCommunicationService clusterCommunicator,
131 KryoNamespace.Builder serializerBuilder,
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800132 ClockService<K, V> clockService) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800133
134 this.mapName = checkNotNull(mapName);
135 this.clusterService = checkNotNull(clusterService);
136 this.clusterCommunicator = checkNotNull(clusterCommunicator);
137
138 serializer = createSerializer(checkNotNull(serializerBuilder));
139
140 this.clockService = checkNotNull(clockService);
141
142 items = new ConcurrentHashMap<>();
143 removedItems = new ConcurrentHashMap<>();
144
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -0800145 executor = Executors //FIXME
146 .newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-fg-%d"));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800147
148 backgroundExecutor =
149 newSingleThreadScheduledExecutor(minPriority(
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -0800150 groupedThreads("onos/ecm", mapName + "-bg-%d")));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800151
Jonathan Hartaaa56572015-01-28 21:56:35 -0800152 // start anti-entropy thread
153 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
154 initialDelaySec, periodSec,
155 TimeUnit.SECONDS);
156
Jonathan Hartdb3af892015-01-26 13:19:07 -0800157 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
158 clusterCommunicator.addSubscriber(updateMessageSubject,
159 new InternalPutEventListener());
160 removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
161 clusterCommunicator.addSubscriber(removeMessageSubject,
162 new InternalRemoveEventListener());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800163 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
164 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
165 new InternalAntiEntropyListener());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800166 }
167
168 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
169 return new KryoSerializer() {
170 @Override
171 protected void setupKryoPool() {
172 // Add the map's internal helper classes to the user-supplied serializer
173 serializerPool = builder
174 .register(WallClockTimestamp.class)
175 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800176 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800177 .register(ArrayList.class)
178 .register(InternalPutEvent.class)
179 .register(InternalRemoveEvent.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800180 .register(AntiEntropyAdvertisement.class)
181 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800182 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800183 }
184 };
185 }
186
187 @Override
188 public int size() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800189 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800190 return items.size();
191 }
192
193 @Override
194 public boolean isEmpty() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800195 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800196 return items.isEmpty();
197 }
198
199 @Override
200 public boolean containsKey(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800201 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800202 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800203 return items.containsKey(key);
204 }
205
206 @Override
207 public boolean containsValue(V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800208 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800209 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800210
211 return items.values().stream()
212 .anyMatch(timestamped -> timestamped.value().equals(value));
213 }
214
215 @Override
216 public V get(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800217 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800218 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800219
220 Timestamped<V> value = items.get(key);
221 if (value != null) {
222 return value.value();
223 }
224 return null;
225 }
226
227 @Override
228 public void put(K key, V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800229 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800230 checkNotNull(key, ERROR_NULL_KEY);
231 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800232
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800233 Timestamp timestamp = clockService.getTimestamp(key, value);
234
Jonathan Hartdb3af892015-01-26 13:19:07 -0800235 if (putInternal(key, value, timestamp)) {
236 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
237 EventuallyConsistentMapEvent<K, V> externalEvent
238 = new EventuallyConsistentMapEvent<>(
239 EventuallyConsistentMapEvent.Type.PUT, key, value);
240 notifyListeners(externalEvent);
241 }
242 }
243
244 private boolean putInternal(K key, V value, Timestamp timestamp) {
245 synchronized (this) {
246 Timestamp removed = removedItems.get(key);
247 if (removed != null && removed.compareTo(timestamp) > 0) {
Jonathan Hart07e58be2015-02-12 09:57:16 -0800248 log.debug("ecmap - removed was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800249 return false;
250 }
251
252 Timestamped<V> existing = items.get(key);
253 if (existing != null && existing.isNewer(timestamp)) {
Jonathan Hart07e58be2015-02-12 09:57:16 -0800254 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800255 return false;
256 } else {
257 items.put(key, new Timestamped<>(value, timestamp));
258 removedItems.remove(key);
259 return true;
260 }
261 }
262 }
263
264 @Override
265 public void remove(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800266 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800267 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800268
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800269 // TODO prevent calls here if value is important for timestamp
270 Timestamp timestamp = clockService.getTimestamp(key, null);
271
Jonathan Hartdb3af892015-01-26 13:19:07 -0800272 if (removeInternal(key, timestamp)) {
273 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
274 EventuallyConsistentMapEvent<K, V> externalEvent
275 = new EventuallyConsistentMapEvent<>(
276 EventuallyConsistentMapEvent.Type.REMOVE, key, null);
277 notifyListeners(externalEvent);
278 }
279 }
280
281 private boolean removeInternal(K key, Timestamp timestamp) {
282 synchronized (this) {
283 if (items.get(key) != null && items.get(key).isNewer(timestamp)) {
284 return false;
285 }
286
287 items.remove(key);
288 removedItems.put(key, timestamp);
289 return true;
290 }
291 }
292
293 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800294 public void remove(K key, V value) {
295 checkState(!destroyed, mapName + ERROR_DESTROYED);
296 checkNotNull(key, ERROR_NULL_KEY);
297 checkNotNull(value, ERROR_NULL_VALUE);
298
299 Timestamp timestamp = clockService.getTimestamp(key, value);
300
301 if (removeInternal(key, timestamp)) {
302 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
303 EventuallyConsistentMapEvent<K, V> externalEvent
304 = new EventuallyConsistentMapEvent<>(
305 EventuallyConsistentMapEvent.Type.REMOVE, key, value);
306 notifyListeners(externalEvent);
307 }
308 }
309
310 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800311 public void putAll(Map<? extends K, ? extends V> m) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800312 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800313
314 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
315
316 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
317 K key = entry.getKey();
318 V value = entry.getValue();
Jonathan Hart4f397e82015-02-04 09:10:41 -0800319
320 checkNotNull(key, ERROR_NULL_KEY);
321 checkNotNull(value, ERROR_NULL_VALUE);
322
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800323 Timestamp timestamp = clockService.getTimestamp(key, value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800324
325 if (putInternal(key, value, timestamp)) {
326 updates.add(new PutEntry<>(key, value, timestamp));
327 }
328 }
329
Jonathan Hart584d2f32015-01-27 19:46:14 -0800330 if (!updates.isEmpty()) {
331 notifyPeers(new InternalPutEvent<>(updates));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800332
Jonathan Hart584d2f32015-01-27 19:46:14 -0800333 for (PutEntry<K, V> entry : updates) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800334 EventuallyConsistentMapEvent<K, V> externalEvent =
335 new EventuallyConsistentMapEvent<>(
Jonathan Hart584d2f32015-01-27 19:46:14 -0800336 EventuallyConsistentMapEvent.Type.PUT, entry.key(),
337 entry.value());
338 notifyListeners(externalEvent);
339 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800340 }
341 }
342
343 @Override
344 public void clear() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800345 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800346
347 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
348
349 for (K key : items.keySet()) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800350 // TODO also this is not applicable if value is important for timestamp?
351 Timestamp timestamp = clockService.getTimestamp(key, null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800352
353 if (removeInternal(key, timestamp)) {
354 removed.add(new RemoveEntry<>(key, timestamp));
355 }
356 }
357
Jonathan Hart584d2f32015-01-27 19:46:14 -0800358 if (!removed.isEmpty()) {
359 notifyPeers(new InternalRemoveEvent<>(removed));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800360
Jonathan Hart584d2f32015-01-27 19:46:14 -0800361 for (RemoveEntry<K> entry : removed) {
362 EventuallyConsistentMapEvent<K, V> externalEvent
363 = new EventuallyConsistentMapEvent<>(
364 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
365 null);
366 notifyListeners(externalEvent);
367 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800368 }
369 }
370
371 @Override
372 public Set<K> keySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800373 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800374
375 return items.keySet();
376 }
377
378 @Override
379 public Collection<V> values() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800380 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800381
382 return items.values().stream()
383 .map(Timestamped::value)
384 .collect(Collectors.toList());
385 }
386
387 @Override
388 public Set<Map.Entry<K, V>> entrySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800389 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800390
391 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800392 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800393 .collect(Collectors.toSet());
394 }
395
396 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800397 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
398 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800399
400 listeners.add(checkNotNull(listener));
401 }
402
403 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800404 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
405 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800406
407 listeners.remove(checkNotNull(listener));
408 }
409
410 @Override
411 public void destroy() {
412 destroyed = true;
413
414 executor.shutdown();
415 backgroundExecutor.shutdown();
416
Jonathan Hart584d2f32015-01-27 19:46:14 -0800417 listeners.clear();
418
Jonathan Hartdb3af892015-01-26 13:19:07 -0800419 clusterCommunicator.removeSubscriber(updateMessageSubject);
420 clusterCommunicator.removeSubscriber(removeMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800421 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800422 }
423
Jonathan Hartaaa56572015-01-28 21:56:35 -0800424 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
425 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800426 listener.event(event);
427 }
428 }
429
430 private void notifyPeers(InternalPutEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800431 broadcastMessage(updateMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800432 }
433
434 private void notifyPeers(InternalRemoveEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800435 broadcastMessage(removeMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800436 }
437
Jonathan Hart7d656f42015-01-27 14:07:23 -0800438 private void broadcastMessage(MessageSubject subject, Object event) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800439 ClusterMessage message = new ClusterMessage(
440 clusterService.getLocalNode().id(),
441 subject,
442 serializer.encode(event));
443 clusterCommunicator.broadcast(message);
444 }
445
446 private void unicastMessage(NodeId peer,
447 MessageSubject subject,
448 Object event) throws IOException {
449 ClusterMessage message = new ClusterMessage(
450 clusterService.getLocalNode().id(),
451 subject,
452 serializer.encode(event));
453 clusterCommunicator.unicast(message, peer);
454 }
455
Jonathan Hartaaa56572015-01-28 21:56:35 -0800456 private final class SendAdvertisementTask implements Runnable {
457 @Override
458 public void run() {
459 if (Thread.currentThread().isInterrupted()) {
460 log.info("Interrupted, quitting");
461 return;
462 }
463
464 try {
465 final NodeId self = clusterService.getLocalNode().id();
466 Set<ControllerNode> nodes = clusterService.getNodes();
467
468 List<NodeId> nodeIds = nodes.stream()
469 .map(node -> node.id())
470 .collect(Collectors.toList());
471
472 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
473 log.trace("No other peers in the cluster.");
474 return;
475 }
476
477 NodeId peer;
478 do {
479 int idx = RandomUtils.nextInt(0, nodeIds.size());
480 peer = nodeIds.get(idx);
481 } while (peer.equals(self));
482
483 if (Thread.currentThread().isInterrupted()) {
484 log.info("Interrupted, quitting");
485 return;
486 }
487
488 AntiEntropyAdvertisement<K> ad = createAdvertisement();
489
490 try {
491 unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
492 } catch (IOException e) {
493 log.debug("Failed to send anti-entropy advertisement to {}", peer);
494 }
495 } catch (Exception e) {
496 // Catch all exceptions to avoid scheduled task being suppressed.
497 log.error("Exception thrown while sending advertisement", e);
498 }
499 }
500 }
501
502 private AntiEntropyAdvertisement<K> createAdvertisement() {
503 final NodeId self = clusterService.getLocalNode().id();
504
505 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
506
507 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
508
509 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
510
511 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
512 }
513
514 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
515 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
516
517 synchronized (this) {
518 final NodeId sender = ad.sender();
519
520 externalEvents = antiEntropyCheckLocalItems(ad);
521
522 antiEntropyCheckLocalRemoved(ad);
523
524 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
525
526 // if remote ad has something unknown, actively sync
527 for (K key : ad.timestamps().keySet()) {
528 if (!items.containsKey(key)) {
529 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
530 try {
531 unicastMessage(sender, antiEntropyAdvertisementSubject,
532 myAd);
533 break;
534 } catch (IOException e) {
535 log.debug(
536 "Failed to send reactive anti-entropy advertisement to {}",
537 sender);
538 }
539 }
540 }
541 } // synchronized (this)
542
543 externalEvents.forEach(this::notifyListeners);
544 }
545
546 /**
547 * Checks if any of the remote's live items or tombstones are out of date
548 * according to our local live item list, or if our live items are out of
549 * date according to the remote's tombstone list.
550 * If the local copy is more recent, it will be pushed to the remote. If the
551 * remote has a more recent remove, we apply that to the local state.
552 *
553 * @param ad remote anti-entropy advertisement
554 * @return list of external events relating to local operations performed
555 */
556 // Guarded by synchronized (this)
557 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
558 AntiEntropyAdvertisement<K> ad) {
559 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
560 = new LinkedList<>();
561 final NodeId sender = ad.sender();
562
563 final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
564
565 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
566 K key = item.getKey();
567 Timestamped<V> localValue = item.getValue();
568
569 Timestamp remoteTimestamp = ad.timestamps().get(key);
570 if (remoteTimestamp == null) {
571 remoteTimestamp = ad.tombstones().get(key);
572 }
573 if (remoteTimestamp == null || localValue
574 .isNewer(remoteTimestamp)) {
575 // local value is more recent, push to sender
576 updatesToSend
577 .add(new PutEntry<>(key, localValue.value(),
578 localValue.timestamp()));
579 }
580
581 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
582 if (remoteDeadTimestamp != null &&
583 remoteDeadTimestamp.compareTo(localValue.timestamp()) > 0) {
584 // sender has a more recent remove
585 if (removeInternal(key, remoteDeadTimestamp)) {
586 externalEvents.add(new EventuallyConsistentMapEvent<>(
587 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
588 }
589 }
590 }
591
592 // Send all updates to the peer at once
593 if (!updatesToSend.isEmpty()) {
594 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800595 unicastMessage(sender, updateMessageSubject,
596 new InternalPutEvent<>(updatesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800597 } catch (IOException e) {
598 log.warn("Failed to send advertisement response", e);
599 }
600 }
601
602 return externalEvents;
603 }
604
605 /**
606 * Checks if any items in the remote live list are out of date according
607 * to our tombstone list. If we find we have a more up to date tombstone,
608 * we'll send it to the remote.
609 *
610 * @param ad remote anti-entropy advertisement
611 */
612 // Guarded by synchronized (this)
613 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
614 final NodeId sender = ad.sender();
615
616 final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
617
618 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
619 K key = dead.getKey();
620 Timestamp localDeadTimestamp = dead.getValue();
621
622 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
623 if (remoteLiveTimestamp != null
624 && localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
625 // sender has zombie, push remove
626 removesToSend
627 .add(new RemoveEntry<>(key, localDeadTimestamp));
628 }
629 }
630
631 // Send all removes to the peer at once
632 if (!removesToSend.isEmpty()) {
633 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800634 unicastMessage(sender, removeMessageSubject,
635 new InternalRemoveEvent<>(removesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800636 } catch (IOException e) {
637 log.warn("Failed to send advertisement response", e);
638 }
639 }
640 }
641
642 /**
643 * Checks if any of the local live items are out of date according to the
644 * remote's tombstone advertisements. If we find a local item is out of date,
645 * we'll apply the remove operation to the local state.
646 *
647 * @param ad remote anti-entropy advertisement
648 * @return list of external events relating to local operations performed
649 */
650 // Guarded by synchronized (this)
651 private List<EventuallyConsistentMapEvent<K, V>>
652 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
653 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
654 = new LinkedList<>();
655
656 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
657 K key = remoteDead.getKey();
658 Timestamp remoteDeadTimestamp = remoteDead.getValue();
659
660 Timestamped<V> local = items.get(key);
661 Timestamp localDead = removedItems.get(key);
662 if (local != null
663 && remoteDeadTimestamp.compareTo(local.timestamp()) > 0) {
664 // remove our version
665 if (removeInternal(key, remoteDeadTimestamp)) {
666 externalEvents.add(new EventuallyConsistentMapEvent<>(
667 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
668 }
669 } else if (localDead != null &&
670 remoteDeadTimestamp.compareTo(localDead) > 0) {
671 // If we both had the item as removed, but their timestamp is
672 // newer, update ours to the newer value
673 removedItems.put(key, remoteDeadTimestamp);
674 }
675 }
676
677 return externalEvents;
678 }
679
680 private final class InternalAntiEntropyListener
681 implements ClusterMessageHandler {
682
683 @Override
684 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800685 log.trace("Received anti-entropy advertisement from peer: {}",
686 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800687 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
688 backgroundExecutor.submit(() -> {
689 try {
690 handleAntiEntropyAdvertisement(advertisement);
691 } catch (Exception e) {
692 log.warn("Exception thrown handling advertisements", e);
693 }
694 });
695 }
696 }
697
Jonathan Hartdb3af892015-01-26 13:19:07 -0800698 private final class InternalPutEventListener implements
699 ClusterMessageHandler {
700 @Override
701 public void handle(ClusterMessage message) {
702 log.debug("Received put event from peer: {}", message.sender());
703 InternalPutEvent<K, V> event = serializer.decode(message.payload());
704
705 executor.submit(() -> {
706 try {
707 for (PutEntry<K, V> entry : event.entries()) {
708 K key = entry.key();
709 V value = entry.value();
710 Timestamp timestamp = entry.timestamp();
711
712 if (putInternal(key, value, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800713 EventuallyConsistentMapEvent<K, V> externalEvent =
Jonathan Hartdb3af892015-01-26 13:19:07 -0800714 new EventuallyConsistentMapEvent<>(
715 EventuallyConsistentMapEvent.Type.PUT, key,
716 value);
717 notifyListeners(externalEvent);
718 }
719 }
720 } catch (Exception e) {
721 log.warn("Exception thrown handling put", e);
722 }
723 });
724 }
725 }
726
727 private final class InternalRemoveEventListener implements
728 ClusterMessageHandler {
729 @Override
730 public void handle(ClusterMessage message) {
731 log.debug("Received remove event from peer: {}", message.sender());
732 InternalRemoveEvent<K> event = serializer.decode(message.payload());
733
734 executor.submit(() -> {
735 try {
736 for (RemoveEntry<K> entry : event.entries()) {
737 K key = entry.key();
738 Timestamp timestamp = entry.timestamp();
739
740 if (removeInternal(key, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800741 EventuallyConsistentMapEvent<K, V> externalEvent
742 = new EventuallyConsistentMapEvent<>(
Jonathan Hartdb3af892015-01-26 13:19:07 -0800743 EventuallyConsistentMapEvent.Type.REMOVE,
744 key, null);
745 notifyListeners(externalEvent);
746 }
747 }
748 } catch (Exception e) {
749 log.warn("Exception thrown handling remove", e);
750 }
751 });
752 }
753 }
754
Jonathan Hartdb3af892015-01-26 13:19:07 -0800755}