Add new methods to ConsistentMap API to improve usability.
Change-Id: I1e82f0ab191edc6b0f52c7d7b0307aa3d2ef9d1f
Change-Id: I4c5982fe6596f716729b7885eb584a60735cd41b
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index f58f5d5..8a88ed6 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -18,8 +18,12 @@
import java.util.Collection;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
/**
* A distributed, strongly consistent map whose methods are all executed asynchronously.
@@ -84,6 +88,61 @@
CompletableFuture<Versioned<V>> get(K key);
/**
+ * If the specified key is not already associated with a value (or is mapped to null),
+ * attempts to compute its value using the given mapping function and enters it into
+ * this map unless null.
+ * If a conflicting concurrent modification attempt is detected, the returned future
+ * will be completed exceptionally with ConsistentMapException.ConcurrentModification.
+ * @param key key with which the specified value is to be associated
+ * @param mappingFunction the function to compute a value
+ * @return the current (existing or computed) value associated with the specified key,
+ * or null if the computed value is null
+ */
+ CompletableFuture<Versioned<V>> computeIfAbsent(K key,
+ Function<? super K, ? extends V> mappingFunction);
+
+ /**
+ * If the value for the specified key is present and non-null, attempts to compute a new
+ * mapping given the key and its current mapped value.
+ * If the computed value is null, the current mapping will be removed from the map.
+ * If a conflicting concurrent modification attempt is detected, the returned future
+ * will be completed exceptionally with ConsistentMapException.ConcurrentModification.
+ * @param key key with which the specified value is to be associated
+ * @param remappingFunction the function to compute a value
+ * @return the new value associated with the specified key, or null if computed value is null
+ */
+ CompletableFuture<Versioned<V>> computeIfPresent(K key,
+ BiFunction<? super K, ? super V, ? extends V> remappingFunction);
+
+ /**
+ * Attempts to compute a mapping for the specified key and its current mapped value (or
+ * null if there is no current mapping).
+ * If the computed value is null, the current mapping (if one exists) will be removed from the map.
+ * If a conflicting concurrent modification attempt is detected, the returned future
+ * will be completed exceptionally with ConsistentMapException.ConcurrentModification.
+ * @param key key with which the specified value is to be associated
+ * @param remappingFunction the function to compute a value
+ * @return the new value associated with the specified key, or null if computed value is null
+ */
+ CompletableFuture<Versioned<V>> compute(K key,
+ BiFunction<? super K, ? super V, ? extends V> remappingFunction);
+
+ /**
+ * If the value for the specified key satisfies a condition, attempts to compute a new
+ * mapping given the key and its current mapped value.
+ * If the computed value is null, the current mapping will be removed from the map.
+ * If a conflicting concurrent modification attempt is detected, the returned future
+ * will be completed exceptionally with ConsistentMapException.ConcurrentModification.
+ * @param key key with which the specified value is to be associated
+ * @param condition condition that should evaluate to true for the computation to proceed
+ * @param remappingFunction the function to compute a value
+ * @return the new value associated with the specified key, or the old value if condition evaluates to false
+ */
+ CompletableFuture<Versioned<V>> computeIf(K key,
+ Predicate<? super V> condition,
+ BiFunction<? super K, ? super V, ? extends V> remappingFunction);
+
+ /**
* Associates the specified value with the specified key in this map (optional operation).
* If the map previously contained a mapping for the key, the old value is replaced by the
* specified value.
@@ -96,6 +155,28 @@
CompletableFuture<Versioned<V>> put(K key, V value);
/**
+ * Associates the specified value with the specified key in this map (optional operation).
+ * If the map previously contained a mapping for the key, the old value is replaced by the
+ * specified value.
+ *
+ * @param key key with which the specified value is to be associated
+ * @param value value to be associated with the specified key
+ * @return new value.
+ */
+ CompletableFuture<Versioned<V>> putAndGet(K key, V value);
+
+ /**
+ * Associates the specified value with the specified key in this map (optional operation).
+ * If the map previously contained a mapping for the key, the old value is replaced by the
+ * specified value.
+ *
+ * @param key key with which the specified value is to be associated
+ * @param value value to be associated with the specified key
+ * @return optional updated value. Will be empty if update did not happen
+ */
+ CompletableFuture<Optional<Versioned<V>>> putIfAbsentAndGet(K key, V value);
+
+ /**
* Removes the mapping for a key from this map if it is present (optional operation).
*
* @param key key whose value is to be removed from the map
@@ -196,4 +277,15 @@
* @return true if the value was replaced
*/
CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue);
+
+ /**
+ * Replaces the entry for the specified key only if it is currently mapped to the
+ * specified version.
+ *
+ * @param key key key with which the specified value is associated
+ * @param oldVersion version expected to be associated with the specified key
+ * @param newValue value to be associated with the specified key
+ * @return optional updated value. Will be empty if update did not happen.
+ */
+ CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue);
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index 6d19259..2893509 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -18,7 +18,11 @@
import java.util.Collection;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
/**
* A distributed, strongly consistent map.
@@ -83,6 +87,64 @@
Versioned<V> get(K key);
/**
+ * If the specified key is not already associated with a value (or is mapped to null),
+ * attempts to compute its value using the given mapping function and enters it into
+ * this map unless null.
+ *
+ * @param key key with which the specified value is to be associated
+ * @param mappingFunction the function to compute a value
+ * @return the current (existing or computed) value associated with the specified key,
+ * or null if the computed value is null. Method throws {@code ConsistentMapException.ConcurrentModification}
+ * if a concurrent modification of map is detected
+ */
+ Versioned<V> computeIfAbsent(K key,
+ Function<? super K, ? extends V> mappingFunction);
+
+ /**
+ * Attempts to compute a mapping for the specified key and its current mapped value (or
+ * null if there is no current mapping).
+ * If the computed value is null, the current mapping will be removed from the map.
+ *
+ * @param key key with which the specified value is to be associated
+ * @param remappingFunction the function to compute a value
+ * @return the new value associated with the specified key, or null if none.
+ * This method throws {@code ConsistentMapException.ConcurrentModification}
+ * if a concurrent modification of map is detected
+ */
+ Versioned<V> compute(K key,
+ BiFunction<? super K, ? super V, ? extends V> remappingFunction);
+
+ /**
+ * If the value for the specified key is present and non-null, attempts to compute a new
+ * mapping given the key and its current mapped value.
+ * If the computed value is null, the current mapping will be removed from the map.
+ *
+ * @param key key with which the specified value is to be associated
+ * @param remappingFunction the function to compute a value
+ * @return the new value associated with the specified key, or null if none.
+ * This method throws {@code ConsistentMapException.ConcurrentModification}
+ * if a concurrent modification of map is detected
+ */
+ Versioned<V> computeIfPresent(K key,
+ BiFunction<? super K, ? super V, ? extends V> remappingFunction);
+
+ /**
+ * If the value for the specified key satisfies a condition, attempts to compute a new
+ * mapping given the key and its current mapped value.
+ * If the computed value is null, the current mapping will be removed from the map.
+ *
+ * @param key key with which the specified value is to be associated
+ * @param condition condition that should evaluate to true for the computation to proceed
+ * @param remappingFunction the function to compute a value
+ * @return the new value associated with the specified key, or the old value if condition evaluates to false.
+ * This method throws {@code ConsistentMapException.ConcurrentModification} if a concurrent
+ * modification of map is detected
+ */
+ Versioned<V> computeIf(K key,
+ Predicate<? super V> condition,
+ BiFunction<? super K, ? super V, ? extends V> remappingFunction);
+
+ /**
* Associates the specified value with the specified key in this map (optional operation).
* If the map previously contained a mapping for the key, the old value is replaced by the
* specified value.
@@ -95,6 +157,28 @@
Versioned<V> put(K key, V value);
/**
+ * Associates the specified value with the specified key in this map (optional operation).
+ * If the map previously contained a mapping for the key, the old value is replaced by the
+ * specified value.
+ *
+ * @param key key with which the specified value is to be associated
+ * @param value value to be associated with the specified key
+ * @return new value.
+ */
+ Versioned<V> putAndGet(K key, V value);
+
+ /**
+ * Associates the specified value with the specified key in this map (optional operation).
+ * If the map previously contained a mapping for the key, the old value is replaced by the
+ * specified value.
+ *
+ * @param key key with which the specified value is to be associated
+ * @param value value to be associated with the specified key
+ * @return optional updated value. Will be empty if update did not happen
+ */
+ Optional<Versioned<V>> putIfAbsentAndGet(K key, V value);
+
+ /**
* Removes the mapping for a key from this map if it is present (optional operation).
*
* @param key key whose value is to be removed from the map
@@ -194,4 +278,15 @@
* @return true if the value was replaced
*/
boolean replace(K key, long oldVersion, V newValue);
+
+ /**
+ * Replaces the entry for the specified key only if it is currently mapped to the
+ * specified version.
+ *
+ * @param key key key with which the specified value is associated
+ * @param oldVersion version expected to be associated with the specified key
+ * @param newValue value to be associated with the specified key
+ * @return optional new value. Will be empty if replace did not happen
+ */
+ Optional<Versioned<V>> replaceAndGet(K key, long oldVersion, V newValue);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index 72c2188..815b142 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -97,6 +97,26 @@
CompletableFuture<Result<Versioned<V>>> put(String tableName, K key, V value);
/**
+ * Puts a value in the table.
+ *
+ * @param tableName table name
+ * @param key The key to set.
+ * @param value The value to set.
+ * @return A completable future to be completed with the result once complete.
+ */
+ CompletableFuture<Result<UpdateResult<Versioned<V>>>> putAndGet(String tableName, K key, V value);
+
+ /**
+ * Puts a value in the table.
+ *
+ * @param tableName table name
+ * @param key The key to set.
+ * @param value The value to set.
+ * @return A completable future to be completed with the result once complete.
+ */
+ CompletableFuture<Result<UpdateResult<Versioned<V>>>> putIfAbsentAndGet(String tableName, K key, V value);
+
+ /**
* Removes a value from the table.
*
* @param tableName table name
@@ -190,6 +210,19 @@
CompletableFuture<Result<Boolean>> replace(String tableName, K key, long oldVersion, V newValue);
/**
+ * Replaces the entry for the specified key only if currently mapped to the specified version.
+ *
+ * @param tableName table name
+ * @param key The key to update
+ * @param oldVersion existing version in the map for this replace to succeed.
+ * @param newValue The value with which to replace the given key and version.
+ * @return A completable future to be completed with the result once complete.
+ */
+ CompletableFuture<Result<UpdateResult<Versioned<V>>>> replaceAndGet(String tableName,
+ K key, long oldVersion,
+ V newValue);
+
+ /**
* Atomically add the given value to current value of the specified counter.
*
* @param counterName counter name
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
index 42b0313..9b6a322 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
@@ -74,6 +74,7 @@
.register(Pair.class)
.register(ImmutablePair.class)
.register(Result.class)
+ .register(UpdateResult.class)
.register(Result.Status.class)
.register(DefaultTransaction.class)
.register(Transaction.State.class)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index 83909d8..73eacdd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -68,6 +68,12 @@
Result<Versioned<V>> put(String tableName, K key, V value);
@Command
+ Result<UpdateResult<Versioned<V>>> putAndGet(String tableName, K key, V value);
+
+ @Command
+ Result<UpdateResult<Versioned<V>>> putIfAbsentAndGet(String tableName, K key, V value);
+
+ @Command
Result<Versioned<V>> remove(String tableName, K key);
@Command
@@ -98,6 +104,9 @@
Result<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
@Command
+ Result<UpdateResult<Versioned<V>>> replaceAndGet(String tableName, K key, long oldVersion, V newValue);
+
+ @Command
Long counterAddAndGet(String counterName, long delta);
@Command
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index dd80db6..3fb6f4a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -21,12 +21,19 @@
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.HexString;
+import org.onlab.util.Tools;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.Serializer;
@@ -108,14 +115,126 @@
}
@Override
+ public CompletableFuture<Versioned<V>> computeIfAbsent(K key,
+ Function<? super K, ? extends V> mappingFunction) {
+ return computeIf(key, Objects::isNull, (k, v) -> mappingFunction.apply(k));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> computeIfPresent(K key,
+ BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+ return computeIf(key, Objects::nonNull, remappingFunction);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> compute(K key,
+ BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+ return computeIf(key, v -> true, remappingFunction);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> computeIf(K key,
+ Predicate<? super V> condition,
+ BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(condition, "predicate function cannot be null");
+ checkNotNull(remappingFunction, "Remapping function cannot be null");
+ return get(key).thenCompose(r1 -> {
+ V existingValue = r1 == null ? null : r1.value();
+ // if the condition evaluates to false, return existing value.
+ if (!condition.test(existingValue)) {
+ return CompletableFuture.completedFuture(r1);
+ }
+
+ AtomicReference<V> computedValue = new AtomicReference<>();
+ // if remappingFunction throws an exception, return the exception.
+ try {
+ computedValue.set(remappingFunction.apply(key, existingValue));
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+
+ // if the computed value is null, remove current value if one exists.
+ // throw an exception if concurrent modification is detected.
+ if (computedValue.get() == null) {
+ if (r1 != null) {
+ return remove(key, r1.version()).thenApply(result -> {
+ if (result) {
+ return null;
+ } else {
+ throw new ConsistentMapException.ConcurrentModification();
+ }
+ });
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ } else {
+ // replace current value; throw an exception if concurrent modification is detected
+ if (r1 != null) {
+ return replaceAndGet(key, r1.version(), computedValue.get())
+ .thenApply(v -> {
+ if (v.isPresent()) {
+ return v.get();
+ } else {
+ throw new ConsistentMapException.ConcurrentModification();
+ }
+ });
+ } else {
+ return putIfAbsentAndGet(key, computedValue.get()).thenApply(result -> {
+ if (!result.isPresent()) {
+ throw new ConsistentMapException.ConcurrentModification();
+ } else {
+ return result.get();
+ }
+ });
+ }
+ }
+ });
+ }
+
+ @Override
public CompletableFuture<Versioned<V>> put(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
checkIfUnmodifiable();
return database.put(name, keyCache.getUnchecked(key), serializer.encode(value))
+ .thenApply(this::unwrapResult)
+ .thenApply(v -> v != null
+ ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(value, ERROR_NULL_VALUE);
+ checkIfUnmodifiable();
+ return database.putAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
.thenApply(this::unwrapResult)
- .thenApply(v -> v != null
- ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
+ .thenApply(v -> {
+ Versioned<byte[]> rawNewValue = v.newValue();
+ return new Versioned<>(serializer.decode(rawNewValue.value()),
+ rawNewValue.version(),
+ rawNewValue.creationTime());
+ });
+ }
+
+ @Override
+ public CompletableFuture<Optional<Versioned<V>>> putIfAbsentAndGet(K key, V value) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(value, ERROR_NULL_VALUE);
+ checkIfUnmodifiable();
+ return database.putIfAbsentAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
+ .thenApply(this::unwrapResult)
+ .thenApply(v -> {
+ if (v.updated()) {
+ Versioned<byte[]> rawNewValue = v.newValue();
+ return Optional.of(new Versioned<>(serializer.decode(rawNewValue.value()),
+ rawNewValue.version(),
+ rawNewValue.creationTime()));
+ } else {
+ return Optional.empty();
+ }
+ });
}
@Override
@@ -167,9 +286,9 @@
return database.putIfAbsent(name,
keyCache.getUnchecked(key),
serializer.encode(value))
- .thenApply(this::unwrapResult)
- .thenApply(v -> v != null ?
- new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
+ .thenApply(this::unwrapResult)
+ .thenApply(v -> v != null ?
+ new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
}
@Override
@@ -178,7 +297,7 @@
checkNotNull(value, ERROR_NULL_VALUE);
checkIfUnmodifiable();
return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value))
- .thenApply(this::unwrapResult);
+ .thenApply(this::unwrapResult);
}
@Override
@@ -186,7 +305,7 @@
checkNotNull(key, ERROR_NULL_KEY);
checkIfUnmodifiable();
return database.remove(name, keyCache.getUnchecked(key), version)
- .thenApply(this::unwrapResult);
+ .thenApply(this::unwrapResult);
}
@@ -197,16 +316,34 @@
checkIfUnmodifiable();
byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue))
- .thenApply(this::unwrapResult);
+ .thenApply(this::unwrapResult);
}
@Override
public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
+ return replaceAndGet(key, oldVersion, newValue).thenApply(Optional::isPresent);
+ }
+
+ @Override
+ public CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(newValue, ERROR_NULL_VALUE);
checkIfUnmodifiable();
- return database.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue))
- .thenApply(this::unwrapResult);
+ return database.replaceAndGet(name,
+ keyCache.getUnchecked(key),
+ oldVersion,
+ serializer.encode(newValue))
+ .thenApply(this::unwrapResult)
+ .thenApply(v -> {
+ if (v.updated()) {
+ Versioned<byte[]> rawNewValue = v.newValue();
+ return Optional.of(new Versioned<>(serializer.decode(rawNewValue.value()),
+ rawNewValue.version(),
+ rawNewValue.creationTime()));
+ } else {
+ return Optional.empty();
+ }
+ });
}
private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
index 7b0ad10..b85dfa2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
@@ -18,10 +18,14 @@
import java.util.Collection;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.Set;
import org.onosproject.store.service.AsyncConsistentMap;
@@ -76,11 +80,46 @@
}
@Override
+ public Versioned<V> computeIfAbsent(K key,
+ Function<? super K, ? extends V> mappingFunction) {
+ return complete(asyncMap.computeIfAbsent(key, mappingFunction));
+ }
+
+ @Override
+ public Versioned<V> computeIfPresent(K key,
+ BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+ return complete(asyncMap.computeIfPresent(key, remappingFunction));
+ }
+
+ @Override
+ public Versioned<V> compute(K key,
+ BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+ return complete(asyncMap.compute(key, remappingFunction));
+ }
+
+ @Override
+ public Versioned<V> computeIf(K key,
+ Predicate<? super V> condition,
+ BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+ return complete(asyncMap.computeIf(key, condition, remappingFunction));
+ }
+
+ @Override
public Versioned<V> put(K key, V value) {
return complete(asyncMap.put(key, value));
}
@Override
+ public Versioned<V> putAndGet(K key, V value) {
+ return complete(asyncMap.putAndGet(key, value));
+ }
+
+ @Override
+ public Optional<Versioned<V>> putIfAbsentAndGet(K key, V value) {
+ return complete(asyncMap.putIfAbsentAndGet(key, value));
+ }
+
+ @Override
public Versioned<V> remove(K key) {
return complete(asyncMap.remove(key));
}
@@ -130,6 +169,11 @@
return complete(asyncMap.replace(key, oldVersion, newValue));
}
+ @Override
+ public Optional<Versioned<V>> replaceAndGet(K key, long oldVersion, V newValue) {
+ return complete(asyncMap.replaceAndGet(key, oldVersion, newValue));
+ }
+
private static <T> T complete(CompletableFuture<T> future) {
try {
return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
@@ -139,7 +183,11 @@
} catch (TimeoutException e) {
throw new ConsistentMapException.Timeout();
} catch (ExecutionException e) {
- throw new ConsistentMapException(e.getCause());
+ if (e.getCause() instanceof ConsistentMapException) {
+ throw (ConsistentMapException) e.getCause();
+ } else {
+ throw new ConsistentMapException(e.getCause());
+ }
}
}
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 06c0ef4..52a1b20 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -100,6 +100,20 @@
}
@Override
+ public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putAndGet(String tableName,
+ String key,
+ byte[] value) {
+ return checkOpen(() -> proxy.putAndGet(tableName, key, value));
+ }
+
+ @Override
+ public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putIfAbsentAndGet(String tableName,
+ String key,
+ byte[] value) {
+ return checkOpen(() -> proxy.putIfAbsentAndGet(tableName, key, value));
+ }
+
+ @Override
public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
return checkOpen(() -> proxy.remove(tableName, key));
}
@@ -150,6 +164,14 @@
}
@Override
+ public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> replaceAndGet(String tableName,
+ String key,
+ long oldVersion,
+ byte[] newValue) {
+ return checkOpen(() -> proxy.replaceAndGet(tableName, key, oldVersion, newValue));
+ }
+
+ @Override
public CompletableFuture<Long> counterGet(String counterName) {
return checkOpen(() -> proxy.counterGet(counterName));
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index f696139..7edeb44 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -30,6 +30,7 @@
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.DatabaseUpdate.Type;
+
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -128,6 +129,36 @@
}
@Override
+ public Result<UpdateResult<Versioned<byte[]>>> putAndGet(String tableName,
+ String key,
+ byte[] value) {
+ if (isLockedForUpdates(tableName, key)) {
+ return Result.locked();
+ } else {
+ Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
+ Versioned<byte[]> oldValue = getTableMap(tableName).put(key, newValue);
+ return Result.ok(new UpdateResult<>(true, oldValue, newValue));
+ }
+ }
+
+ @Override
+ public Result<UpdateResult<Versioned<byte[]>>> putIfAbsentAndGet(String tableName,
+ String key,
+ byte[] value) {
+ if (isLockedForUpdates(tableName, key)) {
+ return Result.locked();
+ }
+ Versioned<byte[]> currentValue = getTableMap(tableName).get(key);
+ if (currentValue != null) {
+ return Result.ok(new UpdateResult<>(false, currentValue, currentValue));
+ } else {
+ Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
+ getTableMap(tableName).put(key, newValue);
+ return Result.ok(new UpdateResult<>(true, null, newValue));
+ }
+ }
+
+ @Override
public Result<Versioned<byte[]>> remove(String tableName, String key) {
return isLockedForUpdates(tableName, key)
? Result.locked()
@@ -225,6 +256,23 @@
}
@Override
+ public Result<UpdateResult<Versioned<byte[]>>> replaceAndGet(
+ String tableName, String key, long oldVersion, byte[] newValue) {
+ if (isLockedForUpdates(tableName, key)) {
+ return Result.locked();
+ }
+ boolean updated = false;
+ Versioned<byte[]> previous = get(tableName, key);
+ Versioned<byte[]> current = previous;
+ if (previous != null && previous.version() == oldVersion) {
+ current = new Versioned<>(newValue, ++nextVersion);
+ getTableMap(tableName).put(key, current);
+ updated = true;
+ }
+ return Result.ok(new UpdateResult<>(updated, previous, current));
+ }
+
+ @Override
public Long counterAddAndGet(String counterName, long delta) {
return getCounter(counterName).addAndGet(delta);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index fd9561a..2959880 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -216,39 +216,24 @@
private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
try {
- Versioned<List<NodeId>> candidates = candidateMap.get(path);
- if (candidates != null) {
- List<NodeId> candidateList = Lists.newArrayList(candidates.value());
- if (!candidateList.contains(localNodeId)) {
- candidateList.add(localNodeId);
- if (candidateMap.replace(path, candidates.version(), candidateList)) {
- Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
- publish(new LeadershipEvent(
- LeadershipEvent.Type.CANDIDATES_CHANGED,
- new Leadership(path,
- newCandidates.value(),
- newCandidates.version(),
- newCandidates.creationTime())));
- } else {
- rerunForLeadership(path, future);
- return;
- }
- }
- } else {
- List<NodeId> candidateList = ImmutableList.of(localNodeId);
- if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
- Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
- publish(new LeadershipEvent(
- LeadershipEvent.Type.CANDIDATES_CHANGED,
- new Leadership(path,
- newCandidates.value(),
- newCandidates.version(),
- newCandidates.creationTime())));
- } else {
- rerunForLeadership(path, future);
- return;
- }
- }
+ Versioned<List<NodeId>> candidates = candidateMap.computeIf(path,
+ currentList -> currentList == null || !currentList.contains(localNodeId),
+ (topic, currentList) -> {
+ if (currentList == null) {
+ return ImmutableList.of(localNodeId);
+ } else {
+ List<NodeId> newList = Lists.newLinkedList();
+ newList.addAll(currentList);
+ newList.add(localNodeId);
+ return newList;
+ }
+ });
+ publish(new LeadershipEvent(
+ LeadershipEvent.Type.CANDIDATES_CHANGED,
+ new Leadership(path,
+ candidates.value(),
+ candidates.version(),
+ candidates.creationTime())));
log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
activeTopics.add(path);
tryLeaderLock(path, future);
@@ -352,28 +337,22 @@
@Override
public boolean makeTopCandidate(String path, NodeId nodeId) {
- Versioned<List<NodeId>> candidates = candidateMap.get(path);
- if (candidates == null || !candidates.value().contains(nodeId)) {
- return false;
- }
- List<NodeId> currentRoster = candidates.value();
- if (nodeId.equals(currentRoster.get(LEADER_CANDIDATE_POS))) {
- return true;
- }
- List<NodeId> newRoster = new ArrayList<>(currentRoster.size());
- newRoster.add(nodeId);
- currentRoster.stream().filter(id -> !nodeId.equals(id)).forEach(newRoster::add);
- boolean updated = candidateMap.replace(path, candidates.version(), newRoster);
- if (updated) {
- Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
- publish(new LeadershipEvent(
+ Versioned<List<NodeId>> newCandidates = candidateMap.computeIf(path,
+ candidates -> (candidates != null && candidates.contains(nodeId)) ||
+ (candidates != null && Objects.equals(nodeId, candidates.get(LEADER_CANDIDATE_POS))),
+ (topic, candidates) -> {
+ List<NodeId> updatedCandidates = new ArrayList<>(candidates.size());
+ updatedCandidates.add(nodeId);
+ candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
+ return updatedCandidates;
+ });
+ publish(new LeadershipEvent(
LeadershipEvent.Type.CANDIDATES_CHANGED,
new Leadership(path,
newCandidates.value(),
newCandidates.version(),
newCandidates.creationTime())));
- }
- return updated;
+ return true;
}
private void tryLeaderLock(String path, CompletableFuture<Leadership> future) {
@@ -403,41 +382,19 @@
private void leaderLockAttempt(String path, List<NodeId> candidates, CompletableFuture<Leadership> future) {
try {
- Versioned<NodeId> currentLeader = leaderMap.get(path);
- if (currentLeader != null) {
- if (localNodeId.equals(currentLeader.value())) {
- log.debug("Already has leadership for {}", path);
- // FIXME: candidates can get out of sync.
- Leadership leadership = new Leadership(path,
- localNodeId,
- currentLeader.version(),
- currentLeader.creationTime());
- future.complete(leadership);
- publish(new LeadershipEvent(
- LeadershipEvent.Type.LEADER_ELECTED,
- leadership));
- } else {
- // someone else has leadership. will retry after sometime.
- retryLock(path, future);
- }
+ Versioned<NodeId> leader = leaderMap.computeIfAbsent(path, p -> localNodeId);
+ if (Objects.equals(leader.value(), localNodeId)) {
+ log.debug("Assumed leadership for {}", path);
+ Leadership leadership = new Leadership(path,
+ leader.value(),
+ leader.version(),
+ leader.creationTime());
+ future.complete(leadership);
+ publish(new LeadershipEvent(
+ LeadershipEvent.Type.LEADER_ELECTED,
+ leadership));
} else {
- if (leaderMap.putIfAbsent(path, localNodeId) == null) {
- log.debug("Assumed leadership for {}", path);
- // do a get again to get the version (epoch)
- Versioned<NodeId> newLeader = leaderMap.get(path);
- // FIXME: candidates can get out of sync
- Leadership leadership = new Leadership(path,
- newLeader.value(),
- newLeader.version(),
- newLeader.creationTime());
- future.complete(leadership);
- publish(new LeadershipEvent(
- LeadershipEvent.Type.LEADER_ELECTED,
- leadership));
- } else {
- // someone beat us to it.
- retryLock(path, future);
- }
+ retryLock(path, future);
}
} catch (Exception e) {
log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 1239475..2903e0a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -152,6 +152,22 @@
}
@Override
+ public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putAndGet(String tableName,
+ String key,
+ byte[] value) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
+ return partitioner.getPartition(tableName, key).putAndGet(tableName, key, value);
+ }
+
+ @Override
+ public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putIfAbsentAndGet(String tableName,
+ String key,
+ byte[] value) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
+ return partitioner.getPartition(tableName, key).putIfAbsentAndGet(tableName, key, value);
+ }
+
+ @Override
public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).remove(tableName, key);
@@ -235,6 +251,13 @@
}
@Override
+ public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> replaceAndGet(
+ String tableName, String key, long oldVersion, byte[] newValue) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
+ return partitioner.getPartition(tableName, key).replaceAndGet(tableName, key, oldVersion, newValue);
+ }
+
+ @Override
public CompletableFuture<Long> counterGet(String counterName) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(counterName, counterName).counterGet(counterName);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateResult.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateResult.java
new file mode 100644
index 0000000..4f3c663
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateResult.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+/**
+ * Result of a update operation.
+ * <p>
+ * Both old and new values are accessible along with a flag that indicates if the
+ * the value was updated. If flag is false, oldValue and newValue both
+ * point to the same unmodified value.
+ * @param <V> result type
+ */
+public class UpdateResult<V> {
+
+ private final boolean updated;
+ private final V oldValue;
+ private final V newValue;
+
+ public UpdateResult(boolean updated, V oldValue, V newValue) {
+ this.updated = updated;
+ this.oldValue = oldValue;
+ this.newValue = newValue;
+ }
+
+ public boolean updated() {
+ return updated;
+ }
+
+ public V oldValue() {
+ return oldValue;
+ }
+
+ public V newValue() {
+ return newValue;
+ }
+}
\ No newline at end of file