/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.ecmap;

import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.ClockService;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.impl.WallClockTimestamp;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.minPriority;
Jonathan Hartdb3af892015-01-26 13:19:07 -080057
58/**
59 * Distributed Map implementation which uses optimistic replication and gossip
60 * based techniques to provide an eventually consistent data store.
61 */
62public class EventuallyConsistentMapImpl<K, V>
63 implements EventuallyConsistentMap<K, V> {
64
65 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
66
67 private final Map<K, Timestamped<V>> items;
68 private final Map<K, Timestamp> removedItems;
69
Jonathan Hartdb3af892015-01-26 13:19:07 -080070 private final ClusterService clusterService;
71 private final ClusterCommunicationService clusterCommunicator;
72 private final KryoSerializer serializer;
73
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080074 private final ClockService<K, V> clockService;
Jonathan Hartdb3af892015-01-26 13:19:07 -080075
76 private final MessageSubject updateMessageSubject;
77 private final MessageSubject removeMessageSubject;
Jonathan Hartaaa56572015-01-28 21:56:35 -080078 private final MessageSubject antiEntropyAdvertisementSubject;
Jonathan Hartdb3af892015-01-26 13:19:07 -080079
Jonathan Hartaaa56572015-01-28 21:56:35 -080080 private final Set<EventuallyConsistentMapListener<K, V>> listeners
Jonathan Hartdb3af892015-01-26 13:19:07 -080081 = new CopyOnWriteArraySet<>();
82
83 private final ExecutorService executor;
84
85 private final ScheduledExecutorService backgroundExecutor;
86
Madan Jampanib28e4ad2015-02-19 12:31:37 -080087 private ExecutorService broadcastMessageExecutor;
Madan Jampani28726282015-02-19 11:40:23 -080088
Jonathan Hartdb3af892015-01-26 13:19:07 -080089 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -080090 private static final String ERROR_DESTROYED = " map is already destroyed";
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -080091 private final String destroyedMessage;
Jonathan Hartdb3af892015-01-26 13:19:07 -080092
Jonathan Hart4f397e82015-02-04 09:10:41 -080093 private static final String ERROR_NULL_KEY = "Key cannot be null";
94 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
95
Jonathan Hartdb3af892015-01-26 13:19:07 -080096 // TODO: Make these anti-entropy params configurable
97 private long initialDelaySec = 5;
98 private long periodSec = 5;
99
100 /**
101 * Creates a new eventually consistent map shared amongst multiple instances.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800102 * <p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800103 * Each map is identified by a string map name. EventuallyConsistentMapImpl
104 * objects in different JVMs that use the same map name will form a
105 * distributed map across JVMs (provided the cluster service is aware of
106 * both nodes).
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800107 * </p>
108 * <p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800109 * The client is expected to provide an
110 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
111 * will be stored in this map have been registered (including referenced
112 * classes). This serializer will be used to serialize both K and V for
113 * inter-node notifications.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800114 * </p>
115 * <p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800116 * The client must provide an {@link org.onosproject.store.impl.ClockService}
117 * which can generate timestamps for a given key. The clock service is free
118 * to generate timestamps however it wishes, however these timestamps will
119 * be used to serialize updates to the map so they must be strict enough
120 * to ensure updates are properly ordered for the use case (i.e. in some
121 * cases wallclock time will suffice, whereas in other cases logical time
122 * will be necessary).
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800123 * </p>
Jonathan Hartdb3af892015-01-26 13:19:07 -0800124 *
125 * @param mapName a String identifier for the map.
126 * @param clusterService the cluster service
127 * @param clusterCommunicator the cluster communications service
128 * @param serializerBuilder a Kryo namespace builder that can serialize
129 * both K and V
130 * @param clockService a clock service able to generate timestamps
131 * for K
132 */
133 public EventuallyConsistentMapImpl(String mapName,
134 ClusterService clusterService,
135 ClusterCommunicationService clusterCommunicator,
136 KryoNamespace.Builder serializerBuilder,
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800137 ClockService<K, V> clockService) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800138 this.clusterService = checkNotNull(clusterService);
139 this.clusterCommunicator = checkNotNull(clusterCommunicator);
140
141 serializer = createSerializer(checkNotNull(serializerBuilder));
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800142 destroyedMessage = mapName + ERROR_DESTROYED;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800143
144 this.clockService = checkNotNull(clockService);
145
146 items = new ConcurrentHashMap<>();
147 removedItems = new ConcurrentHashMap<>();
148
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -0800149 executor = Executors //FIXME
150 .newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-fg-%d"));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800151
Madan Jampani28726282015-02-19 11:40:23 -0800152 broadcastMessageExecutor = Executors.newSingleThreadExecutor(groupedThreads("onos/ecm", mapName + "-notify"));
153
Jonathan Hartdb3af892015-01-26 13:19:07 -0800154 backgroundExecutor =
155 newSingleThreadScheduledExecutor(minPriority(
Brian O'Connor4b2ba5f2015-02-18 20:54:00 -0800156 groupedThreads("onos/ecm", mapName + "-bg-%d")));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800157
Jonathan Hartaaa56572015-01-28 21:56:35 -0800158 // start anti-entropy thread
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800159 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
160 initialDelaySec, periodSec,
161 TimeUnit.SECONDS);
Jonathan Hartaaa56572015-01-28 21:56:35 -0800162
Jonathan Hartdb3af892015-01-26 13:19:07 -0800163 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
164 clusterCommunicator.addSubscriber(updateMessageSubject,
165 new InternalPutEventListener());
166 removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
167 clusterCommunicator.addSubscriber(removeMessageSubject,
168 new InternalRemoveEventListener());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800169 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
170 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
171 new InternalAntiEntropyListener());
Jonathan Hartdb3af892015-01-26 13:19:07 -0800172 }
173
174 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
175 return new KryoSerializer() {
176 @Override
177 protected void setupKryoPool() {
178 // Add the map's internal helper classes to the user-supplied serializer
179 serializerPool = builder
180 .register(WallClockTimestamp.class)
181 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800182 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800183 .register(ArrayList.class)
184 .register(InternalPutEvent.class)
185 .register(InternalRemoveEvent.class)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800186 .register(AntiEntropyAdvertisement.class)
187 .register(HashMap.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800188 .build();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800189 }
190 };
191 }
192
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800193 /**
194 * Sets the executor to use for broadcasting messages and returns this
195 * instance for method chaining.
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800196 *
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800197 * @param executor executor service
198 * @return this instance
199 */
200 public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
201 checkNotNull(executor, "Null executor");
202 broadcastMessageExecutor = executor;
203 return this;
204 }
205
Jonathan Hartdb3af892015-01-26 13:19:07 -0800206 @Override
207 public int size() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800208 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800209 return items.size();
210 }
211
212 @Override
213 public boolean isEmpty() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800214 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800215 return items.isEmpty();
216 }
217
218 @Override
219 public boolean containsKey(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800220 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800221 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800222 return items.containsKey(key);
223 }
224
225 @Override
226 public boolean containsValue(V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800227 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800228 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800229
230 return items.values().stream()
231 .anyMatch(timestamped -> timestamped.value().equals(value));
232 }
233
234 @Override
235 public V get(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800236 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800237 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800238
239 Timestamped<V> value = items.get(key);
240 if (value != null) {
241 return value.value();
242 }
243 return null;
244 }
245
246 @Override
247 public void put(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800248 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800249 checkNotNull(key, ERROR_NULL_KEY);
250 checkNotNull(value, ERROR_NULL_VALUE);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800251
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800252 Timestamp timestamp = clockService.getTimestamp(key, value);
253
Jonathan Hartdb3af892015-01-26 13:19:07 -0800254 if (putInternal(key, value, timestamp)) {
255 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
256 EventuallyConsistentMapEvent<K, V> externalEvent
257 = new EventuallyConsistentMapEvent<>(
258 EventuallyConsistentMapEvent.Type.PUT, key, value);
259 notifyListeners(externalEvent);
260 }
261 }
262
263 private boolean putInternal(K key, V value, Timestamp timestamp) {
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800264 Timestamp removed = removedItems.get(key);
265 if (removed != null && removed.compareTo(timestamp) > 0) {
266 log.debug("ecmap - removed was newer {}", value);
267 return false;
268 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800269
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800270 boolean success;
271 synchronized (this) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800272 Timestamped<V> existing = items.get(key);
273 if (existing != null && existing.isNewer(timestamp)) {
Jonathan Hart07e58be2015-02-12 09:57:16 -0800274 log.debug("ecmap - existing was newer {}", value);
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800275 success = false;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800276 } else {
277 items.put(key, new Timestamped<>(value, timestamp));
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800278 success = true;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800279 }
280 }
Brian O'Connor2952e3b2015-02-19 21:47:57 -0800281
282 if (success && removed != null) {
283 removedItems.remove(key, removed);
284 }
285 return success;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800286 }
287
288 @Override
289 public void remove(K key) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800290 checkState(!destroyed, destroyedMessage);
Jonathan Hart4f397e82015-02-04 09:10:41 -0800291 checkNotNull(key, ERROR_NULL_KEY);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800292
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800293 // TODO prevent calls here if value is important for timestamp
294 Timestamp timestamp = clockService.getTimestamp(key, null);
295
Jonathan Hartdb3af892015-01-26 13:19:07 -0800296 if (removeInternal(key, timestamp)) {
297 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
298 EventuallyConsistentMapEvent<K, V> externalEvent
299 = new EventuallyConsistentMapEvent<>(
300 EventuallyConsistentMapEvent.Type.REMOVE, key, null);
301 notifyListeners(externalEvent);
302 }
303 }
304
305 private boolean removeInternal(K key, Timestamp timestamp) {
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800306 Timestamped<V> value = items.get(key);
307 if (value != null) {
308 if (value.isNewer(timestamp)) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800309 return false;
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800310 } else {
311 items.remove(key, value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800312 }
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800313 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800314
Madan Jampani7e6cfe32015-02-19 17:44:44 -0800315 Timestamp removedTimestamp = removedItems.get(key);
316 if (removedTimestamp == null) {
317 return removedItems.putIfAbsent(key, timestamp) == null;
318 } else if (timestamp.compareTo(removedTimestamp) > 0) {
319 return removedItems.replace(key, removedTimestamp, timestamp);
320 } else {
321 return false;
Jonathan Hartdb3af892015-01-26 13:19:07 -0800322 }
323 }
324
325 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800326 public void remove(K key, V value) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800327 checkState(!destroyed, destroyedMessage);
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800328 checkNotNull(key, ERROR_NULL_KEY);
329 checkNotNull(value, ERROR_NULL_VALUE);
330
331 Timestamp timestamp = clockService.getTimestamp(key, value);
332
333 if (removeInternal(key, timestamp)) {
334 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
335 EventuallyConsistentMapEvent<K, V> externalEvent
336 = new EventuallyConsistentMapEvent<>(
337 EventuallyConsistentMapEvent.Type.REMOVE, key, value);
338 notifyListeners(externalEvent);
339 }
340 }
341
342 @Override
Jonathan Hartdb3af892015-01-26 13:19:07 -0800343 public void putAll(Map<? extends K, ? extends V> m) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800344 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800345
346 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
347
348 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
349 K key = entry.getKey();
350 V value = entry.getValue();
Jonathan Hart4f397e82015-02-04 09:10:41 -0800351
352 checkNotNull(key, ERROR_NULL_KEY);
353 checkNotNull(value, ERROR_NULL_VALUE);
354
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800355 Timestamp timestamp = clockService.getTimestamp(key, value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800356
357 if (putInternal(key, value, timestamp)) {
358 updates.add(new PutEntry<>(key, value, timestamp));
359 }
360 }
361
Jonathan Hart584d2f32015-01-27 19:46:14 -0800362 if (!updates.isEmpty()) {
363 notifyPeers(new InternalPutEvent<>(updates));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800364
Jonathan Hart584d2f32015-01-27 19:46:14 -0800365 for (PutEntry<K, V> entry : updates) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800366 EventuallyConsistentMapEvent<K, V> externalEvent =
367 new EventuallyConsistentMapEvent<>(
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800368 EventuallyConsistentMapEvent.Type.PUT, entry.key(),
369 entry.value());
Jonathan Hart584d2f32015-01-27 19:46:14 -0800370 notifyListeners(externalEvent);
371 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800372 }
373 }
374
375 @Override
376 public void clear() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800377 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800378
379 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
380
381 for (K key : items.keySet()) {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800382 // TODO also this is not applicable if value is important for timestamp?
383 Timestamp timestamp = clockService.getTimestamp(key, null);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800384
385 if (removeInternal(key, timestamp)) {
386 removed.add(new RemoveEntry<>(key, timestamp));
387 }
388 }
389
Jonathan Hart584d2f32015-01-27 19:46:14 -0800390 if (!removed.isEmpty()) {
391 notifyPeers(new InternalRemoveEvent<>(removed));
Jonathan Hartdb3af892015-01-26 13:19:07 -0800392
Jonathan Hart584d2f32015-01-27 19:46:14 -0800393 for (RemoveEntry<K> entry : removed) {
394 EventuallyConsistentMapEvent<K, V> externalEvent
395 = new EventuallyConsistentMapEvent<>(
396 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
397 null);
398 notifyListeners(externalEvent);
399 }
Jonathan Hartdb3af892015-01-26 13:19:07 -0800400 }
401 }
402
403 @Override
404 public Set<K> keySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800405 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800406
407 return items.keySet();
408 }
409
410 @Override
411 public Collection<V> values() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800412 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800413
414 return items.values().stream()
415 .map(Timestamped::value)
416 .collect(Collectors.toList());
417 }
418
419 @Override
420 public Set<Map.Entry<K, V>> entrySet() {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800421 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800422
423 return items.entrySet().stream()
Jonathan Hartf9108232015-02-02 16:37:35 -0800424 .map(e -> Pair.of(e.getKey(), e.getValue().value()))
Jonathan Hartdb3af892015-01-26 13:19:07 -0800425 .collect(Collectors.toSet());
426 }
427
428 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800429 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800430 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800431
432 listeners.add(checkNotNull(listener));
433 }
434
435 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800436 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800437 checkState(!destroyed, destroyedMessage);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800438
439 listeners.remove(checkNotNull(listener));
440 }
441
442 @Override
443 public void destroy() {
444 destroyed = true;
445
446 executor.shutdown();
447 backgroundExecutor.shutdown();
Madan Jampani337bb442015-02-19 14:29:18 -0800448 broadcastMessageExecutor.shutdown();
Jonathan Hartdb3af892015-01-26 13:19:07 -0800449
Jonathan Hart584d2f32015-01-27 19:46:14 -0800450 listeners.clear();
451
Jonathan Hartdb3af892015-01-26 13:19:07 -0800452 clusterCommunicator.removeSubscriber(updateMessageSubject);
453 clusterCommunicator.removeSubscriber(removeMessageSubject);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800454 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800455 }
456
Jonathan Hartaaa56572015-01-28 21:56:35 -0800457 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
458 for (EventuallyConsistentMapListener<K, V> listener : listeners) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800459 listener.event(event);
460 }
461 }
462
463 private void notifyPeers(InternalPutEvent event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800464 // FIXME extremely memory expensive when we are overrun
465// broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
466 broadcastMessage(updateMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800467 }
468
469 private void notifyPeers(InternalRemoveEvent event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800470 // FIXME extremely memory expensive when we are overrun
471// broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
472 broadcastMessage(removeMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800473 }
474
Jonathan Hart7d656f42015-01-27 14:07:23 -0800475 private void broadcastMessage(MessageSubject subject, Object event) {
Brian O'Connorb2894222015-02-20 22:05:19 -0800476 // FIXME can we parallelize the serialization... use the caller???
Jonathan Hartdb3af892015-01-26 13:19:07 -0800477 ClusterMessage message = new ClusterMessage(
478 clusterService.getLocalNode().id(),
479 subject,
480 serializer.encode(event));
Brian O'Connorb2894222015-02-20 22:05:19 -0800481 //broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
Madan Jampani337bb442015-02-19 14:29:18 -0800482 clusterCommunicator.broadcast(message);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800483 }
484
485 private void unicastMessage(NodeId peer,
486 MessageSubject subject,
487 Object event) throws IOException {
488 ClusterMessage message = new ClusterMessage(
489 clusterService.getLocalNode().id(),
490 subject,
491 serializer.encode(event));
492 clusterCommunicator.unicast(message, peer);
493 }
494
Jonathan Hartaaa56572015-01-28 21:56:35 -0800495 private final class SendAdvertisementTask implements Runnable {
496 @Override
497 public void run() {
498 if (Thread.currentThread().isInterrupted()) {
499 log.info("Interrupted, quitting");
500 return;
501 }
502
503 try {
504 final NodeId self = clusterService.getLocalNode().id();
505 Set<ControllerNode> nodes = clusterService.getNodes();
506
507 List<NodeId> nodeIds = nodes.stream()
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800508 .map(ControllerNode::id)
Jonathan Hartaaa56572015-01-28 21:56:35 -0800509 .collect(Collectors.toList());
510
511 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
512 log.trace("No other peers in the cluster.");
513 return;
514 }
515
516 NodeId peer;
517 do {
518 int idx = RandomUtils.nextInt(0, nodeIds.size());
519 peer = nodeIds.get(idx);
520 } while (peer.equals(self));
521
522 if (Thread.currentThread().isInterrupted()) {
523 log.info("Interrupted, quitting");
524 return;
525 }
526
527 AntiEntropyAdvertisement<K> ad = createAdvertisement();
528
529 try {
530 unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
531 } catch (IOException e) {
532 log.debug("Failed to send anti-entropy advertisement to {}", peer);
533 }
534 } catch (Exception e) {
535 // Catch all exceptions to avoid scheduled task being suppressed.
536 log.error("Exception thrown while sending advertisement", e);
537 }
538 }
539 }
540
541 private AntiEntropyAdvertisement<K> createAdvertisement() {
542 final NodeId self = clusterService.getLocalNode().id();
543
544 Map<K, Timestamp> timestamps = new HashMap<>(items.size());
545
546 items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
547
548 Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
549
550 return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
551 }
552
553 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
554 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
Jonathan Hart3469e602015-02-19 13:50:27 -0800555 boolean sync = false;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800556
557 synchronized (this) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800558 externalEvents = antiEntropyCheckLocalItems(ad);
559
560 antiEntropyCheckLocalRemoved(ad);
561
562 externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
563
564 // if remote ad has something unknown, actively sync
565 for (K key : ad.timestamps().keySet()) {
566 if (!items.containsKey(key)) {
Jonathan Hart3469e602015-02-19 13:50:27 -0800567 sync = true;
568 break;
Jonathan Hartaaa56572015-01-28 21:56:35 -0800569 }
570 }
571 } // synchronized (this)
572
Jonathan Hart3469e602015-02-19 13:50:27 -0800573 // Send the advertisement outside the synchronized block
574 if (sync) {
575 final NodeId sender = ad.sender();
576 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
577 try {
578 unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
579 } catch (IOException e) {
580 log.debug(
581 "Failed to send reactive anti-entropy advertisement to {}",
582 sender);
583 }
584 }
585
Jonathan Hartaaa56572015-01-28 21:56:35 -0800586 externalEvents.forEach(this::notifyListeners);
587 }
588
589 /**
590 * Checks if any of the remote's live items or tombstones are out of date
591 * according to our local live item list, or if our live items are out of
592 * date according to the remote's tombstone list.
593 * If the local copy is more recent, it will be pushed to the remote. If the
594 * remote has a more recent remove, we apply that to the local state.
595 *
596 * @param ad remote anti-entropy advertisement
597 * @return list of external events relating to local operations performed
598 */
599 // Guarded by synchronized (this)
600 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
601 AntiEntropyAdvertisement<K> ad) {
602 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
603 = new LinkedList<>();
604 final NodeId sender = ad.sender();
605
606 final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
607
608 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
609 K key = item.getKey();
610 Timestamped<V> localValue = item.getValue();
611
612 Timestamp remoteTimestamp = ad.timestamps().get(key);
613 if (remoteTimestamp == null) {
614 remoteTimestamp = ad.tombstones().get(key);
615 }
616 if (remoteTimestamp == null || localValue
617 .isNewer(remoteTimestamp)) {
618 // local value is more recent, push to sender
619 updatesToSend
620 .add(new PutEntry<>(key, localValue.value(),
621 localValue.timestamp()));
622 }
623
624 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
625 if (remoteDeadTimestamp != null &&
626 remoteDeadTimestamp.compareTo(localValue.timestamp()) > 0) {
627 // sender has a more recent remove
628 if (removeInternal(key, remoteDeadTimestamp)) {
629 externalEvents.add(new EventuallyConsistentMapEvent<>(
630 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
631 }
632 }
633 }
634
635 // Send all updates to the peer at once
636 if (!updatesToSend.isEmpty()) {
637 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800638 unicastMessage(sender, updateMessageSubject,
639 new InternalPutEvent<>(updatesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800640 } catch (IOException e) {
641 log.warn("Failed to send advertisement response", e);
642 }
643 }
644
645 return externalEvents;
646 }
647
648 /**
649 * Checks if any items in the remote live list are out of date according
650 * to our tombstone list. If we find we have a more up to date tombstone,
651 * we'll send it to the remote.
652 *
653 * @param ad remote anti-entropy advertisement
654 */
655 // Guarded by synchronized (this)
656 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
657 final NodeId sender = ad.sender();
658
659 final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
660
661 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
662 K key = dead.getKey();
663 Timestamp localDeadTimestamp = dead.getValue();
664
665 Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
666 if (remoteLiveTimestamp != null
667 && localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
668 // sender has zombie, push remove
669 removesToSend
670 .add(new RemoveEntry<>(key, localDeadTimestamp));
671 }
672 }
673
674 // Send all removes to the peer at once
675 if (!removesToSend.isEmpty()) {
676 try {
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800677 unicastMessage(sender, removeMessageSubject,
678 new InternalRemoveEvent<>(removesToSend));
Jonathan Hartaaa56572015-01-28 21:56:35 -0800679 } catch (IOException e) {
680 log.warn("Failed to send advertisement response", e);
681 }
682 }
683 }
684
685 /**
686 * Checks if any of the local live items are out of date according to the
687 * remote's tombstone advertisements. If we find a local item is out of date,
688 * we'll apply the remove operation to the local state.
689 *
690 * @param ad remote anti-entropy advertisement
691 * @return list of external events relating to local operations performed
692 */
693 // Guarded by synchronized (this)
694 private List<EventuallyConsistentMapEvent<K, V>>
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800695 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800696 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
697 = new LinkedList<>();
698
699 for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
700 K key = remoteDead.getKey();
701 Timestamp remoteDeadTimestamp = remoteDead.getValue();
702
703 Timestamped<V> local = items.get(key);
704 Timestamp localDead = removedItems.get(key);
705 if (local != null
706 && remoteDeadTimestamp.compareTo(local.timestamp()) > 0) {
707 // remove our version
708 if (removeInternal(key, remoteDeadTimestamp)) {
709 externalEvents.add(new EventuallyConsistentMapEvent<>(
710 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
711 }
712 } else if (localDead != null &&
713 remoteDeadTimestamp.compareTo(localDead) > 0) {
714 // If we both had the item as removed, but their timestamp is
715 // newer, update ours to the newer value
716 removedItems.put(key, remoteDeadTimestamp);
717 }
718 }
719
720 return externalEvents;
721 }
722
723 private final class InternalAntiEntropyListener
724 implements ClusterMessageHandler {
725
726 @Override
727 public void handle(ClusterMessage message) {
Jonathan Hart4fd4ebb2015-02-04 17:38:48 -0800728 log.trace("Received anti-entropy advertisement from peer: {}",
729 message.sender());
Jonathan Hartaaa56572015-01-28 21:56:35 -0800730 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
731 backgroundExecutor.submit(() -> {
732 try {
733 handleAntiEntropyAdvertisement(advertisement);
734 } catch (Exception e) {
735 log.warn("Exception thrown handling advertisements", e);
736 }
737 });
738 }
739 }
740
Jonathan Hartdb3af892015-01-26 13:19:07 -0800741 private final class InternalPutEventListener implements
742 ClusterMessageHandler {
743 @Override
744 public void handle(ClusterMessage message) {
745 log.debug("Received put event from peer: {}", message.sender());
746 InternalPutEvent<K, V> event = serializer.decode(message.payload());
747
748 executor.submit(() -> {
749 try {
750 for (PutEntry<K, V> entry : event.entries()) {
751 K key = entry.key();
752 V value = entry.value();
753 Timestamp timestamp = entry.timestamp();
754
755 if (putInternal(key, value, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800756 EventuallyConsistentMapEvent<K, V> externalEvent =
Jonathan Hartdb3af892015-01-26 13:19:07 -0800757 new EventuallyConsistentMapEvent<>(
Thomas Vachuskaa132e3a2015-02-21 01:53:14 -0800758 EventuallyConsistentMapEvent.Type.PUT, key,
759 value);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800760 notifyListeners(externalEvent);
761 }
762 }
763 } catch (Exception e) {
764 log.warn("Exception thrown handling put", e);
765 }
766 });
767 }
768 }
769
770 private final class InternalRemoveEventListener implements
771 ClusterMessageHandler {
772 @Override
773 public void handle(ClusterMessage message) {
774 log.debug("Received remove event from peer: {}", message.sender());
775 InternalRemoveEvent<K> event = serializer.decode(message.payload());
776
777 executor.submit(() -> {
778 try {
779 for (RemoveEntry<K> entry : event.entries()) {
780 K key = entry.key();
781 Timestamp timestamp = entry.timestamp();
782
783 if (removeInternal(key, timestamp)) {
Jonathan Hartaaa56572015-01-28 21:56:35 -0800784 EventuallyConsistentMapEvent<K, V> externalEvent
785 = new EventuallyConsistentMapEvent<>(
Jonathan Hartdb3af892015-01-26 13:19:07 -0800786 EventuallyConsistentMapEvent.Type.REMOVE,
787 key, null);
788 notifyListeners(externalEvent);
789 }
790 }
791 } catch (Exception e) {
792 log.warn("Exception thrown handling remove", e);
793 }
794 });
795 }
796 }
797
Jonathan Hartdb3af892015-01-26 13:19:07 -0800798}