blob: 4404d98ef544fe68b49029eaa2d727ccbe09de8b [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hartdb3af892015-01-26 13:19:07 -080017
Jonathan Hartaaa56572015-01-28 21:56:35 -080018import org.apache.commons.lang3.RandomUtils;
Jonathan Hartf9108232015-02-02 16:37:35 -080019import org.apache.commons.lang3.tuple.Pair;
Jonathan Hartdb3af892015-01-26 13:19:07 -080020import org.onlab.util.KryoNamespace;
21import org.onosproject.cluster.ClusterService;
Jonathan Hartaaa56572015-01-28 21:56:35 -080022import org.onosproject.cluster.ControllerNode;
Jonathan Hartdb3af892015-01-26 13:19:07 -080023import org.onosproject.cluster.NodeId;
24import org.onosproject.store.Timestamp;
25import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
26import org.onosproject.store.cluster.messaging.ClusterMessage;
27import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
28import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080029import org.onosproject.store.impl.ClockService;
30import org.onosproject.store.impl.Timestamped;
31import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hartdb3af892015-01-26 13:19:07 -080032import org.onosproject.store.serializers.KryoSerializer;
33import org.slf4j.Logger;
34import org.slf4j.LoggerFactory;
35
36import java.io.IOException;
37import java.util.ArrayList;
38import java.util.Collection;
Jonathan Hartaaa56572015-01-28 21:56:35 -080039import java.util.HashMap;
40import java.util.LinkedList;
Jonathan Hartdb3af892015-01-26 13:19:07 -080041import java.util.List;
42import java.util.Map;
43import java.util.Set;
44import java.util.concurrent.ConcurrentHashMap;
45import java.util.concurrent.CopyOnWriteArraySet;
46import java.util.concurrent.ExecutorService;
47import java.util.concurrent.Executors;
48import java.util.concurrent.ScheduledExecutorService;
49import java.util.stream.Collectors;
50
51import static com.google.common.base.Preconditions.checkNotNull;
52import static com.google.common.base.Preconditions.checkState;
53import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -080054import static org.onlab.util.Tools.groupedThreads;
Jonathan Hartdb3af892015-01-26 13:19:07 -080055import static org.onlab.util.Tools.minPriority;
Jonathan Hartdb3af892015-01-26 13:19:07 -080056
57/**
58 * Distributed Map implementation which uses optimistic replication and gossip
59 * based techniques to provide an eventually consistent data store.
60 */
61public class EventuallyConsistentMapImpl<K, V>
62 implements EventuallyConsistentMap<K, V> {
63
64 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
65
66 private final Map<K, Timestamped<V>> items;
67 private final Map<K, Timestamp> removedItems;
68
69 private final String mapName;
70 private final ClusterService clusterService;
71 private final ClusterCommunicationService clusterCommunicator;
72 private final KryoSerializer serializer;
73
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080074 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080075
76 private final MessageSubject updateMessageSubject;
77 private final MessageSubject removeMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080078 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080079
Jonathan Hartaaa56572015-01-28 21:56:35 -080080 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080081 = new CopyOnWriteArraySet<>();
82
83 private final ExecutorService executor;
84
85 private final ScheduledExecutorService backgroundExecutor;
86
Madan Jampanib28e4ad2015-02-19 12:31:37 -080087 private ExecutorService broadcastMessageExecutor;
Madan Jampani28726282015-02-19 11:40:23 -080088
Jonathan Hartdb3af892015-01-26 13:19:07 -080089 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -080090 private static final String ERROR_DESTROYED = " map is already destroyed";
Jonathan Hartdb3af892015-01-26 13:19:07 -080091
Jonathan Hart4f397e82015-02-04 09:10:41 -080092 private static final String ERROR_NULL_KEY = "Key cannot be null";
93 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
94
Jonathan Hartdb3af892015-01-26 13:19:07 -080095 // TODO: Make these anti-entropy params configurable
96 private long initialDelaySec = 5;
97 private long periodSec = 5;
98
99 /**
100 * Creates a new eventually consistent map shared amongst multiple instances.
101 *
102 * Each map is identified by a string map name. EventuallyConsistentMapImpl
103 * objects in different JVMs that use the same map name will form a
104 * distributed map across JVMs (provided the cluster service is aware of
105 * both nodes).
106 *
107 * The client is expected to provide an
108 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
109 * will be stored in this map have been registered (including referenced
110 * classes). This serializer will be used to serialize both K and V for
111 * inter-node notifications.
112 *
113 * The client must provide an {@link org.onosproject.store.impl.ClockService}
114 * which can generate timestamps for a given key. The clock service is free
115 * to generate timestamps however it wishes, however these timestamps will
116 * be used to serialize updates to the map so they must be strict enough
117 * to ensure updates are properly ordered for the use case (i.e. in some
118 * cases wallclock time will suffice, whereas in other cases logical time
119 * will be necessary).
120 *
121 * @param mapName a String identifier for the map.
122 * @param clusterService the cluster service
123 * @param clusterCommunicator the cluster communications service
124 * @param serializerBuilder a Kryo namespace builder that can serialize
125 * both K and V
126 * @param clockService a clock service able to generate timestamps
127 * for K
128 */
129 public EventuallyConsistentMapImpl(String mapName,
130 ClusterService clusterService,
131 ClusterCommunicationService clusterCommunicator,
132 KryoNamespace.Builder serializerBuilder,
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800133 ClockService<K, V> clockService) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800134
135 this.mapName = checkNotNull(mapName);
136 this.clusterService = checkNotNull(clusterService);
137 this.clusterCommunicator = checkNotNull(clusterCommunicator);
138
139 serializer = createSerializer(checkNotNull(serializerBuilder));
140
141 this.clockService = checkNotNull(clockService);
142
143 items = new ConcurrentHashMap<>();
144 removedItems = new ConcurrentHashMap<>();
145
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -0800146 executor = Executors //FIXME
147 .newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-fg-%d"));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800148
Madan Jampani28726282015-02-19 11:40:23 -0800149 broadcastMessageExecutor = Executors.newSingleThreadExecutor(groupedThreads("onos/ecm", mapName + "-notify"));
150
Jonathan Hartdb3af892015-01-26 13:19:07 -0800151 backgroundExecutor =
152 newSingleThreadScheduledExecutor(minPriority(
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -0800153 groupedThreads("onos/ecm", mapName + "-bg-%d")));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800154
Jonathan Hartaaa56572015-01-28 21:56:35 -0800155 // start anti-entropy thread
Brian O'Connor8fc739a2015-02-19 22:31:44 -0800156 //FIXME need to re-enable
157// backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
158// initialDelaySec, periodSec,
159// TimeUnit.SECONDS);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800160
Jonathan Hartdb3af892015-01-26 13:19:07 -0800161 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
162 clusterCommunicator.addSubscriber(updateMessageSubject,
163 new InternalPutEventListener());
164 removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
165 clusterCommunicator.addSubscriber(removeMessageSubject,
166 new InternalRemoveEventListener());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800167 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
168 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
169 new InternalAntiEntropyListener());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800170 }
171
172 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
173 return new KryoSerializer() {
174 @Override
175 protected void setupKryoPool() {
176 // Add the map's internal helper classes to the user-supplied serializer
177 serializerPool = builder
178 .register(WallClockTimestamp.class)
179 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800180 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800181 .register(ArrayList.class)
182 .register(InternalPutEvent.class)
183 .register(InternalRemoveEvent.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800184 .register(AntiEntropyAdvertisement.class)
185 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800186 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800187 }
188 };
189 }
190
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800191 /**
192 * Sets the executor to use for broadcasting messages and returns this
193 * instance for method chaining.
194 * @param executor executor service
195 * @return this instance
196 */
197 public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
198 checkNotNull(executor, "Null executor");
199 broadcastMessageExecutor = executor;
200 return this;
201 }
202
Jonathan Hartdb3af892015-01-26 13:19:07 -0800203 @Override
204 public int size() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800205 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800206 return items.size();
207 }
208
209 @Override
210 public boolean isEmpty() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800211 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800212 return items.isEmpty();
213 }
214
215 @Override
216 public boolean containsKey(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800217 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800218 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800219 return items.containsKey(key);
220 }
221
222 @Override
223 public boolean containsValue(V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800224 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800225 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800226
227 return items.values().stream()
228 .anyMatch(timestamped -> timestamped.value().equals(value));
229 }
230
231 @Override
232 public V get(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800233 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800234 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800235
236 Timestamped<V> value = items.get(key);
237 if (value != null) {
238 return value.value();
239 }
240 return null;
241 }
242
243 @Override
244 public void put(K key, V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800245 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800246 checkNotNull(key, ERROR_NULL_KEY);
247 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800248
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800249 Timestamp timestamp = clockService.getTimestamp(key, value);
250
Jonathan Hartdb3af892015-01-26 13:19:07 -0800251 if (putInternal(key, value, timestamp)) {
252 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
253 EventuallyConsistentMapEvent<K, V> externalEvent
254 = new EventuallyConsistentMapEvent<>(
255 EventuallyConsistentMapEvent.Type.PUT, key, value);
256 notifyListeners(externalEvent);
257 }
258 }
259
260 private boolean putInternal(K key, V value, Timestamp timestamp) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800261 Timestamp removed = removedItems.get(key);
262 if (removed != null && removed.compareTo(timestamp) > 0) {
263 log.debug("ecmap - removed was newer {}", value);
264 return false;
265 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800266
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800267 boolean success;
268 synchronized (this) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800269 Timestamped<V> existing = items.get(key);
270 if (existing != null && existing.isNewer(timestamp)) {
Jonathan Hart07e58be2015-02-12 09:57:16 -0800271 log.debug("ecmap - existing was newer {}", value);
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800272 success = false;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800273 } else {
274 items.put(key, new Timestamped<>(value, timestamp));
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800275 success = true;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800276 }
277 }
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800278
279 if (success && removed != null) {
280 removedItems.remove(key, removed);
281 }
282 return success;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800283 }
284
285 @Override
286 public void remove(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800287 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800288 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800289
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800290 // TODO prevent calls here if value is important for timestamp
291 Timestamp timestamp = clockService.getTimestamp(key, null);
292
Jonathan Hartdb3af892015-01-26 13:19:07 -0800293 if (removeInternal(key, timestamp)) {
294 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
295 EventuallyConsistentMapEvent<K, V> externalEvent
296 = new EventuallyConsistentMapEvent<>(
297 EventuallyConsistentMapEvent.Type.REMOVE, key, null);
298 notifyListeners(externalEvent);
299 }
300 }
301
302 private boolean removeInternal(K key, Timestamp timestamp) {
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800303 Timestamped<V> value = items.get(key);
304 if (value != null) {
305 if (value.isNewer(timestamp)) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800306 return false;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800307 } else {
308 items.remove(key, value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800309 }
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800310 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800311
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800312 Timestamp removedTimestamp = removedItems.get(key);
313 if (removedTimestamp == null) {
314 return removedItems.putIfAbsent(key, timestamp) == null;
315 } else if (timestamp.compareTo(removedTimestamp) > 0) {
316 return removedItems.replace(key, removedTimestamp, timestamp);
317 } else {
318 return false;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800319 }
320 }
321
322 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800323 public void remove(K key, V value) {
324 checkState(!destroyed, mapName + ERROR_DESTROYED);
325 checkNotNull(key, ERROR_NULL_KEY);
326 checkNotNull(value, ERROR_NULL_VALUE);
327
328 Timestamp timestamp = clockService.getTimestamp(key, value);
329
330 if (removeInternal(key, timestamp)) {
331 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
332 EventuallyConsistentMapEvent<K, V> externalEvent
333 = new EventuallyConsistentMapEvent<>(
334 EventuallyConsistentMapEvent.Type.REMOVE, key, value);
335 notifyListeners(externalEvent);
336 }
337 }
338
339 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800340 public void putAll(Map<? extends K, ? extends V> m) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800341 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800342
343 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
344
345 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
346 K key = entry.getKey();
347 V value = entry.getValue();
Jonathan Hart4f397e82015-02-04 09:10:41 -0800348
349 checkNotNull(key, ERROR_NULL_KEY);
350 checkNotNull(value, ERROR_NULL_VALUE);
351
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800352 Timestamp timestamp = clockService.getTimestamp(key, value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800353
354 if (putInternal(key, value, timestamp)) {
355 updates.add(new PutEntry<>(key, value, timestamp));
356 }
357 }
358
Jonathan Hart584d2f32015-01-27 19:46:14 -0800359 if (!updates.isEmpty()) {
360 notifyPeers(new InternalPutEvent<>(updates));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800361
Jonathan Hart584d2f32015-01-27 19:46:14 -0800362 for (PutEntry<K, V> entry : updates) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800363 EventuallyConsistentMapEvent<K, V> externalEvent =
364 new EventuallyConsistentMapEvent<>(
Jonathan Hart584d2f32015-01-27 19:46:14 -0800365 EventuallyConsistentMapEvent.Type.PUT, entry.key(),
366 entry.value());
367 notifyListeners(externalEvent);
368 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800369 }
370 }
371
372 @Override
373 public void clear() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800374 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800375
376 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
377
378 for (K key : items.keySet()) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800379 // TODO also this is not applicable if value is important for timestamp?
380 Timestamp timestamp = clockService.getTimestamp(key, null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800381
382 if (removeInternal(key, timestamp)) {
383 removed.add(new RemoveEntry<>(key, timestamp));
384 }
385 }
386
Jonathan Hart584d2f32015-01-27 19:46:14 -0800387 if (!removed.isEmpty()) {
388 notifyPeers(new InternalRemoveEvent<>(removed));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800389
Jonathan Hart584d2f32015-01-27 19:46:14 -0800390 for (RemoveEntry<K> entry : removed) {
391 EventuallyConsistentMapEvent<K, V> externalEvent
392 = new EventuallyConsistentMapEvent<>(
393 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
394 null);
395 notifyListeners(externalEvent);
396 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800397 }
398 }
399
400 @Override
401 public Set<K> keySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800402 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800403
404 return items.keySet();
405 }
406
407 @Override
408 public Collection<V> values() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800409 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800410
411 return items.values().stream()
412 .map(Timestamped::value)
413 .collect(Collectors.toList());
414 }
415
416 @Override
417 public Set<Map.Entry<K, V>> entrySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800418 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800419
420 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800421 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800422 .collect(Collectors.toSet());
423 }
424
425 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800426 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
427 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800428
429 listeners.add(checkNotNull(listener));
430 }
431
432 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800433 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
434 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800435
436 listeners.remove(checkNotNull(listener));
437 }
438
439 @Override
440 public void destroy() {
441 destroyed = true;
442
443 executor.shutdown();
444 backgroundExecutor.shutdown();
Madan Jampani337bb442015-02-19 14:29:18 -0800445 broadcastMessageExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800446
Jonathan Hart584d2f32015-01-27 19:46:14 -0800447 listeners.clear();
448
Jonathan Hartdb3af892015-01-26 13:19:07 -0800449 clusterCommunicator.removeSubscriber(updateMessageSubject);
450 clusterCommunicator.removeSubscriber(removeMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800451 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800452 }
453
Jonathan Hartaaa56572015-01-28 21:56:35 -0800454 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
455 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800456 listener.event(event);
457 }
458 }
459
460 private void notifyPeers(InternalPutEvent event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800461 // FIXME extremely memory expensive when we are overrun
462// broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
463 broadcastMessage(updateMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800464 }
465
466 private void notifyPeers(InternalRemoveEvent event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800467 // FIXME extremely memory expensive when we are overrun
468// broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
469 broadcastMessage(removeMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800470 }
471
Jonathan Hart7d656f42015-01-27 14:07:23 -0800472 private void broadcastMessage(MessageSubject subject, Object event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800473 // FIXME can we parallelize the serialization... use the caller???
Jonathan Hartdb3af892015-01-26 13:19:07 -0800474 ClusterMessage message = new ClusterMessage(
475 clusterService.getLocalNode().id(),
476 subject,
477 serializer.encode(event));
Brian O'Connorb2894222015-02-20 22:05:19 -0800478 //broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
Madan Jampani337bb442015-02-19 14:29:18 -0800479 clusterCommunicator.broadcast(message);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800480 }
481
482 private void unicastMessage(NodeId peer,
483 MessageSubject subject,
484 Object event) throws IOException {
485 ClusterMessage message = new ClusterMessage(
486 clusterService.getLocalNode().id(),
487 subject,
488 serializer.encode(event));
489 clusterCommunicator.unicast(message, peer);
490 }
491
Jonathan Hartaaa56572015-01-28 21:56:35 -0800492 private final class SendAdvertisementTask implements Runnable {
493 @Override
494 public void run() {
495 if (Thread.currentThread().isInterrupted()) {
496 log.info("Interrupted, quitting");
497 return;
498 }
499
500 try {
501 final NodeId self = clusterService.getLocalNode().id();
502 Set<ControllerNode> nodes = clusterService.getNodes();
503
504 List<NodeId> nodeIds = nodes.stream()
505 .map(node -> node.id())
506 .collect(Collectors.toList());
507
508 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
509 log.trace("No other peers in the cluster.");
510 return;
511 }
512
513 NodeId peer;
514 do {
515 int idx = RandomUtils.nextInt(0, nodeIds.size());
516 peer = nodeIds.get(idx);
517 } while (peer.equals(self));
518
519 if (Thread.currentThread().isInterrupted()) {
520 log.info("Interrupted, quitting");
521 return;
522 }
523
524 AntiEntropyAdvertisement<K> ad = createAdvertisement();
525
526 try {
527 unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
528 } catch (IOException e) {
529 log.debug("Failed to send anti-entropy advertisement to {}", peer);
530 }
531 } catch (Exception e) {
532 // Catch all exceptions to avoid scheduled task being suppressed.
533 log.error("Exception thrown while sending advertisement", e);
534 }
535 }
536 }
537
538 private AntiEntropyAdvertisement<K> createAdvertisement() {
539 final NodeId self = clusterService.getLocalNode().id();
540
541 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
542
543 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
544
545 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
546
547 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
548 }
549
550 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
551 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
Jonathan Hart3469e602015-02-19 13:50:27 -0800552 boolean sync = false;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800553
554 synchronized (this) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800555 externalEvents = antiEntropyCheckLocalItems(ad);
556
557 antiEntropyCheckLocalRemoved(ad);
558
559 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
560
561 // if remote ad has something unknown, actively sync
562 for (K key : ad.timestamps().keySet()) {
563 if (!items.containsKey(key)) {
Jonathan Hart3469e602015-02-19 13:50:27 -0800564 sync = true;
565 break;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800566 }
567 }
568 } // synchronized (this)
569
Jonathan Hart3469e602015-02-19 13:50:27 -0800570 // Send the advertisement outside the synchronized block
571 if (sync) {
572 final NodeId sender = ad.sender();
573 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
574 try {
575 unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
576 } catch (IOException e) {
577 log.debug(
578 "Failed to send reactive anti-entropy advertisement to {}",
579 sender);
580 }
581 }
582
Jonathan Hartaaa56572015-01-28 21:56:35 -0800583 externalEvents.forEach(this::notifyListeners);
584 }
585
586 /**
587 * Checks if any of the remote's live items or tombstones are out of date
588 * according to our local live item list, or if our live items are out of
589 * date according to the remote's tombstone list.
590 * If the local copy is more recent, it will be pushed to the remote. If the
591 * remote has a more recent remove, we apply that to the local state.
592 *
593 * @param ad remote anti-entropy advertisement
594 * @return list of external events relating to local operations performed
595 */
596 // Guarded by synchronized (this)
597 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
598 AntiEntropyAdvertisement<K> ad) {
599 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
600 = new LinkedList<>();
601 final NodeId sender = ad.sender();
602
603 final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
604
605 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
606 K key = item.getKey();
607 Timestamped<V> localValue = item.getValue();
608
609 Timestamp remoteTimestamp = ad.timestamps().get(key);
610 if (remoteTimestamp == null) {
611 remoteTimestamp = ad.tombstones().get(key);
612 }
613 if (remoteTimestamp == null || localValue
614 .isNewer(remoteTimestamp)) {
615 // local value is more recent, push to sender
616 updatesToSend
617 .add(new PutEntry<>(key, localValue.value(),
618 localValue.timestamp()));
619 }
620
621 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
622 if (remoteDeadTimestamp != null &&
623 remoteDeadTimestamp.compareTo(localValue.timestamp()) > 0) {
624 // sender has a more recent remove
625 if (removeInternal(key, remoteDeadTimestamp)) {
626 externalEvents.add(new EventuallyConsistentMapEvent<>(
627 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
628 }
629 }
630 }
631
632 // Send all updates to the peer at once
633 if (!updatesToSend.isEmpty()) {
634 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800635 unicastMessage(sender, updateMessageSubject,
636 new InternalPutEvent<>(updatesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800637 } catch (IOException e) {
638 log.warn("Failed to send advertisement response", e);
639 }
640 }
641
642 return externalEvents;
643 }
644
645 /**
646 * Checks if any items in the remote live list are out of date according
647 * to our tombstone list. If we find we have a more up to date tombstone,
648 * we'll send it to the remote.
649 *
650 * @param ad remote anti-entropy advertisement
651 */
652 // Guarded by synchronized (this)
653 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
654 final NodeId sender = ad.sender();
655
656 final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
657
658 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
659 K key = dead.getKey();
660 Timestamp localDeadTimestamp = dead.getValue();
661
662 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
663 if (remoteLiveTimestamp != null
664 && localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
665 // sender has zombie, push remove
666 removesToSend
667 .add(new RemoveEntry<>(key, localDeadTimestamp));
668 }
669 }
670
671 // Send all removes to the peer at once
672 if (!removesToSend.isEmpty()) {
673 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800674 unicastMessage(sender, removeMessageSubject,
675 new InternalRemoveEvent<>(removesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800676 } catch (IOException e) {
677 log.warn("Failed to send advertisement response", e);
678 }
679 }
680 }
681
682 /**
683 * Checks if any of the local live items are out of date according to the
684 * remote's tombstone advertisements. If we find a local item is out of date,
685 * we'll apply the remove operation to the local state.
686 *
687 * @param ad remote anti-entropy advertisement
688 * @return list of external events relating to local operations performed
689 */
690 // Guarded by synchronized (this)
691 private List<EventuallyConsistentMapEvent<K, V>>
692 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
693 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
694 = new LinkedList<>();
695
696 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
697 K key = remoteDead.getKey();
698 Timestamp remoteDeadTimestamp = remoteDead.getValue();
699
700 Timestamped<V> local = items.get(key);
701 Timestamp localDead = removedItems.get(key);
702 if (local != null
703 && remoteDeadTimestamp.compareTo(local.timestamp()) > 0) {
704 // remove our version
705 if (removeInternal(key, remoteDeadTimestamp)) {
706 externalEvents.add(new EventuallyConsistentMapEvent<>(
707 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
708 }
709 } else if (localDead != null &&
710 remoteDeadTimestamp.compareTo(localDead) > 0) {
711 // If we both had the item as removed, but their timestamp is
712 // newer, update ours to the newer value
713 removedItems.put(key, remoteDeadTimestamp);
714 }
715 }
716
717 return externalEvents;
718 }
719
720 private final class InternalAntiEntropyListener
721 implements ClusterMessageHandler {
722
723 @Override
724 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800725 log.trace("Received anti-entropy advertisement from peer: {}",
726 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800727 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
728 backgroundExecutor.submit(() -> {
729 try {
730 handleAntiEntropyAdvertisement(advertisement);
731 } catch (Exception e) {
732 log.warn("Exception thrown handling advertisements", e);
733 }
734 });
735 }
736 }
737
Jonathan Hartdb3af892015-01-26 13:19:07 -0800738 private final class InternalPutEventListener implements
739 ClusterMessageHandler {
740 @Override
741 public void handle(ClusterMessage message) {
742 log.debug("Received put event from peer: {}", message.sender());
743 InternalPutEvent<K, V> event = serializer.decode(message.payload());
744
745 executor.submit(() -> {
746 try {
747 for (PutEntry<K, V> entry : event.entries()) {
748 K key = entry.key();
749 V value = entry.value();
750 Timestamp timestamp = entry.timestamp();
751
752 if (putInternal(key, value, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800753 EventuallyConsistentMapEvent<K, V> externalEvent =
Jonathan Hartdb3af892015-01-26 13:19:07 -0800754 new EventuallyConsistentMapEvent<>(
755 EventuallyConsistentMapEvent.Type.PUT, key,
756 value);
757 notifyListeners(externalEvent);
758 }
759 }
760 } catch (Exception e) {
761 log.warn("Exception thrown handling put", e);
762 }
763 });
764 }
765 }
766
767 private final class InternalRemoveEventListener implements
768 ClusterMessageHandler {
769 @Override
770 public void handle(ClusterMessage message) {
771 log.debug("Received remove event from peer: {}", message.sender());
772 InternalRemoveEvent<K> event = serializer.decode(message.payload());
773
774 executor.submit(() -> {
775 try {
776 for (RemoveEntry<K> entry : event.entries()) {
777 K key = entry.key();
778 Timestamp timestamp = entry.timestamp();
779
780 if (removeInternal(key, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800781 EventuallyConsistentMapEvent<K, V> externalEvent
782 = new EventuallyConsistentMapEvent<>(
Jonathan Hartdb3af892015-01-26 13:19:07 -0800783 EventuallyConsistentMapEvent.Type.REMOVE,
784 key, null);
785 notifyListeners(externalEvent);
786 }
787 }
788 } catch (Exception e) {
789 log.warn("Exception thrown handling remove", e);
790 }
791 });
792 }
793 }
794
Jonathan Hartdb3af892015-01-26 13:19:07 -0800795}