/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.impl;
17
18import com.google.common.base.MoreObjects;
19import org.onlab.util.KryoNamespace;
20import org.onosproject.cluster.ClusterService;
21import org.onosproject.cluster.NodeId;
22import org.onosproject.store.Timestamp;
23import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
24import org.onosproject.store.cluster.messaging.ClusterMessage;
25import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
26import org.onosproject.store.cluster.messaging.MessageSubject;
27import org.onosproject.store.serializers.KryoSerializer;
28import org.slf4j.Logger;
29import org.slf4j.LoggerFactory;
30
31import java.io.IOException;
32import java.util.ArrayList;
33import java.util.Collection;
34import java.util.Collections;
35import java.util.List;
36import java.util.Map;
37import java.util.Set;
38import java.util.concurrent.ConcurrentHashMap;
39import java.util.concurrent.CopyOnWriteArraySet;
40import java.util.concurrent.ExecutorService;
41import java.util.concurrent.Executors;
42import java.util.concurrent.ScheduledExecutorService;
43import java.util.stream.Collectors;
44
45import static com.google.common.base.Preconditions.checkNotNull;
46import static com.google.common.base.Preconditions.checkState;
47import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
48import static org.onlab.util.Tools.minPriority;
49import static org.onlab.util.Tools.namedThreads;
50
/**
 * Distributed Map implementation which uses optimistic replication and
 * gossip-based techniques to provide an eventually consistent data store.
 */
55public class EventuallyConsistentMapImpl<K, V>
56 implements EventuallyConsistentMap<K, V> {
57
58 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
59
60 private final Map<K, Timestamped<V>> items;
61 private final Map<K, Timestamp> removedItems;
62
63 private final String mapName;
64 private final ClusterService clusterService;
65 private final ClusterCommunicationService clusterCommunicator;
66 private final KryoSerializer serializer;
67
68 private final ClockService<K> clockService;
69
70 private final MessageSubject updateMessageSubject;
71 private final MessageSubject removeMessageSubject;
72
73 private final Set<EventuallyConsistentMapListener> listeners
74 = new CopyOnWriteArraySet<>();
75
76 private final ExecutorService executor;
77
78 private final ScheduledExecutorService backgroundExecutor;
79
80 private volatile boolean destroyed = false;
81 private static final String ERROR_DESTROYED = " is already destroyed";
82
83 // TODO: Make these anti-entropy params configurable
84 private long initialDelaySec = 5;
85 private long periodSec = 5;
86
87 /**
88 * Creates a new eventually consistent map shared amongst multiple instances.
89 *
90 * Each map is identified by a string map name. EventuallyConsistentMapImpl
91 * objects in different JVMs that use the same map name will form a
92 * distributed map across JVMs (provided the cluster service is aware of
93 * both nodes).
94 *
95 * The client is expected to provide an
96 * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
97 * will be stored in this map have been registered (including referenced
98 * classes). This serializer will be used to serialize both K and V for
99 * inter-node notifications.
100 *
101 * The client must provide an {@link org.onosproject.store.impl.ClockService}
102 * which can generate timestamps for a given key. The clock service is free
103 * to generate timestamps however it wishes, however these timestamps will
104 * be used to serialize updates to the map so they must be strict enough
105 * to ensure updates are properly ordered for the use case (i.e. in some
106 * cases wallclock time will suffice, whereas in other cases logical time
107 * will be necessary).
108 *
109 * @param mapName a String identifier for the map.
110 * @param clusterService the cluster service
111 * @param clusterCommunicator the cluster communications service
112 * @param serializerBuilder a Kryo namespace builder that can serialize
113 * both K and V
114 * @param clockService a clock service able to generate timestamps
115 * for K
116 */
117 public EventuallyConsistentMapImpl(String mapName,
118 ClusterService clusterService,
119 ClusterCommunicationService clusterCommunicator,
120 KryoNamespace.Builder serializerBuilder,
121 ClockService<K> clockService) {
122
123 this.mapName = checkNotNull(mapName);
124 this.clusterService = checkNotNull(clusterService);
125 this.clusterCommunicator = checkNotNull(clusterCommunicator);
126
127 serializer = createSerializer(checkNotNull(serializerBuilder));
128
129 this.clockService = checkNotNull(clockService);
130
131 items = new ConcurrentHashMap<>();
132 removedItems = new ConcurrentHashMap<>();
133
134 executor = Executors
135 .newCachedThreadPool(namedThreads("onos-ecm-" + mapName + "-fg-%d"));
136
137 backgroundExecutor =
138 newSingleThreadScheduledExecutor(minPriority(
139 namedThreads("onos-ecm-" + mapName + "-bg-%d")));
140
141 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
142 clusterCommunicator.addSubscriber(updateMessageSubject,
143 new InternalPutEventListener());
144 removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
145 clusterCommunicator.addSubscriber(removeMessageSubject,
146 new InternalRemoveEventListener());
147 }
148
149 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
150 return new KryoSerializer() {
151 @Override
152 protected void setupKryoPool() {
153 // Add the map's internal helper classes to the user-supplied serializer
154 serializerPool = builder
155 .register(WallClockTimestamp.class)
156 .register(PutEntry.class)
157 .register(ArrayList.class)
158 .register(InternalPutEvent.class)
159 .register(InternalRemoveEvent.class)
160 .build();
161
162 // TODO anti-entropy classes
163 }
164 };
165 }
166
167 @Override
168 public int size() {
169 checkState(destroyed, mapName + ERROR_DESTROYED);
170 return items.size();
171 }
172
173 @Override
174 public boolean isEmpty() {
175 checkState(destroyed, mapName + ERROR_DESTROYED);
176 return items.isEmpty();
177 }
178
179 @Override
180 public boolean containsKey(K key) {
181 checkState(destroyed, mapName + ERROR_DESTROYED);
182 return items.containsKey(key);
183 }
184
185 @Override
186 public boolean containsValue(V value) {
187 checkState(destroyed, mapName + ERROR_DESTROYED);
188
189 return items.values().stream()
190 .anyMatch(timestamped -> timestamped.value().equals(value));
191 }
192
193 @Override
194 public V get(K key) {
195 checkState(destroyed, mapName + ERROR_DESTROYED);
196
197 Timestamped<V> value = items.get(key);
198 if (value != null) {
199 return value.value();
200 }
201 return null;
202 }
203
204 @Override
205 public void put(K key, V value) {
206 checkState(destroyed, mapName + ERROR_DESTROYED);
207
208 Timestamp timestamp = clockService.getTimestamp(key);
209 if (putInternal(key, value, timestamp)) {
210 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
211 EventuallyConsistentMapEvent<K, V> externalEvent
212 = new EventuallyConsistentMapEvent<>(
213 EventuallyConsistentMapEvent.Type.PUT, key, value);
214 notifyListeners(externalEvent);
215 }
216 }
217
218 private boolean putInternal(K key, V value, Timestamp timestamp) {
219 synchronized (this) {
220 Timestamp removed = removedItems.get(key);
221 if (removed != null && removed.compareTo(timestamp) > 0) {
222 return false;
223 }
224
225 Timestamped<V> existing = items.get(key);
226 if (existing != null && existing.isNewer(timestamp)) {
227 return false;
228 } else {
229 items.put(key, new Timestamped<>(value, timestamp));
230 removedItems.remove(key);
231 return true;
232 }
233 }
234 }
235
236 @Override
237 public void remove(K key) {
238 checkState(destroyed, mapName + ERROR_DESTROYED);
239
240 Timestamp timestamp = clockService.getTimestamp(key);
241 if (removeInternal(key, timestamp)) {
242 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
243 EventuallyConsistentMapEvent<K, V> externalEvent
244 = new EventuallyConsistentMapEvent<>(
245 EventuallyConsistentMapEvent.Type.REMOVE, key, null);
246 notifyListeners(externalEvent);
247 }
248 }
249
250 private boolean removeInternal(K key, Timestamp timestamp) {
251 synchronized (this) {
252 if (items.get(key) != null && items.get(key).isNewer(timestamp)) {
253 return false;
254 }
255
256 items.remove(key);
257 removedItems.put(key, timestamp);
258 return true;
259 }
260 }
261
262 @Override
263 public void putAll(Map<? extends K, ? extends V> m) {
264 checkState(destroyed, mapName + ERROR_DESTROYED);
265
266 List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
267
268 for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
269 K key = entry.getKey();
270 V value = entry.getValue();
271 Timestamp timestamp = clockService.getTimestamp(entry.getKey());
272
273 if (putInternal(key, value, timestamp)) {
274 updates.add(new PutEntry<>(key, value, timestamp));
275 }
276 }
277
278 notifyPeers(new InternalPutEvent<>(updates));
279
280 for (PutEntry<K, V> entry : updates) {
281 EventuallyConsistentMapEvent<K, V> externalEvent =
282 new EventuallyConsistentMapEvent<>(
283 EventuallyConsistentMapEvent.Type.PUT, entry.key(), entry.value());
284 notifyListeners(externalEvent);
285 }
286 }
287
288 @Override
289 public void clear() {
290 checkState(destroyed, mapName + ERROR_DESTROYED);
291
292 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
293
294 for (K key : items.keySet()) {
295 Timestamp timestamp = clockService.getTimestamp(key);
296
297 if (removeInternal(key, timestamp)) {
298 removed.add(new RemoveEntry<>(key, timestamp));
299 }
300 }
301
302 notifyPeers(new InternalRemoveEvent<>(removed));
303
304 for (RemoveEntry<K> entry : removed) {
305 EventuallyConsistentMapEvent<K, V> externalEvent =
306 new EventuallyConsistentMapEvent<>(
307 EventuallyConsistentMapEvent.Type.REMOVE, entry.key(), null);
308 notifyListeners(externalEvent);
309 }
310 }
311
312 @Override
313 public Set<K> keySet() {
314 checkState(destroyed, mapName + ERROR_DESTROYED);
315
316 return items.keySet();
317 }
318
319 @Override
320 public Collection<V> values() {
321 checkState(destroyed, mapName + ERROR_DESTROYED);
322
323 return items.values().stream()
324 .map(Timestamped::value)
325 .collect(Collectors.toList());
326 }
327
328 @Override
329 public Set<Map.Entry<K, V>> entrySet() {
330 checkState(destroyed, mapName + ERROR_DESTROYED);
331
332 return items.entrySet().stream()
333 .map(e -> new Entry(e.getKey(), e.getValue().value()))
334 .collect(Collectors.toSet());
335 }
336
337 @Override
338 public void addListener(EventuallyConsistentMapListener listener) {
339 checkState(destroyed, mapName + ERROR_DESTROYED);
340
341 listeners.add(checkNotNull(listener));
342 }
343
344 @Override
345 public void removeListener(EventuallyConsistentMapListener listener) {
346 checkState(destroyed, mapName + ERROR_DESTROYED);
347
348 listeners.remove(checkNotNull(listener));
349 }
350
351 @Override
352 public void destroy() {
353 destroyed = true;
354
355 executor.shutdown();
356 backgroundExecutor.shutdown();
357
358 clusterCommunicator.removeSubscriber(updateMessageSubject);
359 clusterCommunicator.removeSubscriber(removeMessageSubject);
360 }
361
362 private void notifyListeners(EventuallyConsistentMapEvent event) {
363 for (EventuallyConsistentMapListener listener : listeners) {
364 listener.event(event);
365 }
366 }
367
368 private void notifyPeers(InternalPutEvent event) {
369 try {
370 log.debug("sending put {}", event);
371 broadcastMessage(updateMessageSubject, event);
372 } catch (IOException e) {
373 // TODO this won't happen; remove from API
374 log.debug("IOException broadcasting update", e);
375 }
376 }
377
378 private void notifyPeers(InternalRemoveEvent event) {
379 try {
380 broadcastMessage(removeMessageSubject, event);
381 } catch (IOException e) {
382 // TODO this won't happen; remove from API
383 log.debug("IOException broadcasting update", e);
384 }
385 }
386
387 private void broadcastMessage(MessageSubject subject, Object event) throws
388 IOException {
389 ClusterMessage message = new ClusterMessage(
390 clusterService.getLocalNode().id(),
391 subject,
392 serializer.encode(event));
393 clusterCommunicator.broadcast(message);
394 }
395
396 private void unicastMessage(NodeId peer,
397 MessageSubject subject,
398 Object event) throws IOException {
399 ClusterMessage message = new ClusterMessage(
400 clusterService.getLocalNode().id(),
401 subject,
402 serializer.encode(event));
403 clusterCommunicator.unicast(message, peer);
404 }
405
406 private final class Entry implements Map.Entry<K, V> {
407
408 private final K key;
409 private final V value;
410
411 public Entry(K key, V value) {
412 this.key = key;
413 this.value = value;
414 }
415
416 @Override
417 public K getKey() {
418 return key;
419 }
420
421 @Override
422 public V getValue() {
423 return value;
424 }
425
426 @Override
427 public V setValue(V value) {
428 throw new UnsupportedOperationException();
429 }
430 }
431
432 private final class InternalPutEventListener implements
433 ClusterMessageHandler {
434 @Override
435 public void handle(ClusterMessage message) {
436 log.debug("Received put event from peer: {}", message.sender());
437 InternalPutEvent<K, V> event = serializer.decode(message.payload());
438
439 executor.submit(() -> {
440 try {
441 for (PutEntry<K, V> entry : event.entries()) {
442 K key = entry.key();
443 V value = entry.value();
444 Timestamp timestamp = entry.timestamp();
445
446 if (putInternal(key, value, timestamp)) {
447 EventuallyConsistentMapEvent externalEvent =
448 new EventuallyConsistentMapEvent<>(
449 EventuallyConsistentMapEvent.Type.PUT, key,
450 value);
451 notifyListeners(externalEvent);
452 }
453 }
454 } catch (Exception e) {
455 log.warn("Exception thrown handling put", e);
456 }
457 });
458 }
459 }
460
461 private final class InternalRemoveEventListener implements
462 ClusterMessageHandler {
463 @Override
464 public void handle(ClusterMessage message) {
465 log.debug("Received remove event from peer: {}", message.sender());
466 InternalRemoveEvent<K> event = serializer.decode(message.payload());
467
468 executor.submit(() -> {
469 try {
470 for (RemoveEntry<K> entry : event.entries()) {
471 K key = entry.key();
472 Timestamp timestamp = entry.timestamp();
473
474 if (removeInternal(key, timestamp)) {
475 EventuallyConsistentMapEvent externalEvent = new EventuallyConsistentMapEvent<K, V>(
476 EventuallyConsistentMapEvent.Type.REMOVE,
477 key, null);
478 notifyListeners(externalEvent);
479 }
480 }
481 } catch (Exception e) {
482 log.warn("Exception thrown handling remove", e);
483 }
484 });
485 }
486 }
487
488 private static final class InternalPutEvent<K, V> {
489 private final List<PutEntry<K, V>> entries;
490
491 public InternalPutEvent(K key, V value, Timestamp timestamp) {
492 entries = Collections
493 .singletonList(new PutEntry<>(key, value, timestamp));
494 }
495
496 public InternalPutEvent(List<PutEntry<K, V>> entries) {
497 this.entries = checkNotNull(entries);
498 }
499
500 // Needed for serialization.
501 @SuppressWarnings("unused")
502 private InternalPutEvent() {
503 entries = null;
504 }
505
506 public List<PutEntry<K, V>> entries() {
507 return entries;
508 }
509 }
510
511 private static final class PutEntry<K, V> {
512 private final K key;
513 private final V value;
514 private final Timestamp timestamp;
515
516 public PutEntry(K key, V value, Timestamp timestamp) {
517 this.key = checkNotNull(key);
518 this.value = checkNotNull(value);
519 this.timestamp = checkNotNull(timestamp);
520 }
521
522 // Needed for serialization.
523 @SuppressWarnings("unused")
524 private PutEntry() {
525 this.key = null;
526 this.value = null;
527 this.timestamp = null;
528 }
529
530 public K key() {
531 return key;
532 }
533
534 public V value() {
535 return value;
536 }
537
538 public Timestamp timestamp() {
539 return timestamp;
540 }
541
542 public String toString() {
543 return MoreObjects.toStringHelper(getClass())
544 .add("key", key)
545 .add("value", value)
546 .add("timestamp", timestamp)
547 .toString();
548 }
549 }
550
551 private static final class InternalRemoveEvent<K> {
552 private final List<RemoveEntry<K>> entries;
553
554 public InternalRemoveEvent(K key, Timestamp timestamp) {
555 entries = Collections.singletonList(
556 new RemoveEntry<>(key, timestamp));
557 }
558
559 public InternalRemoveEvent(List<RemoveEntry<K>> entries) {
560 this.entries = checkNotNull(entries);
561 }
562
563 // Needed for serialization.
564 @SuppressWarnings("unused")
565 private InternalRemoveEvent() {
566 entries = null;
567 }
568
569 public List<RemoveEntry<K>> entries() {
570 return entries;
571 }
572 }
573
574 private static final class RemoveEntry<K> {
575 private final K key;
576 private final Timestamp timestamp;
577
578 public RemoveEntry(K key, Timestamp timestamp) {
579 this.key = checkNotNull(key);
580 this.timestamp = checkNotNull(timestamp);
581 }
582
583 // Needed for serialization.
584 @SuppressWarnings("unused")
585 private RemoveEntry() {
586 this.key = null;
587 this.timestamp = null;
588 }
589
590 public K key() {
591 return key;
592 }
593
594 public Timestamp timestamp() {
595 return timestamp;
596 }
597 }
598}