blob: 51e5147f98d9f8cf2c987acf654836817a26bad5 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Jonathan Hartaaa56572015-01-28 21:56:35 -080018import org.apache.commons.lang3.RandomUtils;
Jonathan Hartf9108232015-02-02 16:37:35 -080019import org.apache.commons.lang3.tuple.Pair;
Jonathan Hartdb3af892015-01-26 13:19:07 -080020import org.onlab.util.KryoNamespace;
21import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080022import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080023import org.onosproject.cluster.NodeId;
24import org.onosproject.store.Timestamp;
25import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
26import org.onosproject.store.cluster.messaging.ClusterMessage;
27import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
28import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080029import org.onosproject.store.impl.ClockService;
30import org.onosproject.store.impl.Timestamped;
31import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080032import org.onosproject.store.serializers.KryoSerializer;
33import org.slf4j.Logger;
34import org.slf4j.LoggerFactory;
35
36import java.io.IOException;
37import java.util.ArrayList;
38import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080039import java.util.HashMap;
40import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080041import java.util.List;
42import java.util.Map;
43import java.util.Set;
44import java.util.concurrent.ConcurrentHashMap;
45import java.util.concurrent.CopyOnWriteArraySet;
46import java.util.concurrent.ExecutorService;
47import java.util.concurrent.Executors;
48import java.util.concurrent.ScheduledExecutorService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080049import java.util.concurrent.TimeUnit;
Jonathan Hartdb3af892015-01-26 13:19:07 -080050import java.util.stream.Collectors;
51
52import static com.google.common.base.Preconditions.checkNotNull;
53import static com.google.common.base.Preconditions.checkState;
54import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080055import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080056import static org.onlab.util.Tools.minPriority;
Jonathan Hartdb3af892015-01-26 13:19:07 -080057
58/**
59 * Distributed Map implementation which uses optimistic replication and gossip
60 * based techniques to provide an eventually consistent data store.
61 */
62public class EventuallyConsistentMapImpl<K, V>
63 implements EventuallyConsistentMap<K, V> {
64
65 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
66
67 private final Map<K, Timestamped<V>> items;
68 private final Map<K, Timestamp> removedItems;
69
70 private final String mapName;
71 private final ClusterService clusterService;
72 private final ClusterCommunicationService clusterCommunicator;
73 private final KryoSerializer serializer;
74
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080075 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080076
77 private final MessageSubject updateMessageSubject;
78 private final MessageSubject removeMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080079 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080080
Jonathan Hartaaa56572015-01-28 21:56:35 -080081 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080082 = new CopyOnWriteArraySet<>();
83
84 private final ExecutorService executor;
85
86 private final ScheduledExecutorService backgroundExecutor;
87
Madan Jampanib28e4ad2015-02-19 12:31:37 -080088 private ExecutorService broadcastMessageExecutor;
Madan Jampani28726282015-02-19 11:40:23 -080089
Jonathan Hartdb3af892015-01-26 13:19:07 -080090 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -080091 private static final String ERROR_DESTROYED = " map is already destroyed";
Jonathan Hartdb3af892015-01-26 13:19:07 -080092
Jonathan Hart4f397e82015-02-04 09:10:41 -080093 private static final String ERROR_NULL_KEY = "Key cannot be null";
94 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
95
Jonathan Hartdb3af892015-01-26 13:19:07 -080096 // TODO: Make these anti-entropy params configurable
97 private long initialDelaySec = 5;
98 private long periodSec = 5;
99
100 /**
101 * Creates a new eventually consistent map shared amongst multiple instances.
102 *
103 * Each map is identified by a string map name. EventuallyConsistentMapImpl
104 * objects in different JVMs that use the same map name will form a
105 * distributed map across JVMs (provided the cluster service is aware of
106 * both nodes).
107 *
108 * The client is expected to provide an
109 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
110 * will be stored in this map have been registered (including referenced
111 * classes). This serializer will be used to serialize both K and V for
112 * inter-node notifications.
113 *
114 * The client must provide an {@link org.onosproject.store.impl.ClockService}
115 * which can generate timestamps for a given key. The clock service is free
116 * to generate timestamps however it wishes, however these timestamps will
117 * be used to serialize updates to the map so they must be strict enough
118 * to ensure updates are properly ordered for the use case (i.e. in some
119 * cases wallclock time will suffice, whereas in other cases logical time
120 * will be necessary).
121 *
122 * @param mapName a String identifier for the map.
123 * @param clusterService the cluster service
124 * @param clusterCommunicator the cluster communications service
125 * @param serializerBuilder a Kryo namespace builder that can serialize
126 * both K and V
127 * @param clockService a clock service able to generate timestamps
128 * for K
129 */
130 public EventuallyConsistentMapImpl(String mapName,
131 ClusterService clusterService,
132 ClusterCommunicationService clusterCommunicator,
133 KryoNamespace.Builder serializerBuilder,
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800134 ClockService<K, V> clockService) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800135
136 this.mapName = checkNotNull(mapName);
137 this.clusterService = checkNotNull(clusterService);
138 this.clusterCommunicator = checkNotNull(clusterCommunicator);
139
140 serializer = createSerializer(checkNotNull(serializerBuilder));
141
142 this.clockService = checkNotNull(clockService);
143
144 items = new ConcurrentHashMap<>();
145 removedItems = new ConcurrentHashMap<>();
146
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -0800147 executor = Executors //FIXME
148 .newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-fg-%d"));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800149
Madan Jampani28726282015-02-19 11:40:23 -0800150 broadcastMessageExecutor = Executors.newSingleThreadExecutor(groupedThreads("onos/ecm", mapName + "-notify"));
151
Jonathan Hartdb3af892015-01-26 13:19:07 -0800152 backgroundExecutor =
153 newSingleThreadScheduledExecutor(minPriority(
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -0800154 groupedThreads("onos/ecm", mapName + "-bg-%d")));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800155
Jonathan Hartaaa56572015-01-28 21:56:35 -0800156 // start anti-entropy thread
157 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
158 initialDelaySec, periodSec,
159 TimeUnit.SECONDS);
160
Jonathan Hartdb3af892015-01-26 13:19:07 -0800161 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
162 clusterCommunicator.addSubscriber(updateMessageSubject,
163 new InternalPutEventListener());
164 removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
165 clusterCommunicator.addSubscriber(removeMessageSubject,
166 new InternalRemoveEventListener());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800167 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
168 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
169 new InternalAntiEntropyListener());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800170 }
171
172 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
173 return new KryoSerializer() {
174 @Override
175 protected void setupKryoPool() {
176 // Add the map's internal helper classes to the user-supplied serializer
177 serializerPool = builder
178 .register(WallClockTimestamp.class)
179 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800180 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800181 .register(ArrayList.class)
182 .register(InternalPutEvent.class)
183 .register(InternalRemoveEvent.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800184 .register(AntiEntropyAdvertisement.class)
185 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800186 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800187 }
188 };
189 }
190
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800191 /**
192 * Sets the executor to use for broadcasting messages and returns this
193 * instance for method chaining.
194 * @param executor executor service
195 * @return this instance
196 */
197 public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
198 checkNotNull(executor, "Null executor");
199 broadcastMessageExecutor = executor;
200 return this;
201 }
202
Jonathan Hartdb3af892015-01-26 13:19:07 -0800203 @Override
204 public int size() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800205 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800206 return items.size();
207 }
208
209 @Override
210 public boolean isEmpty() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800211 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800212 return items.isEmpty();
213 }
214
215 @Override
216 public boolean containsKey(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800217 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800218 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800219 return items.containsKey(key);
220 }
221
222 @Override
223 public boolean containsValue(V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800224 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800225 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800226
227 return items.values().stream()
228 .anyMatch(timestamped -> timestamped.value().equals(value));
229 }
230
231 @Override
232 public V get(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800233 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800234 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800235
236 Timestamped<V> value = items.get(key);
237 if (value != null) {
238 return value.value();
239 }
240 return null;
241 }
242
243 @Override
244 public void put(K key, V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800245 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800246 checkNotNull(key, ERROR_NULL_KEY);
247 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800248
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800249 Timestamp timestamp = clockService.getTimestamp(key, value);
250
Jonathan Hartdb3af892015-01-26 13:19:07 -0800251 if (putInternal(key, value, timestamp)) {
252 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
253 EventuallyConsistentMapEvent<K, V> externalEvent
254 = new EventuallyConsistentMapEvent<>(
255 EventuallyConsistentMapEvent.Type.PUT, key, value);
256 notifyListeners(externalEvent);
257 }
258 }
259
260 private boolean putInternal(K key, V value, Timestamp timestamp) {
261 synchronized (this) {
262 Timestamp removed = removedItems.get(key);
263 if (removed != null && removed.compareTo(timestamp) > 0) {
Jonathan Hart07e58be2015-02-12 09:57:16 -0800264 log.debug("ecmap - removed was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800265 return false;
266 }
267
268 Timestamped<V> existing = items.get(key);
269 if (existing != null && existing.isNewer(timestamp)) {
Jonathan Hart07e58be2015-02-12 09:57:16 -0800270 log.debug("ecmap - existing was newer {}", value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800271 return false;
272 } else {
273 items.put(key, new Timestamped<>(value, timestamp));
274 removedItems.remove(key);
275 return true;
276 }
277 }
278 }
279
280 @Override
281 public void remove(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800282 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800283 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800284
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800285 // TODO prevent calls here if value is important for timestamp
286 Timestamp timestamp = clockService.getTimestamp(key, null);
287
Jonathan Hartdb3af892015-01-26 13:19:07 -0800288 if (removeInternal(key, timestamp)) {
289 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
290 EventuallyConsistentMapEvent<K, V> externalEvent
291 = new EventuallyConsistentMapEvent<>(
292 EventuallyConsistentMapEvent.Type.REMOVE, key, null);
293 notifyListeners(externalEvent);
294 }
295 }
296
297 private boolean removeInternal(K key, Timestamp timestamp) {
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800298 Timestamped<V> value = items.get(key);
299 if (value != null) {
300 if (value.isNewer(timestamp)) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800301 return false;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800302 } else {
303 items.remove(key, value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800304 }
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800305 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800306
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800307 Timestamp removedTimestamp = removedItems.get(key);
308 if (removedTimestamp == null) {
309 return removedItems.putIfAbsent(key, timestamp) == null;
310 } else if (timestamp.compareTo(removedTimestamp) > 0) {
311 return removedItems.replace(key, removedTimestamp, timestamp);
312 } else {
313 return false;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800314 }
315 }
316
317 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800318 public void remove(K key, V value) {
319 checkState(!destroyed, mapName + ERROR_DESTROYED);
320 checkNotNull(key, ERROR_NULL_KEY);
321 checkNotNull(value, ERROR_NULL_VALUE);
322
323 Timestamp timestamp = clockService.getTimestamp(key, value);
324
325 if (removeInternal(key, timestamp)) {
326 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
327 EventuallyConsistentMapEvent<K, V> externalEvent
328 = new EventuallyConsistentMapEvent<>(
329 EventuallyConsistentMapEvent.Type.REMOVE, key, value);
330 notifyListeners(externalEvent);
331 }
332 }
333
334 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800335 public void putAll(Map<? extends K, ? extends V> m) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800336 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800337
338 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
339
340 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
341 K key = entry.getKey();
342 V value = entry.getValue();
Jonathan Hart4f397e82015-02-04 09:10:41 -0800343
344 checkNotNull(key, ERROR_NULL_KEY);
345 checkNotNull(value, ERROR_NULL_VALUE);
346
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800347 Timestamp timestamp = clockService.getTimestamp(key, value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800348
349 if (putInternal(key, value, timestamp)) {
350 updates.add(new PutEntry<>(key, value, timestamp));
351 }
352 }
353
Jonathan Hart584d2f32015-01-27 19:46:14 -0800354 if (!updates.isEmpty()) {
355 notifyPeers(new InternalPutEvent<>(updates));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800356
Jonathan Hart584d2f32015-01-27 19:46:14 -0800357 for (PutEntry<K, V> entry : updates) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800358 EventuallyConsistentMapEvent<K, V> externalEvent =
359 new EventuallyConsistentMapEvent<>(
Jonathan Hart584d2f32015-01-27 19:46:14 -0800360 EventuallyConsistentMapEvent.Type.PUT, entry.key(),
361 entry.value());
362 notifyListeners(externalEvent);
363 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800364 }
365 }
366
367 @Override
368 public void clear() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800369 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800370
371 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
372
373 for (K key : items.keySet()) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800374 // TODO also this is not applicable if value is important for timestamp?
375 Timestamp timestamp = clockService.getTimestamp(key, null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800376
377 if (removeInternal(key, timestamp)) {
378 removed.add(new RemoveEntry<>(key, timestamp));
379 }
380 }
381
Jonathan Hart584d2f32015-01-27 19:46:14 -0800382 if (!removed.isEmpty()) {
383 notifyPeers(new InternalRemoveEvent<>(removed));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800384
Jonathan Hart584d2f32015-01-27 19:46:14 -0800385 for (RemoveEntry<K> entry : removed) {
386 EventuallyConsistentMapEvent<K, V> externalEvent
387 = new EventuallyConsistentMapEvent<>(
388 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
389 null);
390 notifyListeners(externalEvent);
391 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800392 }
393 }
394
395 @Override
396 public Set<K> keySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800397 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800398
399 return items.keySet();
400 }
401
402 @Override
403 public Collection<V> values() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800404 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800405
406 return items.values().stream()
407 .map(Timestamped::value)
408 .collect(Collectors.toList());
409 }
410
411 @Override
412 public Set<Map.Entry<K, V>> entrySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800413 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800414
415 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800416 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800417 .collect(Collectors.toSet());
418 }
419
420 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800421 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
422 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800423
424 listeners.add(checkNotNull(listener));
425 }
426
427 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800428 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
429 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800430
431 listeners.remove(checkNotNull(listener));
432 }
433
434 @Override
435 public void destroy() {
436 destroyed = true;
437
438 executor.shutdown();
439 backgroundExecutor.shutdown();
Madan Jampani337bb442015-02-19 14:29:18 -0800440 broadcastMessageExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800441
Jonathan Hart584d2f32015-01-27 19:46:14 -0800442 listeners.clear();
443
Jonathan Hartdb3af892015-01-26 13:19:07 -0800444 clusterCommunicator.removeSubscriber(updateMessageSubject);
445 clusterCommunicator.removeSubscriber(removeMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800446 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800447 }
448
Jonathan Hartaaa56572015-01-28 21:56:35 -0800449 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
450 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800451 listener.event(event);
452 }
453 }
454
455 private void notifyPeers(InternalPutEvent event) {
Madan Jampani337bb442015-02-19 14:29:18 -0800456 broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800457 }
458
459 private void notifyPeers(InternalRemoveEvent event) {
Madan Jampani337bb442015-02-19 14:29:18 -0800460 broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800461 }
462
Jonathan Hart7d656f42015-01-27 14:07:23 -0800463 private void broadcastMessage(MessageSubject subject, Object event) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800464 ClusterMessage message = new ClusterMessage(
465 clusterService.getLocalNode().id(),
466 subject,
467 serializer.encode(event));
Madan Jampani337bb442015-02-19 14:29:18 -0800468 clusterCommunicator.broadcast(message);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800469 }
470
471 private void unicastMessage(NodeId peer,
472 MessageSubject subject,
473 Object event) throws IOException {
474 ClusterMessage message = new ClusterMessage(
475 clusterService.getLocalNode().id(),
476 subject,
477 serializer.encode(event));
478 clusterCommunicator.unicast(message, peer);
479 }
480
Jonathan Hartaaa56572015-01-28 21:56:35 -0800481 private final class SendAdvertisementTask implements Runnable {
482 @Override
483 public void run() {
484 if (Thread.currentThread().isInterrupted()) {
485 log.info("Interrupted, quitting");
486 return;
487 }
488
489 try {
490 final NodeId self = clusterService.getLocalNode().id();
491 Set<ControllerNode> nodes = clusterService.getNodes();
492
493 List<NodeId> nodeIds = nodes.stream()
494 .map(node -> node.id())
495 .collect(Collectors.toList());
496
497 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
498 log.trace("No other peers in the cluster.");
499 return;
500 }
501
502 NodeId peer;
503 do {
504 int idx = RandomUtils.nextInt(0, nodeIds.size());
505 peer = nodeIds.get(idx);
506 } while (peer.equals(self));
507
508 if (Thread.currentThread().isInterrupted()) {
509 log.info("Interrupted, quitting");
510 return;
511 }
512
513 AntiEntropyAdvertisement<K> ad = createAdvertisement();
514
515 try {
516 unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
517 } catch (IOException e) {
518 log.debug("Failed to send anti-entropy advertisement to {}", peer);
519 }
520 } catch (Exception e) {
521 // Catch all exceptions to avoid scheduled task being suppressed.
522 log.error("Exception thrown while sending advertisement", e);
523 }
524 }
525 }
526
527 private AntiEntropyAdvertisement<K> createAdvertisement() {
528 final NodeId self = clusterService.getLocalNode().id();
529
530 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
531
532 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
533
534 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
535
536 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
537 }
538
539 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
540 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
Jonathan Hart3469e602015-02-19 13:50:27 -0800541 boolean sync = false;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800542
543 synchronized (this) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800544 externalEvents = antiEntropyCheckLocalItems(ad);
545
546 antiEntropyCheckLocalRemoved(ad);
547
548 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
549
550 // if remote ad has something unknown, actively sync
551 for (K key : ad.timestamps().keySet()) {
552 if (!items.containsKey(key)) {
Jonathan Hart3469e602015-02-19 13:50:27 -0800553 sync = true;
554 break;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800555 }
556 }
557 } // synchronized (this)
558
Jonathan Hart3469e602015-02-19 13:50:27 -0800559 // Send the advertisement outside the synchronized block
560 if (sync) {
561 final NodeId sender = ad.sender();
562 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
563 try {
564 unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
565 } catch (IOException e) {
566 log.debug(
567 "Failed to send reactive anti-entropy advertisement to {}",
568 sender);
569 }
570 }
571
Jonathan Hartaaa56572015-01-28 21:56:35 -0800572 externalEvents.forEach(this::notifyListeners);
573 }
574
575 /**
576 * Checks if any of the remote's live items or tombstones are out of date
577 * according to our local live item list, or if our live items are out of
578 * date according to the remote's tombstone list.
579 * If the local copy is more recent, it will be pushed to the remote. If the
580 * remote has a more recent remove, we apply that to the local state.
581 *
582 * @param ad remote anti-entropy advertisement
583 * @return list of external events relating to local operations performed
584 */
585 // Guarded by synchronized (this)
586 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
587 AntiEntropyAdvertisement<K> ad) {
588 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
589 = new LinkedList<>();
590 final NodeId sender = ad.sender();
591
592 final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
593
594 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
595 K key = item.getKey();
596 Timestamped<V> localValue = item.getValue();
597
598 Timestamp remoteTimestamp = ad.timestamps().get(key);
599 if (remoteTimestamp == null) {
600 remoteTimestamp = ad.tombstones().get(key);
601 }
602 if (remoteTimestamp == null || localValue
603 .isNewer(remoteTimestamp)) {
604 // local value is more recent, push to sender
605 updatesToSend
606 .add(new PutEntry<>(key, localValue.value(),
607 localValue.timestamp()));
608 }
609
610 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
611 if (remoteDeadTimestamp != null &&
612 remoteDeadTimestamp.compareTo(localValue.timestamp()) > 0) {
613 // sender has a more recent remove
614 if (removeInternal(key, remoteDeadTimestamp)) {
615 externalEvents.add(new EventuallyConsistentMapEvent<>(
616 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
617 }
618 }
619 }
620
621 // Send all updates to the peer at once
622 if (!updatesToSend.isEmpty()) {
623 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800624 unicastMessage(sender, updateMessageSubject,
625 new InternalPutEvent<>(updatesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800626 } catch (IOException e) {
627 log.warn("Failed to send advertisement response", e);
628 }
629 }
630
631 return externalEvents;
632 }
633
634 /**
635 * Checks if any items in the remote live list are out of date according
636 * to our tombstone list. If we find we have a more up to date tombstone,
637 * we'll send it to the remote.
638 *
639 * @param ad remote anti-entropy advertisement
640 */
641 // Guarded by synchronized (this)
642 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
643 final NodeId sender = ad.sender();
644
645 final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
646
647 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
648 K key = dead.getKey();
649 Timestamp localDeadTimestamp = dead.getValue();
650
651 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
652 if (remoteLiveTimestamp != null
653 && localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
654 // sender has zombie, push remove
655 removesToSend
656 .add(new RemoveEntry<>(key, localDeadTimestamp));
657 }
658 }
659
660 // Send all removes to the peer at once
661 if (!removesToSend.isEmpty()) {
662 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800663 unicastMessage(sender, removeMessageSubject,
664 new InternalRemoveEvent<>(removesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800665 } catch (IOException e) {
666 log.warn("Failed to send advertisement response", e);
667 }
668 }
669 }
670
671 /**
672 * Checks if any of the local live items are out of date according to the
673 * remote's tombstone advertisements. If we find a local item is out of date,
674 * we'll apply the remove operation to the local state.
675 *
676 * @param ad remote anti-entropy advertisement
677 * @return list of external events relating to local operations performed
678 */
679 // Guarded by synchronized (this)
680 private List<EventuallyConsistentMapEvent<K, V>>
681 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
682 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
683 = new LinkedList<>();
684
685 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
686 K key = remoteDead.getKey();
687 Timestamp remoteDeadTimestamp = remoteDead.getValue();
688
689 Timestamped<V> local = items.get(key);
690 Timestamp localDead = removedItems.get(key);
691 if (local != null
692 && remoteDeadTimestamp.compareTo(local.timestamp()) > 0) {
693 // remove our version
694 if (removeInternal(key, remoteDeadTimestamp)) {
695 externalEvents.add(new EventuallyConsistentMapEvent<>(
696 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
697 }
698 } else if (localDead != null &&
699 remoteDeadTimestamp.compareTo(localDead) > 0) {
700 // If we both had the item as removed, but their timestamp is
701 // newer, update ours to the newer value
702 removedItems.put(key, remoteDeadTimestamp);
703 }
704 }
705
706 return externalEvents;
707 }
708
709 private final class InternalAntiEntropyListener
710 implements ClusterMessageHandler {
711
712 @Override
713 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800714 log.trace("Received anti-entropy advertisement from peer: {}",
715 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800716 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
717 backgroundExecutor.submit(() -> {
718 try {
719 handleAntiEntropyAdvertisement(advertisement);
720 } catch (Exception e) {
721 log.warn("Exception thrown handling advertisements", e);
722 }
723 });
724 }
725 }
726
Jonathan Hartdb3af892015-01-26 13:19:07 -0800727 private final class InternalPutEventListener implements
728 ClusterMessageHandler {
729 @Override
730 public void handle(ClusterMessage message) {
731 log.debug("Received put event from peer: {}", message.sender());
732 InternalPutEvent<K, V> event = serializer.decode(message.payload());
733
734 executor.submit(() -> {
735 try {
736 for (PutEntry<K, V> entry : event.entries()) {
737 K key = entry.key();
738 V value = entry.value();
739 Timestamp timestamp = entry.timestamp();
740
741 if (putInternal(key, value, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800742 EventuallyConsistentMapEvent<K, V> externalEvent =
Jonathan Hartdb3af892015-01-26 13:19:07 -0800743 new EventuallyConsistentMapEvent<>(
744 EventuallyConsistentMapEvent.Type.PUT, key,
745 value);
746 notifyListeners(externalEvent);
747 }
748 }
749 } catch (Exception e) {
750 log.warn("Exception thrown handling put", e);
751 }
752 });
753 }
754 }
755
756 private final class InternalRemoveEventListener implements
757 ClusterMessageHandler {
758 @Override
759 public void handle(ClusterMessage message) {
760 log.debug("Received remove event from peer: {}", message.sender());
761 InternalRemoveEvent<K> event = serializer.decode(message.payload());
762
763 executor.submit(() -> {
764 try {
765 for (RemoveEntry<K> entry : event.entries()) {
766 K key = entry.key();
767 Timestamp timestamp = entry.timestamp();
768
769 if (removeInternal(key, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800770 EventuallyConsistentMapEvent<K, V> externalEvent
771 = new EventuallyConsistentMapEvent<>(
Jonathan Hartdb3af892015-01-26 13:19:07 -0800772 EventuallyConsistentMapEvent.Type.REMOVE,
773 key, null);
774 notifyListeners(externalEvent);
775 }
776 }
777 } catch (Exception e) {
778 log.warn("Exception thrown handling remove", e);
779 }
780 });
781 }
782 }
783
Jonathan Hartdb3af892015-01-26 13:19:07 -0800784}