blob: 0acef1a22f6232595535b6d12d6a4b302c2fe253 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.impl;
17
18import com.google.common.base.MoreObjects;
19import org.onlab.util.KryoNamespace;
20import org.onosproject.cluster.ClusterService;
21import org.onosproject.cluster.NodeId;
22import org.onosproject.store.Timestamp;
23import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
24import org.onosproject.store.cluster.messaging.ClusterMessage;
25import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
26import org.onosproject.store.cluster.messaging.MessageSubject;
27import org.onosproject.store.serializers.KryoSerializer;
28import org.slf4j.Logger;
29import org.slf4j.LoggerFactory;
30
31import java.io.IOException;
32import java.util.ArrayList;
33import java.util.Collection;
34import java.util.Collections;
35import java.util.List;
36import java.util.Map;
37import java.util.Set;
38import java.util.concurrent.ConcurrentHashMap;
39import java.util.concurrent.CopyOnWriteArraySet;
40import java.util.concurrent.ExecutorService;
41import java.util.concurrent.Executors;
42import java.util.concurrent.ScheduledExecutorService;
43import java.util.stream.Collectors;
44
45import static com.google.common.base.Preconditions.checkNotNull;
46import static com.google.common.base.Preconditions.checkState;
47import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
48import static org.onlab.util.Tools.minPriority;
49import static org.onlab.util.Tools.namedThreads;
50
/**
 * Distributed map implementation that uses optimistic replication and
 * gossip-based techniques to provide an eventually consistent data store.
 */
55public class EventuallyConsistentMapImpl<K, V>
56 implements EventuallyConsistentMap<K, V> {
57
58 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
59
60 private final Map<K, Timestamped<V>> items;
61 private final Map<K, Timestamp> removedItems;
62
63 private final String mapName;
64 private final ClusterService clusterService;
65 private final ClusterCommunicationService clusterCommunicator;
66 private final KryoSerializer serializer;
67
68 private final ClockService<K> clockService;
69
70 private final MessageSubject updateMessageSubject;
71 private final MessageSubject removeMessageSubject;
72
73 private final Set<EventuallyConsistentMapListener> listeners
74 = new CopyOnWriteArraySet<>();
75
76 private final ExecutorService executor;
77
78 private final ScheduledExecutorService backgroundExecutor;
79
80 private volatile boolean destroyed = false;
Jonathan Hart539a6462015-01-27 17:05:43 -080081 private static final String ERROR_DESTROYED = " map is already destroyed";
Jonathan Hartdb3af892015-01-26 13:19:07 -080082
83 // TODO: Make these anti-entropy params configurable
84 private long initialDelaySec = 5;
85 private long periodSec = 5;
86
87 /**
88 * Creates a new eventually consistent map shared amongst multiple instances.
89 *
90 * Each map is identified by a string map name. EventuallyConsistentMapImpl
91 * objects in different JVMs that use the same map name will form a
92 * distributed map across JVMs (provided the cluster service is aware of
93 * both nodes).
94 *
95 * The client is expected to provide an
96 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
97 * will be stored in this map have been registered (including referenced
98 * classes). This serializer will be used to serialize both K and V for
99 * inter-node notifications.
100 *
101 * The client must provide an {@link org.onosproject.store.impl.ClockService}
102 * which can generate timestamps for a given key. The clock service is free
103 * to generate timestamps however it wishes, however these timestamps will
104 * be used to serialize updates to the map so they must be strict enough
105 * to ensure updates are properly ordered for the use case (i.e. in some
106 * cases wallclock time will suffice, whereas in other cases logical time
107 * will be necessary).
108 *
109 * @param mapName a String identifier for the map.
110 * @param clusterService the cluster service
111 * @param clusterCommunicator the cluster communications service
112 * @param serializerBuilder a Kryo namespace builder that can serialize
113 * both K and V
114 * @param clockService a clock service able to generate timestamps
115 * for K
116 */
117 public EventuallyConsistentMapImpl(String mapName,
118 ClusterService clusterService,
119 ClusterCommunicationService clusterCommunicator,
120 KryoNamespace.Builder serializerBuilder,
121 ClockService<K> clockService) {
122
123 this.mapName = checkNotNull(mapName);
124 this.clusterService = checkNotNull(clusterService);
125 this.clusterCommunicator = checkNotNull(clusterCommunicator);
126
127 serializer = createSerializer(checkNotNull(serializerBuilder));
128
129 this.clockService = checkNotNull(clockService);
130
131 items = new ConcurrentHashMap<>();
132 removedItems = new ConcurrentHashMap<>();
133
134 executor = Executors
135 .newCachedThreadPool(namedThreads("onos-ecm-" + mapName + "-fg-%d"));
136
137 backgroundExecutor =
138 newSingleThreadScheduledExecutor(minPriority(
139 namedThreads("onos-ecm-" + mapName + "-bg-%d")));
140
141 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
142 clusterCommunicator.addSubscriber(updateMessageSubject,
143 new InternalPutEventListener());
144 removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
145 clusterCommunicator.addSubscriber(removeMessageSubject,
146 new InternalRemoveEventListener());
147 }
148
149 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
150 return new KryoSerializer() {
151 @Override
152 protected void setupKryoPool() {
153 // Add the map's internal helper classes to the user-supplied serializer
154 serializerPool = builder
155 .register(WallClockTimestamp.class)
156 .register(PutEntry.class)
Jonathan Hart539a6462015-01-27 17:05:43 -0800157 .register(RemoveEntry.class)
Jonathan Hartdb3af892015-01-26 13:19:07 -0800158 .register(ArrayList.class)
159 .register(InternalPutEvent.class)
160 .register(InternalRemoveEvent.class)
161 .build();
162
163 // TODO anti-entropy classes
164 }
165 };
166 }
167
168 @Override
169 public int size() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800170 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800171 return items.size();
172 }
173
174 @Override
175 public boolean isEmpty() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800176 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800177 return items.isEmpty();
178 }
179
180 @Override
181 public boolean containsKey(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800182 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800183 return items.containsKey(key);
184 }
185
186 @Override
187 public boolean containsValue(V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800188 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800189
190 return items.values().stream()
191 .anyMatch(timestamped -> timestamped.value().equals(value));
192 }
193
194 @Override
195 public V get(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800196 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800197
198 Timestamped<V> value = items.get(key);
199 if (value != null) {
200 return value.value();
201 }
202 return null;
203 }
204
205 @Override
206 public void put(K key, V value) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800207 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800208
209 Timestamp timestamp = clockService.getTimestamp(key);
210 if (putInternal(key, value, timestamp)) {
211 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
212 EventuallyConsistentMapEvent<K, V> externalEvent
213 = new EventuallyConsistentMapEvent<>(
214 EventuallyConsistentMapEvent.Type.PUT, key, value);
215 notifyListeners(externalEvent);
216 }
217 }
218
219 private boolean putInternal(K key, V value, Timestamp timestamp) {
220 synchronized (this) {
221 Timestamp removed = removedItems.get(key);
222 if (removed != null && removed.compareTo(timestamp) > 0) {
223 return false;
224 }
225
226 Timestamped<V> existing = items.get(key);
227 if (existing != null && existing.isNewer(timestamp)) {
228 return false;
229 } else {
230 items.put(key, new Timestamped<>(value, timestamp));
231 removedItems.remove(key);
232 return true;
233 }
234 }
235 }
236
237 @Override
238 public void remove(K key) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800239 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800240
241 Timestamp timestamp = clockService.getTimestamp(key);
242 if (removeInternal(key, timestamp)) {
243 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
244 EventuallyConsistentMapEvent<K, V> externalEvent
245 = new EventuallyConsistentMapEvent<>(
246 EventuallyConsistentMapEvent.Type.REMOVE, key, null);
247 notifyListeners(externalEvent);
248 }
249 }
250
251 private boolean removeInternal(K key, Timestamp timestamp) {
252 synchronized (this) {
253 if (items.get(key) != null && items.get(key).isNewer(timestamp)) {
254 return false;
255 }
256
257 items.remove(key);
258 removedItems.put(key, timestamp);
259 return true;
260 }
261 }
262
263 @Override
264 public void putAll(Map<? extends K, ? extends V> m) {
Jonathan Hart539a6462015-01-27 17:05:43 -0800265 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800266
267 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
268
269 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
270 K key = entry.getKey();
271 V value = entry.getValue();
272 Timestamp timestamp = clockService.getTimestamp(entry.getKey());
273
274 if (putInternal(key, value, timestamp)) {
275 updates.add(new PutEntry<>(key, value, timestamp));
276 }
277 }
278
279 notifyPeers(new InternalPutEvent<>(updates));
280
281 for (PutEntry<K, V> entry : updates) {
282 EventuallyConsistentMapEvent<K, V> externalEvent =
283 new EventuallyConsistentMapEvent<>(
284 EventuallyConsistentMapEvent.Type.PUT, entry.key(), entry.value());
285 notifyListeners(externalEvent);
286 }
287 }
288
289 @Override
290 public void clear() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800291 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800292
293 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
294
295 for (K key : items.keySet()) {
296 Timestamp timestamp = clockService.getTimestamp(key);
297
298 if (removeInternal(key, timestamp)) {
299 removed.add(new RemoveEntry<>(key, timestamp));
300 }
301 }
302
303 notifyPeers(new InternalRemoveEvent<>(removed));
304
305 for (RemoveEntry<K> entry : removed) {
306 EventuallyConsistentMapEvent<K, V> externalEvent =
307 new EventuallyConsistentMapEvent<>(
308 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(), null);
309 notifyListeners(externalEvent);
310 }
311 }
312
313 @Override
314 public Set<K> keySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800315 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800316
317 return items.keySet();
318 }
319
320 @Override
321 public Collection<V> values() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800322 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800323
324 return items.values().stream()
325 .map(Timestamped::value)
326 .collect(Collectors.toList());
327 }
328
329 @Override
330 public Set<Map.Entry<K, V>> entrySet() {
Jonathan Hart539a6462015-01-27 17:05:43 -0800331 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800332
333 return items.entrySet().stream()
334 .map(e -> new Entry(e.getKey(), e.getValue().value()))
335 .collect(Collectors.toSet());
336 }
337
338 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800339 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
340 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800341
342 listeners.add(checkNotNull(listener));
343 }
344
345 @Override
Jonathan Hart539a6462015-01-27 17:05:43 -0800346 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
347 checkState(!destroyed, mapName + ERROR_DESTROYED);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800348
349 listeners.remove(checkNotNull(listener));
350 }
351
352 @Override
353 public void destroy() {
354 destroyed = true;
355
356 executor.shutdown();
357 backgroundExecutor.shutdown();
358
359 clusterCommunicator.removeSubscriber(updateMessageSubject);
360 clusterCommunicator.removeSubscriber(removeMessageSubject);
361 }
362
363 private void notifyListeners(EventuallyConsistentMapEvent event) {
364 for (EventuallyConsistentMapListener listener : listeners) {
365 listener.event(event);
366 }
367 }
368
369 private void notifyPeers(InternalPutEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800370 broadcastMessage(updateMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800371 }
372
373 private void notifyPeers(InternalRemoveEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800374 broadcastMessage(removeMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800375 }
376
Jonathan Hart7d656f42015-01-27 14:07:23 -0800377 private void broadcastMessage(MessageSubject subject, Object event) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800378 ClusterMessage message = new ClusterMessage(
379 clusterService.getLocalNode().id(),
380 subject,
381 serializer.encode(event));
382 clusterCommunicator.broadcast(message);
383 }
384
385 private void unicastMessage(NodeId peer,
386 MessageSubject subject,
387 Object event) throws IOException {
388 ClusterMessage message = new ClusterMessage(
389 clusterService.getLocalNode().id(),
390 subject,
391 serializer.encode(event));
392 clusterCommunicator.unicast(message, peer);
393 }
394
395 private final class Entry implements Map.Entry<K, V> {
396
397 private final K key;
398 private final V value;
399
400 public Entry(K key, V value) {
401 this.key = key;
402 this.value = value;
403 }
404
405 @Override
406 public K getKey() {
407 return key;
408 }
409
410 @Override
411 public V getValue() {
412 return value;
413 }
414
415 @Override
416 public V setValue(V value) {
417 throw new UnsupportedOperationException();
418 }
419 }
420
421 private final class InternalPutEventListener implements
422 ClusterMessageHandler {
423 @Override
424 public void handle(ClusterMessage message) {
425 log.debug("Received put event from peer: {}", message.sender());
426 InternalPutEvent<K, V> event = serializer.decode(message.payload());
427
428 executor.submit(() -> {
429 try {
430 for (PutEntry<K, V> entry : event.entries()) {
431 K key = entry.key();
432 V value = entry.value();
433 Timestamp timestamp = entry.timestamp();
434
435 if (putInternal(key, value, timestamp)) {
436 EventuallyConsistentMapEvent externalEvent =
437 new EventuallyConsistentMapEvent<>(
438 EventuallyConsistentMapEvent.Type.PUT, key,
439 value);
440 notifyListeners(externalEvent);
441 }
442 }
443 } catch (Exception e) {
444 log.warn("Exception thrown handling put", e);
445 }
446 });
447 }
448 }
449
450 private final class InternalRemoveEventListener implements
451 ClusterMessageHandler {
452 @Override
453 public void handle(ClusterMessage message) {
454 log.debug("Received remove event from peer: {}", message.sender());
455 InternalRemoveEvent<K> event = serializer.decode(message.payload());
456
457 executor.submit(() -> {
458 try {
459 for (RemoveEntry<K> entry : event.entries()) {
460 K key = entry.key();
461 Timestamp timestamp = entry.timestamp();
462
463 if (removeInternal(key, timestamp)) {
464 EventuallyConsistentMapEvent externalEvent = new EventuallyConsistentMapEvent<K, V>(
465 EventuallyConsistentMapEvent.Type.REMOVE,
466 key, null);
467 notifyListeners(externalEvent);
468 }
469 }
470 } catch (Exception e) {
471 log.warn("Exception thrown handling remove", e);
472 }
473 });
474 }
475 }
476
477 private static final class InternalPutEvent<K, V> {
478 private final List<PutEntry<K, V>> entries;
479
480 public InternalPutEvent(K key, V value, Timestamp timestamp) {
481 entries = Collections
482 .singletonList(new PutEntry<>(key, value, timestamp));
483 }
484
485 public InternalPutEvent(List<PutEntry<K, V>> entries) {
486 this.entries = checkNotNull(entries);
487 }
488
489 // Needed for serialization.
490 @SuppressWarnings("unused")
491 private InternalPutEvent() {
492 entries = null;
493 }
494
495 public List<PutEntry<K, V>> entries() {
496 return entries;
497 }
498 }
499
500 private static final class PutEntry<K, V> {
501 private final K key;
502 private final V value;
503 private final Timestamp timestamp;
504
505 public PutEntry(K key, V value, Timestamp timestamp) {
506 this.key = checkNotNull(key);
507 this.value = checkNotNull(value);
508 this.timestamp = checkNotNull(timestamp);
509 }
510
511 // Needed for serialization.
512 @SuppressWarnings("unused")
513 private PutEntry() {
514 this.key = null;
515 this.value = null;
516 this.timestamp = null;
517 }
518
519 public K key() {
520 return key;
521 }
522
523 public V value() {
524 return value;
525 }
526
527 public Timestamp timestamp() {
528 return timestamp;
529 }
530
531 public String toString() {
532 return MoreObjects.toStringHelper(getClass())
533 .add("key", key)
534 .add("value", value)
535 .add("timestamp", timestamp)
536 .toString();
537 }
538 }
539
540 private static final class InternalRemoveEvent<K> {
541 private final List<RemoveEntry<K>> entries;
542
543 public InternalRemoveEvent(K key, Timestamp timestamp) {
544 entries = Collections.singletonList(
545 new RemoveEntry<>(key, timestamp));
546 }
547
548 public InternalRemoveEvent(List<RemoveEntry<K>> entries) {
549 this.entries = checkNotNull(entries);
550 }
551
552 // Needed for serialization.
553 @SuppressWarnings("unused")
554 private InternalRemoveEvent() {
555 entries = null;
556 }
557
558 public List<RemoveEntry<K>> entries() {
559 return entries;
560 }
561 }
562
563 private static final class RemoveEntry<K> {
564 private final K key;
565 private final Timestamp timestamp;
566
567 public RemoveEntry(K key, Timestamp timestamp) {
568 this.key = checkNotNull(key);
569 this.timestamp = checkNotNull(timestamp);
570 }
571
572 // Needed for serialization.
573 @SuppressWarnings("unused")
574 private RemoveEntry() {
575 this.key = null;
576 this.timestamp = null;
577 }
578
579 public K key() {
580 return key;
581 }
582
583 public Timestamp timestamp() {
584 return timestamp;
585 }
586 }
587}