| /* |
| * Copyright 2015 Open Networking Laboratory |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.onosproject.store.consistent.impl; |
| |
| import static com.google.common.base.Preconditions.*; |
| |
| import java.util.Collection; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.BiFunction; |
| import java.util.function.Function; |
| import java.util.function.Predicate; |
| import java.util.stream.Collectors; |
| import java.util.Set; |
| |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.onlab.util.HexString; |
| import org.onlab.util.Tools; |
| import org.onosproject.store.service.AsyncConsistentMap; |
| import org.onosproject.store.service.ConsistentMapException; |
| import org.onosproject.store.service.Serializer; |
| import org.onosproject.store.service.Versioned; |
| |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.CacheLoader; |
| import com.google.common.cache.LoadingCache; |
| |
| /** |
| * AsyncConsistentMap implementation that is backed by a Raft consensus |
| * based database. |
| * |
| * @param <K> type of key. |
| * @param <V> type of value. |
| */ |
| public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> { |
| |
| private final String name; |
| private final Database database; |
| private final Serializer serializer; |
| private final boolean readOnly; |
| |
| private static final String ERROR_NULL_KEY = "Key cannot be null"; |
| private static final String ERROR_NULL_VALUE = "Null values are not allowed"; |
| |
| private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder() |
| .softValues() |
| .build(new CacheLoader<K, String>() { |
| |
| @Override |
| public String load(K key) { |
| return HexString.toHexString(serializer.encode(key)); |
| } |
| }); |
| |
| protected K dK(String key) { |
| return serializer.decode(HexString.fromHexString(key)); |
| } |
| |
| public DefaultAsyncConsistentMap(String name, |
| Database database, |
| Serializer serializer, |
| boolean readOnly) { |
| this.name = checkNotNull(name, "map name cannot be null"); |
| this.database = checkNotNull(database, "database cannot be null"); |
| this.serializer = checkNotNull(serializer, "serializer cannot be null"); |
| this.readOnly = readOnly; |
| } |
| |
| @Override |
| public CompletableFuture<Integer> size() { |
| return database.size(name); |
| } |
| |
| @Override |
| public CompletableFuture<Boolean> isEmpty() { |
| return database.isEmpty(name); |
| } |
| |
| @Override |
| public CompletableFuture<Boolean> containsKey(K key) { |
| checkNotNull(key, ERROR_NULL_KEY); |
| return database.containsKey(name, keyCache.getUnchecked(key)); |
| } |
| |
| @Override |
| public CompletableFuture<Boolean> containsValue(V value) { |
| checkNotNull(value, ERROR_NULL_VALUE); |
| return database.containsValue(name, serializer.encode(value)); |
| } |
| |
| @Override |
| public CompletableFuture<Versioned<V>> get(K key) { |
| checkNotNull(key, ERROR_NULL_KEY); |
| return database.get(name, keyCache.getUnchecked(key)) |
| .thenApply(v -> v != null |
| ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null); |
| } |
| |
| @Override |
| public CompletableFuture<Versioned<V>> computeIfAbsent(K key, |
| Function<? super K, ? extends V> mappingFunction) { |
| return computeIf(key, Objects::isNull, (k, v) -> mappingFunction.apply(k)); |
| } |
| |
| @Override |
| public CompletableFuture<Versioned<V>> computeIfPresent(K key, |
| BiFunction<? super K, ? super V, ? extends V> remappingFunction) { |
| return computeIf(key, Objects::nonNull, remappingFunction); |
| } |
| |
| @Override |
| public CompletableFuture<Versioned<V>> compute(K key, |
| BiFunction<? super K, ? super V, ? extends V> remappingFunction) { |
| return computeIf(key, v -> true, remappingFunction); |
| } |
| |
| @Override |
| public CompletableFuture<Versioned<V>> computeIf(K key, |
| Predicate<? super V> condition, |
| BiFunction<? super K, ? super V, ? extends V> remappingFunction) { |
| checkNotNull(key, ERROR_NULL_KEY); |
| checkNotNull(condition, "predicate function cannot be null"); |
| checkNotNull(remappingFunction, "Remapping function cannot be null"); |
| return get(key).thenCompose(r1 -> { |
| V existingValue = r1 == null ? null : r1.value(); |
| // if the condition evaluates to false, return existing value. |
| if (!condition.test(existingValue)) { |
| return CompletableFuture.completedFuture(r1); |
| } |
| |
| AtomicReference<V> computedValue = new AtomicReference<>(); |
| // if remappingFunction throws an exception, return the exception. |
| try { |
| computedValue.set(remappingFunction.apply(key, existingValue)); |
| } catch (Exception e) { |
| return Tools.exceptionalFuture(e); |
| } |
| |
| // if the computed value is null, remove current value if one exists. |
| // throw an exception if concurrent modification is detected. |
| if (computedValue.get() == null) { |
| if (r1 != null) { |
| return remove(key, r1.version()).thenApply(result -> { |
| if (result) { |
| return null; |
| } else { |
| throw new ConsistentMapException.ConcurrentModification(); |
| } |
| }); |
| } else { |
| return CompletableFuture.completedFuture(null); |
| } |
| } else { |
| // replace current value; throw an exception if concurrent modification is detected |
| if (r1 != null) { |
| return replaceAndGet(key, r1.version(), computedValue.get()) |
| .thenApply(v -> { |
| if (v.isPresent()) { |
| return v.get(); |
| } else { |
| throw new ConsistentMapException.ConcurrentModification(); |
| } |
| }); |
| } else { |
| return putIfAbsentAndGet(key, computedValue.get()).thenApply(result -> { |
| if (!result.isPresent()) { |
| throw new ConsistentMapException.ConcurrentModification(); |
| } else { |
| return result.get(); |
| } |
| }); |
| } |
| } |
| }); |
| } |
| |
| @Override |
| public CompletableFuture<Versioned<V>> put(K key, V value) { |
| checkNotNull(key, ERROR_NULL_KEY); |
| checkNotNull(value, ERROR_NULL_VALUE); |
| checkIfUnmodifiable(); |
| return database.put(name, keyCache.getUnchecked(key), serializer.encode(value)) |
| .thenApply(this::unwrapResult) |
| .thenApply(v -> v != null |
| ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null); |
| } |
| |
| @Override |
| public CompletableFuture<Versioned<V>> putAndGet(K key, V value) { |
| checkNotNull(key, ERROR_NULL_KEY); |
| checkNotNull(value, ERROR_NULL_VALUE); |
| checkIfUnmodifiable(); |
| return database.putAndGet(name, keyCache.getUnchecked(key), serializer.encode(value)) |
| .thenApply(this::unwrapResult) |
| .thenApply(v -> { |
| Versioned<byte[]> rawNewValue = v.newValue(); |
| return new Versioned<>(serializer.decode(rawNewValue.value()), |
| rawNewValue.version(), |
| rawNewValue.creationTime()); |
| }); |
| } |
| |
| @Override |
| public CompletableFuture<Optional<Versioned<V>>> putIfAbsentAndGet(K key, V value) { |
| checkNotNull(key, ERROR_NULL_KEY); |
| checkNotNull(value, ERROR_NULL_VALUE); |
| checkIfUnmodifiable(); |
| return database.putIfAbsentAndGet(name, keyCache.getUnchecked(key), serializer.encode(value)) |
| .thenApply(this::unwrapResult) |
| .thenApply(v -> { |
| if (v.updated()) { |
| Versioned<byte[]> rawNewValue = v.newValue(); |
| return Optional.of(new Versioned<>(serializer.decode(rawNewValue.value()), |
| rawNewValue.version(), |
| rawNewValue.creationTime())); |
| } else { |
| return Optional.empty(); |
| } |
| }); |
| } |
| |
| @Override |
| public CompletableFuture<Versioned<V>> remove(K key) { |
| checkNotNull(key, ERROR_NULL_KEY); |
| checkIfUnmodifiable(); |
| return database.remove(name, keyCache.getUnchecked(key)) |
| .thenApply(this::unwrapResult) |
| .thenApply(v -> v != null |
| ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null); |
| } |
| |
| @Override |
| public CompletableFuture<Void> clear() { |
| checkIfUnmodifiable(); |
| return database.clear(name).thenApply(this::unwrapResult); |
| } |
| |
| @Override |
| public CompletableFuture<Set<K>> keySet() { |
| return database.keySet(name) |
| .thenApply(s -> s |
| .stream() |
| .map(this::dK) |
| .collect(Collectors.toSet())); |
| } |
| |
| @Override |
| public CompletableFuture<Collection<Versioned<V>>> values() { |
| return database.values(name).thenApply(c -> c |
| .stream() |
| .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version(), v.creationTime())) |
| .collect(Collectors.toList())); |
| } |
| |
| @Override |
| public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() { |
| return database.entrySet(name).thenApply(s -> s |
| .stream() |
| .map(this::fromRawEntry) |
| .collect(Collectors.toSet())); |
| } |
| |
| @Override |
| public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) { |
| checkNotNull(key, ERROR_NULL_KEY); |
| checkNotNull(value, ERROR_NULL_VALUE); |
| checkIfUnmodifiable(); |
| return database.putIfAbsent(name, |
| keyCache.getUnchecked(key), |
| serializer.encode(value)) |
| .thenApply(this::unwrapResult) |
| .thenApply(v -> v != null ? |
| new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null); |
| } |
| |
| @Override |
| public CompletableFuture<Boolean> remove(K key, V value) { |
| checkNotNull(key, ERROR_NULL_KEY); |
| checkNotNull(value, ERROR_NULL_VALUE); |
| checkIfUnmodifiable(); |
| return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value)) |
| .thenApply(this::unwrapResult); |
| } |
| |
| @Override |
| public CompletableFuture<Boolean> remove(K key, long version) { |
| checkNotNull(key, ERROR_NULL_KEY); |
| checkIfUnmodifiable(); |
| return database.remove(name, keyCache.getUnchecked(key), version) |
| .thenApply(this::unwrapResult); |
| |
| } |
| |
| @Override |
| public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) { |
| checkNotNull(key, ERROR_NULL_KEY); |
| checkNotNull(newValue, ERROR_NULL_VALUE); |
| checkIfUnmodifiable(); |
| byte[] existing = oldValue != null ? serializer.encode(oldValue) : null; |
| return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue)) |
| .thenApply(this::unwrapResult); |
| } |
| |
| @Override |
| public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) { |
| return replaceAndGet(key, oldVersion, newValue).thenApply(Optional::isPresent); |
| } |
| |
| @Override |
| public CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue) { |
| checkNotNull(key, ERROR_NULL_KEY); |
| checkNotNull(newValue, ERROR_NULL_VALUE); |
| checkIfUnmodifiable(); |
| return database.replaceAndGet(name, |
| keyCache.getUnchecked(key), |
| oldVersion, |
| serializer.encode(newValue)) |
| .thenApply(this::unwrapResult) |
| .thenApply(v -> { |
| if (v.updated()) { |
| Versioned<byte[]> rawNewValue = v.newValue(); |
| return Optional.of(new Versioned<>(serializer.decode(rawNewValue.value()), |
| rawNewValue.version(), |
| rawNewValue.creationTime())); |
| } else { |
| return Optional.empty(); |
| } |
| }); |
| } |
| |
| private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) { |
| return Pair.of( |
| dK(e.getKey()), |
| new Versioned<>( |
| serializer.decode(e.getValue().value()), |
| e.getValue().version(), |
| e.getValue().creationTime())); |
| } |
| |
| private <T> T unwrapResult(Result<T> result) { |
| if (result.status() == Result.Status.LOCKED) { |
| throw new ConsistentMapException.ConcurrentModification(); |
| } else if (result.success()) { |
| return result.value(); |
| } else { |
| throw new IllegalStateException("Must not be here"); |
| } |
| } |
| |
| private void checkIfUnmodifiable() { |
| if (readOnly) { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| } |