blob: 19786f8511f91f2913e01f2f9e6a0a4bf6b05c93 [file] [log] [blame]
Jonathan Hartdb3af892015-01-26 13:19:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.impl;
17
18import com.google.common.base.MoreObjects;
19import org.onlab.util.KryoNamespace;
20import org.onosproject.cluster.ClusterService;
21import org.onosproject.cluster.NodeId;
22import org.onosproject.store.Timestamp;
23import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
24import org.onosproject.store.cluster.messaging.ClusterMessage;
25import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
26import org.onosproject.store.cluster.messaging.MessageSubject;
27import org.onosproject.store.serializers.KryoSerializer;
28import org.slf4j.Logger;
29import org.slf4j.LoggerFactory;
30
31import java.io.IOException;
32import java.util.ArrayList;
33import java.util.Collection;
34import java.util.Collections;
35import java.util.List;
36import java.util.Map;
37import java.util.Set;
38import java.util.concurrent.ConcurrentHashMap;
39import java.util.concurrent.CopyOnWriteArraySet;
40import java.util.concurrent.ExecutorService;
41import java.util.concurrent.Executors;
42import java.util.concurrent.ScheduledExecutorService;
43import java.util.stream.Collectors;
44
45import static com.google.common.base.Preconditions.checkNotNull;
46import static com.google.common.base.Preconditions.checkState;
47import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
48import static org.onlab.util.Tools.minPriority;
49import static org.onlab.util.Tools.namedThreads;
50
51/**
52 * Distributed Map implementation which uses optimistic replication and gossip
53 * based techniques to provide an eventually consistent data store.
54 */
55public class EventuallyConsistentMapImpl<K, V>
56 implements EventuallyConsistentMap<K, V> {
57
58 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
59
60 private final Map<K, Timestamped<V>> items;
61 private final Map<K, Timestamp> removedItems;
62
63 private final String mapName;
64 private final ClusterService clusterService;
65 private final ClusterCommunicationService clusterCommunicator;
66 private final KryoSerializer serializer;
67
68 private final ClockService<K> clockService;
69
70 private final MessageSubject updateMessageSubject;
71 private final MessageSubject removeMessageSubject;
72
73 private final Set<EventuallyConsistentMapListener> listeners
74 = new CopyOnWriteArraySet<>();
75
76 private final ExecutorService executor;
77
78 private final ScheduledExecutorService backgroundExecutor;
79
80 private volatile boolean destroyed = false;
81 private static final String ERROR_DESTROYED = " is already destroyed";
82
83 // TODO: Make these anti-entropy params configurable
84 private long initialDelaySec = 5;
85 private long periodSec = 5;
86
87 /**
88 * Creates a new eventually consistent map shared amongst multiple instances.
89 *
90 * Each map is identified by a string map name. EventuallyConsistentMapImpl
91 * objects in different JVMs that use the same map name will form a
92 * distributed map across JVMs (provided the cluster service is aware of
93 * both nodes).
94 *
95 * The client is expected to provide an
96 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
97 * will be stored in this map have been registered (including referenced
98 * classes). This serializer will be used to serialize both K and V for
99 * inter-node notifications.
100 *
101 * The client must provide an {@link org.onosproject.store.impl.ClockService}
102 * which can generate timestamps for a given key. The clock service is free
103 * to generate timestamps however it wishes, however these timestamps will
104 * be used to serialize updates to the map so they must be strict enough
105 * to ensure updates are properly ordered for the use case (i.e. in some
106 * cases wallclock time will suffice, whereas in other cases logical time
107 * will be necessary).
108 *
109 * @param mapName a String identifier for the map.
110 * @param clusterService the cluster service
111 * @param clusterCommunicator the cluster communications service
112 * @param serializerBuilder a Kryo namespace builder that can serialize
113 * both K and V
114 * @param clockService a clock service able to generate timestamps
115 * for K
116 */
117 public EventuallyConsistentMapImpl(String mapName,
118 ClusterService clusterService,
119 ClusterCommunicationService clusterCommunicator,
120 KryoNamespace.Builder serializerBuilder,
121 ClockService<K> clockService) {
122
123 this.mapName = checkNotNull(mapName);
124 this.clusterService = checkNotNull(clusterService);
125 this.clusterCommunicator = checkNotNull(clusterCommunicator);
126
127 serializer = createSerializer(checkNotNull(serializerBuilder));
128
129 this.clockService = checkNotNull(clockService);
130
131 items = new ConcurrentHashMap<>();
132 removedItems = new ConcurrentHashMap<>();
133
134 executor = Executors
135 .newCachedThreadPool(namedThreads("onos-ecm-" + mapName + "-fg-%d"));
136
137 backgroundExecutor =
138 newSingleThreadScheduledExecutor(minPriority(
139 namedThreads("onos-ecm-" + mapName + "-bg-%d")));
140
141 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
142 clusterCommunicator.addSubscriber(updateMessageSubject,
143 new InternalPutEventListener());
144 removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
145 clusterCommunicator.addSubscriber(removeMessageSubject,
146 new InternalRemoveEventListener());
147 }
148
149 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
150 return new KryoSerializer() {
151 @Override
152 protected void setupKryoPool() {
153 // Add the map's internal helper classes to the user-supplied serializer
154 serializerPool = builder
155 .register(WallClockTimestamp.class)
156 .register(PutEntry.class)
157 .register(ArrayList.class)
158 .register(InternalPutEvent.class)
159 .register(InternalRemoveEvent.class)
160 .build();
161
162 // TODO anti-entropy classes
163 }
164 };
165 }
166
167 @Override
168 public int size() {
169 checkState(destroyed, mapName + ERROR_DESTROYED);
170 return items.size();
171 }
172
173 @Override
174 public boolean isEmpty() {
175 checkState(destroyed, mapName + ERROR_DESTROYED);
176 return items.isEmpty();
177 }
178
179 @Override
180 public boolean containsKey(K key) {
181 checkState(destroyed, mapName + ERROR_DESTROYED);
182 return items.containsKey(key);
183 }
184
185 @Override
186 public boolean containsValue(V value) {
187 checkState(destroyed, mapName + ERROR_DESTROYED);
188
189 return items.values().stream()
190 .anyMatch(timestamped -> timestamped.value().equals(value));
191 }
192
193 @Override
194 public V get(K key) {
195 checkState(destroyed, mapName + ERROR_DESTROYED);
196
197 Timestamped<V> value = items.get(key);
198 if (value != null) {
199 return value.value();
200 }
201 return null;
202 }
203
204 @Override
205 public void put(K key, V value) {
206 checkState(destroyed, mapName + ERROR_DESTROYED);
207
208 Timestamp timestamp = clockService.getTimestamp(key);
209 if (putInternal(key, value, timestamp)) {
210 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
211 EventuallyConsistentMapEvent<K, V> externalEvent
212 = new EventuallyConsistentMapEvent<>(
213 EventuallyConsistentMapEvent.Type.PUT, key, value);
214 notifyListeners(externalEvent);
215 }
216 }
217
218 private boolean putInternal(K key, V value, Timestamp timestamp) {
219 synchronized (this) {
220 Timestamp removed = removedItems.get(key);
221 if (removed != null && removed.compareTo(timestamp) > 0) {
222 return false;
223 }
224
225 Timestamped<V> existing = items.get(key);
226 if (existing != null && existing.isNewer(timestamp)) {
227 return false;
228 } else {
229 items.put(key, new Timestamped<>(value, timestamp));
230 removedItems.remove(key);
231 return true;
232 }
233 }
234 }
235
236 @Override
237 public void remove(K key) {
238 checkState(destroyed, mapName + ERROR_DESTROYED);
239
240 Timestamp timestamp = clockService.getTimestamp(key);
241 if (removeInternal(key, timestamp)) {
242 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
243 EventuallyConsistentMapEvent<K, V> externalEvent
244 = new EventuallyConsistentMapEvent<>(
245 EventuallyConsistentMapEvent.Type.REMOVE, key, null);
246 notifyListeners(externalEvent);
247 }
248 }
249
250 private boolean removeInternal(K key, Timestamp timestamp) {
251 synchronized (this) {
252 if (items.get(key) != null && items.get(key).isNewer(timestamp)) {
253 return false;
254 }
255
256 items.remove(key);
257 removedItems.put(key, timestamp);
258 return true;
259 }
260 }
261
262 @Override
263 public void putAll(Map<? extends K, ? extends V> m) {
264 checkState(destroyed, mapName + ERROR_DESTROYED);
265
266 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
267
268 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
269 K key = entry.getKey();
270 V value = entry.getValue();
271 Timestamp timestamp = clockService.getTimestamp(entry.getKey());
272
273 if (putInternal(key, value, timestamp)) {
274 updates.add(new PutEntry<>(key, value, timestamp));
275 }
276 }
277
278 notifyPeers(new InternalPutEvent<>(updates));
279
280 for (PutEntry<K, V> entry : updates) {
281 EventuallyConsistentMapEvent<K, V> externalEvent =
282 new EventuallyConsistentMapEvent<>(
283 EventuallyConsistentMapEvent.Type.PUT, entry.key(), entry.value());
284 notifyListeners(externalEvent);
285 }
286 }
287
288 @Override
289 public void clear() {
290 checkState(destroyed, mapName + ERROR_DESTROYED);
291
292 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
293
294 for (K key : items.keySet()) {
295 Timestamp timestamp = clockService.getTimestamp(key);
296
297 if (removeInternal(key, timestamp)) {
298 removed.add(new RemoveEntry<>(key, timestamp));
299 }
300 }
301
302 notifyPeers(new InternalRemoveEvent<>(removed));
303
304 for (RemoveEntry<K> entry : removed) {
305 EventuallyConsistentMapEvent<K, V> externalEvent =
306 new EventuallyConsistentMapEvent<>(
307 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(), null);
308 notifyListeners(externalEvent);
309 }
310 }
311
312 @Override
313 public Set<K> keySet() {
314 checkState(destroyed, mapName + ERROR_DESTROYED);
315
316 return items.keySet();
317 }
318
319 @Override
320 public Collection<V> values() {
321 checkState(destroyed, mapName + ERROR_DESTROYED);
322
323 return items.values().stream()
324 .map(Timestamped::value)
325 .collect(Collectors.toList());
326 }
327
328 @Override
329 public Set<Map.Entry<K, V>> entrySet() {
330 checkState(destroyed, mapName + ERROR_DESTROYED);
331
332 return items.entrySet().stream()
333 .map(e -> new Entry(e.getKey(), e.getValue().value()))
334 .collect(Collectors.toSet());
335 }
336
337 @Override
338 public void addListener(EventuallyConsistentMapListener listener) {
339 checkState(destroyed, mapName + ERROR_DESTROYED);
340
341 listeners.add(checkNotNull(listener));
342 }
343
344 @Override
345 public void removeListener(EventuallyConsistentMapListener listener) {
346 checkState(destroyed, mapName + ERROR_DESTROYED);
347
348 listeners.remove(checkNotNull(listener));
349 }
350
351 @Override
352 public void destroy() {
353 destroyed = true;
354
355 executor.shutdown();
356 backgroundExecutor.shutdown();
357
358 clusterCommunicator.removeSubscriber(updateMessageSubject);
359 clusterCommunicator.removeSubscriber(removeMessageSubject);
360 }
361
362 private void notifyListeners(EventuallyConsistentMapEvent event) {
363 for (EventuallyConsistentMapListener listener : listeners) {
364 listener.event(event);
365 }
366 }
367
368 private void notifyPeers(InternalPutEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800369 broadcastMessage(updateMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800370 }
371
372 private void notifyPeers(InternalRemoveEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800373 broadcastMessage(removeMessageSubject, event);
Jonathan Hartdb3af892015-01-26 13:19:07 -0800374 }
375
Jonathan Hart7d656f42015-01-27 14:07:23 -0800376 private void broadcastMessage(MessageSubject subject, Object event) {
Jonathan Hartdb3af892015-01-26 13:19:07 -0800377 ClusterMessage message = new ClusterMessage(
378 clusterService.getLocalNode().id(),
379 subject,
380 serializer.encode(event));
381 clusterCommunicator.broadcast(message);
382 }
383
384 private void unicastMessage(NodeId peer,
385 MessageSubject subject,
386 Object event) throws IOException {
387 ClusterMessage message = new ClusterMessage(
388 clusterService.getLocalNode().id(),
389 subject,
390 serializer.encode(event));
391 clusterCommunicator.unicast(message, peer);
392 }
393
394 private final class Entry implements Map.Entry<K, V> {
395
396 private final K key;
397 private final V value;
398
399 public Entry(K key, V value) {
400 this.key = key;
401 this.value = value;
402 }
403
404 @Override
405 public K getKey() {
406 return key;
407 }
408
409 @Override
410 public V getValue() {
411 return value;
412 }
413
414 @Override
415 public V setValue(V value) {
416 throw new UnsupportedOperationException();
417 }
418 }
419
420 private final class InternalPutEventListener implements
421 ClusterMessageHandler {
422 @Override
423 public void handle(ClusterMessage message) {
424 log.debug("Received put event from peer: {}", message.sender());
425 InternalPutEvent<K, V> event = serializer.decode(message.payload());
426
427 executor.submit(() -> {
428 try {
429 for (PutEntry<K, V> entry : event.entries()) {
430 K key = entry.key();
431 V value = entry.value();
432 Timestamp timestamp = entry.timestamp();
433
434 if (putInternal(key, value, timestamp)) {
435 EventuallyConsistentMapEvent externalEvent =
436 new EventuallyConsistentMapEvent<>(
437 EventuallyConsistentMapEvent.Type.PUT, key,
438 value);
439 notifyListeners(externalEvent);
440 }
441 }
442 } catch (Exception e) {
443 log.warn("Exception thrown handling put", e);
444 }
445 });
446 }
447 }
448
449 private final class InternalRemoveEventListener implements
450 ClusterMessageHandler {
451 @Override
452 public void handle(ClusterMessage message) {
453 log.debug("Received remove event from peer: {}", message.sender());
454 InternalRemoveEvent<K> event = serializer.decode(message.payload());
455
456 executor.submit(() -> {
457 try {
458 for (RemoveEntry<K> entry : event.entries()) {
459 K key = entry.key();
460 Timestamp timestamp = entry.timestamp();
461
462 if (removeInternal(key, timestamp)) {
463 EventuallyConsistentMapEvent externalEvent = new EventuallyConsistentMapEvent<K, V>(
464 EventuallyConsistentMapEvent.Type.REMOVE,
465 key, null);
466 notifyListeners(externalEvent);
467 }
468 }
469 } catch (Exception e) {
470 log.warn("Exception thrown handling remove", e);
471 }
472 });
473 }
474 }
475
476 private static final class InternalPutEvent<K, V> {
477 private final List<PutEntry<K, V>> entries;
478
479 public InternalPutEvent(K key, V value, Timestamp timestamp) {
480 entries = Collections
481 .singletonList(new PutEntry<>(key, value, timestamp));
482 }
483
484 public InternalPutEvent(List<PutEntry<K, V>> entries) {
485 this.entries = checkNotNull(entries);
486 }
487
488 // Needed for serialization.
489 @SuppressWarnings("unused")
490 private InternalPutEvent() {
491 entries = null;
492 }
493
494 public List<PutEntry<K, V>> entries() {
495 return entries;
496 }
497 }
498
499 private static final class PutEntry<K, V> {
500 private final K key;
501 private final V value;
502 private final Timestamp timestamp;
503
504 public PutEntry(K key, V value, Timestamp timestamp) {
505 this.key = checkNotNull(key);
506 this.value = checkNotNull(value);
507 this.timestamp = checkNotNull(timestamp);
508 }
509
510 // Needed for serialization.
511 @SuppressWarnings("unused")
512 private PutEntry() {
513 this.key = null;
514 this.value = null;
515 this.timestamp = null;
516 }
517
518 public K key() {
519 return key;
520 }
521
522 public V value() {
523 return value;
524 }
525
526 public Timestamp timestamp() {
527 return timestamp;
528 }
529
530 public String toString() {
531 return MoreObjects.toStringHelper(getClass())
532 .add("key", key)
533 .add("value", value)
534 .add("timestamp", timestamp)
535 .toString();
536 }
537 }
538
539 private static final class InternalRemoveEvent<K> {
540 private final List<RemoveEntry<K>> entries;
541
542 public InternalRemoveEvent(K key, Timestamp timestamp) {
543 entries = Collections.singletonList(
544 new RemoveEntry<>(key, timestamp));
545 }
546
547 public InternalRemoveEvent(List<RemoveEntry<K>> entries) {
548 this.entries = checkNotNull(entries);
549 }
550
551 // Needed for serialization.
552 @SuppressWarnings("unused")
553 private InternalRemoveEvent() {
554 entries = null;
555 }
556
557 public List<RemoveEntry<K>> entries() {
558 return entries;
559 }
560 }
561
562 private static final class RemoveEntry<K> {
563 private final K key;
564 private final Timestamp timestamp;
565
566 public RemoveEntry(K key, Timestamp timestamp) {
567 this.key = checkNotNull(key);
568 this.timestamp = checkNotNull(timestamp);
569 }
570
571 // Needed for serialization.
572 @SuppressWarnings("unused")
573 private RemoveEntry() {
574 this.key = null;
575 this.timestamp = null;
576 }
577
578 public K key() {
579 return key;
580 }
581
582 public Timestamp timestamp() {
583 return timestamp;
584 }
585 }
586}