blob: 298fe2f9f93e27c75c80dacbdeb6d3867b0a995e [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.consistent.impl;
18
19import static com.google.common.base.Preconditions.*;
Madan Jampani50589ac2015-06-08 11:38:46 -070020import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani7c521002015-03-23 12:23:01 -070021
22import java.util.Collection;
23import java.util.Map;
24import java.util.Map.Entry;
Madan Jampani346d4f52015-05-04 11:09:39 -070025import java.util.Objects;
26import java.util.Optional;
Madan Jampani7c521002015-03-23 12:23:01 -070027import java.util.concurrent.CompletableFuture;
Madan Jampani50589ac2015-06-08 11:38:46 -070028import java.util.concurrent.CopyOnWriteArraySet;
Madan Jampani346d4f52015-05-04 11:09:39 -070029import java.util.concurrent.atomic.AtomicReference;
30import java.util.function.BiFunction;
Madan Jampani50589ac2015-06-08 11:38:46 -070031import java.util.function.Consumer;
Madan Jampani346d4f52015-05-04 11:09:39 -070032import java.util.function.Function;
33import java.util.function.Predicate;
Madan Jampani7c521002015-03-23 12:23:01 -070034import java.util.stream.Collectors;
35import java.util.Set;
36
37import org.apache.commons.lang3.tuple.Pair;
38import org.onlab.util.HexString;
Madan Jampani346d4f52015-05-04 11:09:39 -070039import org.onlab.util.Tools;
Madan Jampanie8af1cc2015-06-23 14:23:31 -070040import org.onosproject.core.ApplicationId;
Madan Jampani7c521002015-03-23 12:23:01 -070041import org.onosproject.store.service.AsyncConsistentMap;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070042import org.onosproject.store.service.ConsistentMapException;
Madan Jampani50589ac2015-06-08 11:38:46 -070043import org.onosproject.store.service.MapEvent;
44import org.onosproject.store.service.MapEventListener;
Madan Jampani7c521002015-03-23 12:23:01 -070045import org.onosproject.store.service.Serializer;
46import org.onosproject.store.service.Versioned;
Madan Jampani50589ac2015-06-08 11:38:46 -070047import org.slf4j.Logger;
Madan Jampani7c521002015-03-23 12:23:01 -070048
49import com.google.common.cache.CacheBuilder;
50import com.google.common.cache.CacheLoader;
51import com.google.common.cache.LoadingCache;
52
53/**
54 * AsyncConsistentMap implementation that is backed by a Raft consensus
55 * based database.
56 *
57 * @param <K> type of key.
58 * @param <V> type of value.
59 */
60public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
61
62 private final String name;
Madan Jampanie8af1cc2015-06-23 14:23:31 -070063 private final ApplicationId applicationId;
Madan Jampanif1b8e172015-03-23 11:42:02 -070064 private final Database database;
Madan Jampani7c521002015-03-23 12:23:01 -070065 private final Serializer serializer;
Madan Jampani02b7fb82015-05-01 13:01:20 -070066 private final boolean readOnly;
Madan Jampanie8af1cc2015-06-23 14:23:31 -070067 private final boolean purgeOnUninstall;
Madan Jampani50589ac2015-06-08 11:38:46 -070068 private final Consumer<MapEvent<K, V>> eventPublisher;
69
70 private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
71
72 private final Logger log = getLogger(getClass());
Madan Jampani7c521002015-03-23 12:23:01 -070073
74 private static final String ERROR_NULL_KEY = "Key cannot be null";
75 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
76
77 private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
78 .softValues()
79 .build(new CacheLoader<K, String>() {
80
81 @Override
82 public String load(K key) {
83 return HexString.toHexString(serializer.encode(key));
84 }
85 });
86
87 protected K dK(String key) {
88 return serializer.decode(HexString.fromHexString(key));
89 }
90
91 public DefaultAsyncConsistentMap(String name,
Madan Jampanie8af1cc2015-06-23 14:23:31 -070092 ApplicationId applicationId,
Madan Jampanif1b8e172015-03-23 11:42:02 -070093 Database database,
Madan Jampani02b7fb82015-05-01 13:01:20 -070094 Serializer serializer,
Madan Jampani50589ac2015-06-08 11:38:46 -070095 boolean readOnly,
Madan Jampanie8af1cc2015-06-23 14:23:31 -070096 boolean purgeOnUninstall,
Madan Jampani50589ac2015-06-08 11:38:46 -070097 Consumer<MapEvent<K, V>> eventPublisher) {
Madan Jampani7c521002015-03-23 12:23:01 -070098 this.name = checkNotNull(name, "map name cannot be null");
Madan Jampanie8af1cc2015-06-23 14:23:31 -070099 this.applicationId = applicationId;
Madan Jampanif1b8e172015-03-23 11:42:02 -0700100 this.database = checkNotNull(database, "database cannot be null");
Madan Jampani7c521002015-03-23 12:23:01 -0700101 this.serializer = checkNotNull(serializer, "serializer cannot be null");
Madan Jampani02b7fb82015-05-01 13:01:20 -0700102 this.readOnly = readOnly;
Madan Jampanie8af1cc2015-06-23 14:23:31 -0700103 this.purgeOnUninstall = purgeOnUninstall;
Madan Jampani50589ac2015-06-08 11:38:46 -0700104 this.eventPublisher = eventPublisher;
105 }
106
107 /**
108 * Returns this map name.
109 * @return map name
110 */
111 public String name() {
112 return name;
113 }
114
115 /**
116 * Returns the serializer for map entries.
117 * @return map entry serializer
118 */
119 public Serializer serializer() {
120 return serializer;
Madan Jampani7c521002015-03-23 12:23:01 -0700121 }
122
Madan Jampanie8af1cc2015-06-23 14:23:31 -0700123 /**
124 * Returns the applicationId owning this map.
125 * @return application Id
126 */
127 public ApplicationId applicationId() {
128 return applicationId;
129 }
130
131 /**
132 * Returns whether the map entries should be purged when the application
133 * owning it is uninstalled.
134 * @return true is map needs to cleared on app uninstall; false otherwise
135 */
136 public boolean purgeOnUninstall() {
137 return purgeOnUninstall;
138 }
139
Madan Jampani7c521002015-03-23 12:23:01 -0700140 @Override
141 public CompletableFuture<Integer> size() {
Madan Jampanif1b8e172015-03-23 11:42:02 -0700142 return database.size(name);
Madan Jampani7c521002015-03-23 12:23:01 -0700143 }
144
145 @Override
146 public CompletableFuture<Boolean> isEmpty() {
Madan Jampanif1b8e172015-03-23 11:42:02 -0700147 return database.isEmpty(name);
Madan Jampani7c521002015-03-23 12:23:01 -0700148 }
149
150 @Override
151 public CompletableFuture<Boolean> containsKey(K key) {
152 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700153 return database.containsKey(name, keyCache.getUnchecked(key));
Madan Jampani7c521002015-03-23 12:23:01 -0700154 }
155
156 @Override
157 public CompletableFuture<Boolean> containsValue(V value) {
158 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700159 return database.containsValue(name, serializer.encode(value));
Madan Jampani7c521002015-03-23 12:23:01 -0700160 }
161
162 @Override
163 public CompletableFuture<Versioned<V>> get(K key) {
164 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700165 return database.get(name, keyCache.getUnchecked(key))
Madan Jampani7c521002015-03-23 12:23:01 -0700166 .thenApply(v -> v != null
167 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
168 }
169
170 @Override
Madan Jampani346d4f52015-05-04 11:09:39 -0700171 public CompletableFuture<Versioned<V>> computeIfAbsent(K key,
172 Function<? super K, ? extends V> mappingFunction) {
173 return computeIf(key, Objects::isNull, (k, v) -> mappingFunction.apply(k));
174 }
175
176 @Override
177 public CompletableFuture<Versioned<V>> computeIfPresent(K key,
178 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
179 return computeIf(key, Objects::nonNull, remappingFunction);
180 }
181
182 @Override
183 public CompletableFuture<Versioned<V>> compute(K key,
184 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
185 return computeIf(key, v -> true, remappingFunction);
186 }
187
188 @Override
189 public CompletableFuture<Versioned<V>> computeIf(K key,
190 Predicate<? super V> condition,
191 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
192 checkNotNull(key, ERROR_NULL_KEY);
193 checkNotNull(condition, "predicate function cannot be null");
194 checkNotNull(remappingFunction, "Remapping function cannot be null");
Madan Jampani50589ac2015-06-08 11:38:46 -0700195 AtomicReference<MapEvent<K, V>> mapEvent = new AtomicReference<>();
Madan Jampani346d4f52015-05-04 11:09:39 -0700196 return get(key).thenCompose(r1 -> {
197 V existingValue = r1 == null ? null : r1.value();
198 // if the condition evaluates to false, return existing value.
199 if (!condition.test(existingValue)) {
200 return CompletableFuture.completedFuture(r1);
201 }
202
203 AtomicReference<V> computedValue = new AtomicReference<>();
204 // if remappingFunction throws an exception, return the exception.
205 try {
206 computedValue.set(remappingFunction.apply(key, existingValue));
207 } catch (Exception e) {
208 return Tools.exceptionalFuture(e);
209 }
210
211 // if the computed value is null, remove current value if one exists.
212 // throw an exception if concurrent modification is detected.
213 if (computedValue.get() == null) {
214 if (r1 != null) {
215 return remove(key, r1.version()).thenApply(result -> {
216 if (result) {
Madan Jampani50589ac2015-06-08 11:38:46 -0700217 mapEvent.set(new MapEvent<>(name, MapEvent.Type.REMOVE, key, r1));
Madan Jampani346d4f52015-05-04 11:09:39 -0700218 return null;
219 } else {
220 throw new ConsistentMapException.ConcurrentModification();
221 }
222 });
223 } else {
224 return CompletableFuture.completedFuture(null);
225 }
226 } else {
227 // replace current value; throw an exception if concurrent modification is detected
228 if (r1 != null) {
229 return replaceAndGet(key, r1.version(), computedValue.get())
230 .thenApply(v -> {
231 if (v.isPresent()) {
Madan Jampani50589ac2015-06-08 11:38:46 -0700232 mapEvent.set(new MapEvent<>(name, MapEvent.Type.UPDATE, key, v.get()));
Madan Jampani346d4f52015-05-04 11:09:39 -0700233 return v.get();
234 } else {
235 throw new ConsistentMapException.ConcurrentModification();
236 }
237 });
238 } else {
239 return putIfAbsentAndGet(key, computedValue.get()).thenApply(result -> {
240 if (!result.isPresent()) {
241 throw new ConsistentMapException.ConcurrentModification();
242 } else {
Madan Jampani50589ac2015-06-08 11:38:46 -0700243 mapEvent.set(new MapEvent<>(name, MapEvent.Type.INSERT, key, result.get()));
Madan Jampani346d4f52015-05-04 11:09:39 -0700244 return result.get();
245 }
246 });
247 }
248 }
Madan Jampani50589ac2015-06-08 11:38:46 -0700249 }).whenComplete((result, error) -> notifyListeners(mapEvent.get()));
Madan Jampani346d4f52015-05-04 11:09:39 -0700250 }
251
252 @Override
Madan Jampani7c521002015-03-23 12:23:01 -0700253 public CompletableFuture<Versioned<V>> put(K key, V value) {
254 checkNotNull(key, ERROR_NULL_KEY);
255 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani02b7fb82015-05-01 13:01:20 -0700256 checkIfUnmodifiable();
Madan Jampanif1b8e172015-03-23 11:42:02 -0700257 return database.put(name, keyCache.getUnchecked(key), serializer.encode(value))
Madan Jampani346d4f52015-05-04 11:09:39 -0700258 .thenApply(this::unwrapResult)
259 .thenApply(v -> v != null
260 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
261 }
262
263 @Override
264 public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
265 checkNotNull(key, ERROR_NULL_KEY);
266 checkNotNull(value, ERROR_NULL_VALUE);
267 checkIfUnmodifiable();
268 return database.putAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700269 .thenApply(this::unwrapResult)
Madan Jampani346d4f52015-05-04 11:09:39 -0700270 .thenApply(v -> {
271 Versioned<byte[]> rawNewValue = v.newValue();
272 return new Versioned<>(serializer.decode(rawNewValue.value()),
273 rawNewValue.version(),
274 rawNewValue.creationTime());
275 });
276 }
277
278 @Override
279 public CompletableFuture<Optional<Versioned<V>>> putIfAbsentAndGet(K key, V value) {
280 checkNotNull(key, ERROR_NULL_KEY);
281 checkNotNull(value, ERROR_NULL_VALUE);
282 checkIfUnmodifiable();
283 return database.putIfAbsentAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
284 .thenApply(this::unwrapResult)
285 .thenApply(v -> {
286 if (v.updated()) {
287 Versioned<byte[]> rawNewValue = v.newValue();
288 return Optional.of(new Versioned<>(serializer.decode(rawNewValue.value()),
289 rawNewValue.version(),
290 rawNewValue.creationTime()));
291 } else {
292 return Optional.empty();
293 }
294 });
Madan Jampani7c521002015-03-23 12:23:01 -0700295 }
296
297 @Override
298 public CompletableFuture<Versioned<V>> remove(K key) {
299 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani02b7fb82015-05-01 13:01:20 -0700300 checkIfUnmodifiable();
Madan Jampanif1b8e172015-03-23 11:42:02 -0700301 return database.remove(name, keyCache.getUnchecked(key))
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700302 .thenApply(this::unwrapResult)
Madan Jampanic6069562015-06-12 13:22:45 -0700303 .thenApply(v -> v != null ? v.<V>map(serializer::decode) : null)
304 .whenComplete((r, e) -> {
305 if (r != null) {
306 notifyListeners(new MapEvent<>(name, MapEvent.Type.REMOVE, key, r));
307 }
308 });
Madan Jampani7c521002015-03-23 12:23:01 -0700309 }
310
311 @Override
312 public CompletableFuture<Void> clear() {
Madan Jampani02b7fb82015-05-01 13:01:20 -0700313 checkIfUnmodifiable();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700314 return database.clear(name).thenApply(this::unwrapResult);
Madan Jampani7c521002015-03-23 12:23:01 -0700315 }
316
317 @Override
318 public CompletableFuture<Set<K>> keySet() {
Madan Jampanif1b8e172015-03-23 11:42:02 -0700319 return database.keySet(name)
Madan Jampani7c521002015-03-23 12:23:01 -0700320 .thenApply(s -> s
321 .stream()
322 .map(this::dK)
323 .collect(Collectors.toSet()));
324 }
325
326 @Override
327 public CompletableFuture<Collection<Versioned<V>>> values() {
Madan Jampanif1b8e172015-03-23 11:42:02 -0700328 return database.values(name).thenApply(c -> c
Madan Jampani7c521002015-03-23 12:23:01 -0700329 .stream()
330 .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version(), v.creationTime()))
331 .collect(Collectors.toList()));
332 }
333
334 @Override
335 public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
Madan Jampanif1b8e172015-03-23 11:42:02 -0700336 return database.entrySet(name).thenApply(s -> s
Madan Jampani7c521002015-03-23 12:23:01 -0700337 .stream()
338 .map(this::fromRawEntry)
339 .collect(Collectors.toSet()));
340 }
341
342 @Override
343 public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
344 checkNotNull(key, ERROR_NULL_KEY);
345 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani02b7fb82015-05-01 13:01:20 -0700346 checkIfUnmodifiable();
Madan Jampanic6069562015-06-12 13:22:45 -0700347 AtomicReference<MapEvent<K, V>> event = new AtomicReference<>();
348 return database.putIfAbsentAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
Madan Jampani346d4f52015-05-04 11:09:39 -0700349 .thenApply(this::unwrapResult)
Madan Jampanic6069562015-06-12 13:22:45 -0700350 .whenComplete((r, e) -> {
351 if (r != null && r.updated()) {
352 event.set(new MapEvent<K, V>(name,
353 MapEvent.Type.INSERT,
354 key,
355 r.newValue().<V>map(serializer::decode)));
356 }
357 })
358 .thenApply(v -> v.updated() ? null : v.oldValue().<V>map(serializer::decode))
359 .whenComplete((r, e) -> notifyListeners(event.get()));
Madan Jampani7c521002015-03-23 12:23:01 -0700360 }
361
362 @Override
363 public CompletableFuture<Boolean> remove(K key, V value) {
364 checkNotNull(key, ERROR_NULL_KEY);
365 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani02b7fb82015-05-01 13:01:20 -0700366 checkIfUnmodifiable();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700367 return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value))
Madan Jampani346d4f52015-05-04 11:09:39 -0700368 .thenApply(this::unwrapResult);
Madan Jampani7c521002015-03-23 12:23:01 -0700369 }
370
371 @Override
372 public CompletableFuture<Boolean> remove(K key, long version) {
373 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani02b7fb82015-05-01 13:01:20 -0700374 checkIfUnmodifiable();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700375 return database.remove(name, keyCache.getUnchecked(key), version)
Madan Jampani346d4f52015-05-04 11:09:39 -0700376 .thenApply(this::unwrapResult);
Madan Jampani7c521002015-03-23 12:23:01 -0700377
378 }
379
380 @Override
381 public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
382 checkNotNull(key, ERROR_NULL_KEY);
383 checkNotNull(newValue, ERROR_NULL_VALUE);
Madan Jampani02b7fb82015-05-01 13:01:20 -0700384 checkIfUnmodifiable();
Madan Jampani7c521002015-03-23 12:23:01 -0700385 byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700386 return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue))
Madan Jampani346d4f52015-05-04 11:09:39 -0700387 .thenApply(this::unwrapResult);
Madan Jampani7c521002015-03-23 12:23:01 -0700388 }
389
390 @Override
391 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
Madan Jampani346d4f52015-05-04 11:09:39 -0700392 return replaceAndGet(key, oldVersion, newValue).thenApply(Optional::isPresent);
393 }
394
395 @Override
396 public CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue) {
Madan Jampani7c521002015-03-23 12:23:01 -0700397 checkNotNull(key, ERROR_NULL_KEY);
398 checkNotNull(newValue, ERROR_NULL_VALUE);
Madan Jampani02b7fb82015-05-01 13:01:20 -0700399 checkIfUnmodifiable();
Madan Jampani346d4f52015-05-04 11:09:39 -0700400 return database.replaceAndGet(name,
401 keyCache.getUnchecked(key),
402 oldVersion,
403 serializer.encode(newValue))
404 .thenApply(this::unwrapResult)
405 .thenApply(v -> {
406 if (v.updated()) {
407 Versioned<byte[]> rawNewValue = v.newValue();
408 return Optional.of(new Versioned<>(serializer.decode(rawNewValue.value()),
409 rawNewValue.version(),
410 rawNewValue.creationTime()));
411 } else {
412 return Optional.empty();
413 }
414 });
Madan Jampani7c521002015-03-23 12:23:01 -0700415 }
416
417 private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
418 return Pair.of(
419 dK(e.getKey()),
420 new Versioned<>(
421 serializer.decode(e.getValue().value()),
422 e.getValue().version(),
423 e.getValue().creationTime()));
424 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700425
426 private <T> T unwrapResult(Result<T> result) {
427 if (result.status() == Result.Status.LOCKED) {
428 throw new ConsistentMapException.ConcurrentModification();
429 } else if (result.success()) {
430 return result.value();
431 } else {
432 throw new IllegalStateException("Must not be here");
433 }
434 }
Madan Jampani02b7fb82015-05-01 13:01:20 -0700435
436 private void checkIfUnmodifiable() {
437 if (readOnly) {
438 throw new UnsupportedOperationException();
439 }
440 }
Madan Jampani50589ac2015-06-08 11:38:46 -0700441
442 @Override
443 public void addListener(MapEventListener<K, V> listener) {
444 listeners.add(listener);
445 }
446
447 @Override
448 public void removeListener(MapEventListener<K, V> listener) {
449 listeners.remove(listener);
450 }
451
452 protected void notifyListeners(MapEvent<K, V> event) {
453 try {
454 if (event != null) {
455 notifyLocalListeners(event);
456 notifyRemoteListeners(event);
457 }
458 } catch (Exception e) {
459 log.warn("Failure notifying listeners about {}", event, e);
460 }
461 }
462
463 protected void notifyLocalListeners(MapEvent<K, V> event) {
464 listeners.forEach(listener -> listener.event(event));
465 }
466
467 protected void notifyRemoteListeners(MapEvent<K, V> event) {
468 if (eventPublisher != null) {
469 eventPublisher.accept(event);
470 }
471 }
Madan Jampani7c521002015-03-23 12:23:01 -0700472}