Add new methods to ConsistentMap API to improve usability.

Change-Id: I1e82f0ab191edc6b0f52c7d7b0307aa3d2ef9d1f

Change-Id: I4c5982fe6596f716729b7885eb584a60735cd41b
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index f58f5d5..8a88ed6 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -18,8 +18,12 @@
 
 import java.util.Collection;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
 
 /**
  * A distributed, strongly consistent map whose methods are all executed asynchronously.
@@ -84,6 +88,61 @@
     CompletableFuture<Versioned<V>> get(K key);
 
     /**
+     * If the specified key is not already associated with a value (or is mapped to null),
+     * attempts to compute its value using the given mapping function and enters it into
+     * this map unless null.
+     * If a conflicting concurrent modification attempt is detected, the returned future
+     * will be completed exceptionally with ConsistentMapException.ConcurrentModification.
+     * @param key key with which the specified value is to be associated
+     * @param mappingFunction the function to compute a value
+     * @return the current (existing or computed) value associated with the specified key,
+     * or null if the computed value is null
+     */
+    CompletableFuture<Versioned<V>> computeIfAbsent(K key,
+            Function<? super K, ? extends V> mappingFunction);
+
+    /**
+     * If the value for the specified key is present and non-null, attempts to compute a new
+     * mapping given the key and its current mapped value.
+     * If the computed value is null, the current mapping will be removed from the map.
+     * If a conflicting concurrent modification attempt is detected, the returned future
+     * will be completed exceptionally with ConsistentMapException.ConcurrentModification.
+     * @param key key with which the specified value is to be associated
+     * @param remappingFunction the function to compute a value
+     * @return the new value associated with the specified key, or null if computed value is null
+     */
+    CompletableFuture<Versioned<V>> computeIfPresent(K key,
+            BiFunction<? super K, ? super V, ? extends V> remappingFunction);
+
+    /**
+     * Attempts to compute a mapping for the specified key and its current mapped value (or
+     * null if there is no current mapping).
+     * If the computed value is null, the current mapping (if one exists) will be removed from the map.
+     * If a conflicting concurrent modification attempt is detected, the returned future
+     * will be completed exceptionally with ConsistentMapException.ConcurrentModification.
+     * @param key key with which the specified value is to be associated
+     * @param remappingFunction the function to compute a value
+     * @return the new value associated with the specified key, or null if computed value is null
+     */
+    CompletableFuture<Versioned<V>> compute(K key,
+            BiFunction<? super K, ? super V, ? extends V> remappingFunction);
+
+    /**
+     * If the value for the specified key satisfies a condition, attempts to compute a new
+     * mapping given the key and its current mapped value.
+     * If the computed value is null, the current mapping will be removed from the map.
+     * If a conflicting concurrent modification attempt is detected, the returned future
+     * will be completed exceptionally with ConsistentMapException.ConcurrentModification.
+     * @param key key with which the specified value is to be associated
+     * @param condition condition that should evaluate to true for the computation to proceed
+     * @param remappingFunction the function to compute a value
+     * @return the new value associated with the specified key, or the old value if condition evaluates to false
+     */
+    CompletableFuture<Versioned<V>> computeIf(K key,
+            Predicate<? super V> condition,
+            BiFunction<? super K, ? super V, ? extends V> remappingFunction);
+
+    /**
      * Associates the specified value with the specified key in this map (optional operation).
      * If the map previously contained a mapping for the key, the old value is replaced by the
      * specified value.
@@ -96,6 +155,28 @@
     CompletableFuture<Versioned<V>> put(K key, V value);
 
     /**
+     * Associates the specified value with the specified key in this map (optional operation).
+     * If the map previously contained a mapping for the key, the old value is replaced by the
+     * specified value.
+     *
+     * @param key key with which the specified value is to be associated
+     * @param value value to be associated with the specified key
+     * @return new value.
+     */
+    CompletableFuture<Versioned<V>> putAndGet(K key, V value);
+
+    /**
+     * Associates the specified value with the specified key in this map (optional operation).
+     * If the map previously contained a mapping for the key, the old value is replaced by the
+     * specified value.
+     *
+     * @param key key with which the specified value is to be associated
+     * @param value value to be associated with the specified key
+     * @return optional updated value. Will be empty if update did not happen
+     */
+    CompletableFuture<Optional<Versioned<V>>> putIfAbsentAndGet(K key, V value);
+
+    /**
      * Removes the mapping for a key from this map if it is present (optional operation).
      *
      * @param key key whose value is to be removed from the map
@@ -196,4 +277,15 @@
      * @return true if the value was replaced
      */
     CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue);
+
+    /**
+     * Replaces the entry for the specified key only if it is currently mapped to the
+     * specified version.
+     *
+     * @param key key key with which the specified value is associated
+     * @param oldVersion version expected to be associated with the specified key
+     * @param newValue value to be associated with the specified key
+     * @return optional updated value. Will be empty if update did not happen.
+     */
+    CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue);
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index 6d19259..2893509 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -18,7 +18,11 @@
 
 import java.util.Collection;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
 
 /**
  * A distributed, strongly consistent map.
@@ -83,6 +87,64 @@
     Versioned<V> get(K key);
 
     /**
+     * If the specified key is not already associated with a value (or is mapped to null),
+     * attempts to compute its value using the given mapping function and enters it into
+     * this map unless null.
+     *
+     * @param key key with which the specified value is to be associated
+     * @param mappingFunction the function to compute a value
+     * @return the current (existing or computed) value associated with the specified key,
+     * or null if the computed value is null. Method throws {@code ConsistentMapException.ConcurrentModification}
+     * if a concurrent modification of map is detected
+     */
+    Versioned<V> computeIfAbsent(K key,
+            Function<? super K, ? extends V> mappingFunction);
+
+    /**
+     * Attempts to compute a mapping for the specified key and its current mapped value (or
+     * null if there is no current mapping).
+     * If the computed value is null, the current mapping will be removed from the map.
+     *
+     * @param key key with which the specified value is to be associated
+     * @param remappingFunction the function to compute a value
+     * @return the new value associated with the specified key, or null if none.
+     * This method throws {@code ConsistentMapException.ConcurrentModification}
+     * if a concurrent modification of map is detected
+     */
+    Versioned<V> compute(K key,
+            BiFunction<? super K, ? super V, ? extends V> remappingFunction);
+
+    /**
+     * If the value for the specified key is present and non-null, attempts to compute a new
+     * mapping given the key and its current mapped value.
+     * If the computed value is null, the current mapping will be removed from the map.
+     *
+     * @param key key with which the specified value is to be associated
+     * @param remappingFunction the function to compute a value
+     * @return the new value associated with the specified key, or null if none.
+     * This method throws {@code ConsistentMapException.ConcurrentModification}
+     * if a concurrent modification of map is detected
+     */
+    Versioned<V> computeIfPresent(K key,
+            BiFunction<? super K, ? super V, ? extends V> remappingFunction);
+
+    /**
+     * If the value for the specified key satisfies a condition, attempts to compute a new
+     * mapping given the key and its current mapped value.
+     * If the computed value is null, the current mapping will be removed from the map.
+     *
+     * @param key key with which the specified value is to be associated
+     * @param condition condition that should evaluate to true for the computation to proceed
+     * @param remappingFunction the function to compute a value
+     * @return the new value associated with the specified key, or the old value if condition evaluates to false.
+     * This method throws {@code ConsistentMapException.ConcurrentModification} if a concurrent
+     * modification of map is detected
+     */
+    Versioned<V> computeIf(K key,
+            Predicate<? super V> condition,
+            BiFunction<? super K, ? super V, ? extends V> remappingFunction);
+
+    /**
      * Associates the specified value with the specified key in this map (optional operation).
      * If the map previously contained a mapping for the key, the old value is replaced by the
      * specified value.
@@ -95,6 +157,28 @@
     Versioned<V> put(K key, V value);
 
     /**
+     * Associates the specified value with the specified key in this map (optional operation).
+     * If the map previously contained a mapping for the key, the old value is replaced by the
+     * specified value.
+     *
+     * @param key key with which the specified value is to be associated
+     * @param value value to be associated with the specified key
+     * @return new value.
+     */
+    Versioned<V> putAndGet(K key, V value);
+
+    /**
+     * Associates the specified value with the specified key in this map (optional operation).
+     * If the map previously contained a mapping for the key, the old value is replaced by the
+     * specified value.
+     *
+     * @param key key with which the specified value is to be associated
+     * @param value value to be associated with the specified key
+     * @return optional updated value. Will be empty if update did not happen
+     */
+    Optional<Versioned<V>> putIfAbsentAndGet(K key, V value);
+
+    /**
      * Removes the mapping for a key from this map if it is present (optional operation).
      *
      * @param key key whose value is to be removed from the map
@@ -194,4 +278,15 @@
      * @return true if the value was replaced
      */
     boolean replace(K key, long oldVersion, V newValue);
+
+    /**
+     * Replaces the entry for the specified key only if it is currently mapped to the
+     * specified version.
+     *
+     * @param key key key with which the specified value is associated
+     * @param oldVersion version expected to be associated with the specified key
+     * @param newValue value to be associated with the specified key
+     * @return optional new value. Will be empty if replace did not happen
+     */
+    Optional<Versioned<V>> replaceAndGet(K key, long oldVersion, V newValue);
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index 72c2188..815b142 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -97,6 +97,26 @@
     CompletableFuture<Result<Versioned<V>>> put(String tableName, K key, V value);
 
     /**
+     * Puts a value in the table.
+     *
+     * @param tableName table name
+     * @param key       The key to set.
+     * @param value     The value to set.
+     * @return A completable future to be completed with the result once complete.
+     */
+    CompletableFuture<Result<UpdateResult<Versioned<V>>>> putAndGet(String tableName, K key, V value);
+
+    /**
+     * Puts a value in the table.
+     *
+     * @param tableName table name
+     * @param key       The key to set.
+     * @param value     The value to set.
+     * @return A completable future to be completed with the result once complete.
+     */
+    CompletableFuture<Result<UpdateResult<Versioned<V>>>> putIfAbsentAndGet(String tableName, K key, V value);
+
+    /**
      * Removes a value from the table.
      *
      * @param tableName table name
@@ -190,6 +210,19 @@
     CompletableFuture<Result<Boolean>> replace(String tableName, K key, long oldVersion, V newValue);
 
     /**
+     * Replaces the entry for the specified key only if currently mapped to the specified version.
+     *
+     * @param tableName  table name
+     * @param key        The key to update
+     * @param oldVersion existing version in the map for this replace to succeed.
+     * @param newValue   The value with which to replace the given key and version.
+     * @return A completable future to be completed with the result once complete.
+     */
+    CompletableFuture<Result<UpdateResult<Versioned<V>>>> replaceAndGet(String tableName,
+            K key, long oldVersion,
+            V newValue);
+
+    /**
      * Atomically add the given value to current value of the specified counter.
      *
      * @param counterName counter name
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
index 42b0313..9b6a322 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
@@ -74,6 +74,7 @@
             .register(Pair.class)
             .register(ImmutablePair.class)
             .register(Result.class)
+            .register(UpdateResult.class)
             .register(Result.Status.class)
             .register(DefaultTransaction.class)
             .register(Transaction.State.class)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index 83909d8..73eacdd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -68,6 +68,12 @@
   Result<Versioned<V>> put(String tableName, K key, V value);
 
   @Command
+  Result<UpdateResult<Versioned<V>>> putAndGet(String tableName, K key, V value);
+
+  @Command
+  Result<UpdateResult<Versioned<V>>> putIfAbsentAndGet(String tableName, K key, V value);
+
+  @Command
   Result<Versioned<V>> remove(String tableName, K key);
 
   @Command
@@ -98,6 +104,9 @@
   Result<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
 
   @Command
+  Result<UpdateResult<Versioned<V>>> replaceAndGet(String tableName, K key, long oldVersion, V newValue);
+
+  @Command
   Long counterAddAndGet(String counterName, long delta);
 
   @Command
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index dd80db6..3fb6f4a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -21,12 +21,19 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.Set;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.onlab.util.HexString;
+import org.onlab.util.Tools;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.Serializer;
@@ -108,14 +115,126 @@
     }
 
     @Override
+    public CompletableFuture<Versioned<V>> computeIfAbsent(K key,
+            Function<? super K, ? extends V> mappingFunction) {
+        return computeIf(key, Objects::isNull, (k, v) -> mappingFunction.apply(k));
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> computeIfPresent(K key,
+            BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+        return computeIf(key, Objects::nonNull, remappingFunction);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> compute(K key,
+            BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+        return computeIf(key, v -> true, remappingFunction);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> computeIf(K key,
+            Predicate<? super V> condition,
+            BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(condition, "predicate function cannot be null");
+        checkNotNull(remappingFunction, "Remapping function cannot be null");
+        return get(key).thenCompose(r1 -> {
+            V existingValue = r1 == null ? null : r1.value();
+            // if the condition evaluates to false, return existing value.
+            if (!condition.test(existingValue)) {
+                return CompletableFuture.completedFuture(r1);
+            }
+
+            AtomicReference<V> computedValue = new AtomicReference<>();
+            // if remappingFunction throws an exception, return the exception.
+            try {
+                computedValue.set(remappingFunction.apply(key, existingValue));
+            } catch (Exception e) {
+                return Tools.exceptionalFuture(e);
+            }
+
+            // if the computed value is null, remove current value if one exists.
+            // throw an exception if concurrent modification is detected.
+            if (computedValue.get() == null) {
+                if (r1 != null) {
+                    return remove(key, r1.version()).thenApply(result -> {
+                        if (result) {
+                            return null;
+                        } else {
+                            throw new ConsistentMapException.ConcurrentModification();
+                        }
+                    });
+                } else {
+                    return CompletableFuture.completedFuture(null);
+                }
+            } else {
+                // replace current value; throw an exception if concurrent modification is detected
+                if (r1 != null) {
+                    return replaceAndGet(key, r1.version(), computedValue.get())
+                            .thenApply(v -> {
+                                if (v.isPresent()) {
+                                    return v.get();
+                                } else {
+                                    throw new ConsistentMapException.ConcurrentModification();
+                                }
+                            });
+                } else {
+                    return putIfAbsentAndGet(key, computedValue.get()).thenApply(result -> {
+                        if (!result.isPresent()) {
+                            throw new ConsistentMapException.ConcurrentModification();
+                        } else {
+                            return result.get();
+                        }
+                    });
+                }
+            }
+        });
+    }
+
+    @Override
     public CompletableFuture<Versioned<V>> put(K key, V value) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
         checkIfUnmodifiable();
         return database.put(name, keyCache.getUnchecked(key), serializer.encode(value))
+                       .thenApply(this::unwrapResult)
+                       .thenApply(v -> v != null
+                       ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(value, ERROR_NULL_VALUE);
+        checkIfUnmodifiable();
+        return database.putAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
                 .thenApply(this::unwrapResult)
-                .thenApply(v -> v != null
-                ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
+                .thenApply(v -> {
+                    Versioned<byte[]> rawNewValue = v.newValue();
+                    return new Versioned<>(serializer.decode(rawNewValue.value()),
+                            rawNewValue.version(),
+                            rawNewValue.creationTime());
+                });
+    }
+
+    @Override
+    public CompletableFuture<Optional<Versioned<V>>> putIfAbsentAndGet(K key, V value) {
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(value, ERROR_NULL_VALUE);
+        checkIfUnmodifiable();
+        return database.putIfAbsentAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
+                .thenApply(this::unwrapResult)
+                .thenApply(v -> {
+                    if (v.updated()) {
+                        Versioned<byte[]> rawNewValue = v.newValue();
+                        return Optional.of(new Versioned<>(serializer.decode(rawNewValue.value()),
+                                                           rawNewValue.version(),
+                                                           rawNewValue.creationTime()));
+                    } else {
+                        return Optional.empty();
+                    }
+                });
     }
 
     @Override
@@ -167,9 +286,9 @@
         return database.putIfAbsent(name,
                                     keyCache.getUnchecked(key),
                                     serializer.encode(value))
-               .thenApply(this::unwrapResult)
-               .thenApply(v -> v != null ?
-                       new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
+                .thenApply(this::unwrapResult)
+                .thenApply(v -> v != null ?
+                        new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
     }
 
     @Override
@@ -178,7 +297,7 @@
         checkNotNull(value, ERROR_NULL_VALUE);
         checkIfUnmodifiable();
         return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value))
-                .thenApply(this::unwrapResult);
+                       .thenApply(this::unwrapResult);
     }
 
     @Override
@@ -186,7 +305,7 @@
         checkNotNull(key, ERROR_NULL_KEY);
         checkIfUnmodifiable();
         return database.remove(name, keyCache.getUnchecked(key), version)
-                .thenApply(this::unwrapResult);
+                       .thenApply(this::unwrapResult);
 
     }
 
@@ -197,16 +316,34 @@
         checkIfUnmodifiable();
         byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
         return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue))
-                .thenApply(this::unwrapResult);
+                       .thenApply(this::unwrapResult);
     }
 
     @Override
     public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
+        return replaceAndGet(key, oldVersion, newValue).thenApply(Optional::isPresent);
+    }
+
+    @Override
+    public CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(newValue, ERROR_NULL_VALUE);
         checkIfUnmodifiable();
-        return database.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue))
-                .thenApply(this::unwrapResult);
+        return database.replaceAndGet(name,
+                                      keyCache.getUnchecked(key),
+                                      oldVersion,
+                                      serializer.encode(newValue))
+                       .thenApply(this::unwrapResult)
+                       .thenApply(v -> {
+                                   if (v.updated()) {
+                                       Versioned<byte[]> rawNewValue = v.newValue();
+                                       return Optional.of(new Versioned<>(serializer.decode(rawNewValue.value()),
+                                                                            rawNewValue.version(),
+                                                                            rawNewValue.creationTime()));
+                                   } else {
+                                       return Optional.empty();
+                                   }
+                       });
     }
 
     private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
index 7b0ad10..b85dfa2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
@@ -18,10 +18,14 @@
 
 import java.util.Collection;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.Set;
 
 import org.onosproject.store.service.AsyncConsistentMap;
@@ -76,11 +80,46 @@
     }
 
     @Override
+    public Versioned<V> computeIfAbsent(K key,
+            Function<? super K, ? extends V> mappingFunction) {
+        return complete(asyncMap.computeIfAbsent(key, mappingFunction));
+    }
+
+    @Override
+    public Versioned<V> computeIfPresent(K key,
+            BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+        return complete(asyncMap.computeIfPresent(key, remappingFunction));
+    }
+
+    @Override
+    public Versioned<V> compute(K key,
+            BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+        return complete(asyncMap.compute(key, remappingFunction));
+    }
+
+    @Override
+    public Versioned<V> computeIf(K key,
+            Predicate<? super V> condition,
+            BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+        return complete(asyncMap.computeIf(key, condition, remappingFunction));
+    }
+
+    @Override
     public Versioned<V> put(K key, V value) {
         return complete(asyncMap.put(key, value));
     }
 
     @Override
+    public Versioned<V> putAndGet(K key, V value) {
+        return complete(asyncMap.putAndGet(key, value));
+    }
+
+    @Override
+    public Optional<Versioned<V>> putIfAbsentAndGet(K key, V value) {
+        return complete(asyncMap.putIfAbsentAndGet(key, value));
+    }
+
+    @Override
     public Versioned<V> remove(K key) {
         return complete(asyncMap.remove(key));
     }
@@ -130,6 +169,11 @@
         return complete(asyncMap.replace(key, oldVersion, newValue));
     }
 
+    @Override
+    public Optional<Versioned<V>> replaceAndGet(K key, long oldVersion, V newValue) {
+        return complete(asyncMap.replaceAndGet(key, oldVersion, newValue));
+    }
+
     private static <T> T complete(CompletableFuture<T> future) {
         try {
             return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
@@ -139,7 +183,11 @@
         } catch (TimeoutException e) {
             throw new ConsistentMapException.Timeout();
         } catch (ExecutionException e) {
-            throw new ConsistentMapException(e.getCause());
+            if (e.getCause() instanceof ConsistentMapException) {
+                throw (ConsistentMapException) e.getCause();
+            } else {
+                throw new ConsistentMapException(e.getCause());
+            }
         }
     }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 06c0ef4..52a1b20 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -100,6 +100,20 @@
     }
 
     @Override
+    public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putAndGet(String tableName,
+            String key,
+            byte[] value) {
+        return checkOpen(() -> proxy.putAndGet(tableName, key, value));
+    }
+
+    @Override
+    public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putIfAbsentAndGet(String tableName,
+            String key,
+            byte[] value) {
+        return checkOpen(() -> proxy.putIfAbsentAndGet(tableName, key, value));
+    }
+
+    @Override
     public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
         return checkOpen(() -> proxy.remove(tableName, key));
     }
@@ -150,6 +164,14 @@
     }
 
     @Override
+    public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> replaceAndGet(String tableName,
+            String key,
+            long oldVersion,
+            byte[] newValue) {
+        return checkOpen(() -> proxy.replaceAndGet(tableName, key, oldVersion, newValue));
+    }
+
+    @Override
     public CompletableFuture<Long> counterGet(String counterName) {
         return checkOpen(() -> proxy.counterGet(counterName));
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index f696139..7edeb44 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -30,6 +30,7 @@
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 import org.onosproject.store.service.DatabaseUpdate.Type;
+
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -128,6 +129,36 @@
     }
 
     @Override
+    public Result<UpdateResult<Versioned<byte[]>>> putAndGet(String tableName,
+            String key,
+            byte[] value) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
+        } else {
+            Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
+            Versioned<byte[]> oldValue = getTableMap(tableName).put(key, newValue);
+            return Result.ok(new UpdateResult<>(true, oldValue, newValue));
+        }
+    }
+
+    @Override
+    public Result<UpdateResult<Versioned<byte[]>>> putIfAbsentAndGet(String tableName,
+            String key,
+            byte[] value) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
+        }
+        Versioned<byte[]> currentValue = getTableMap(tableName).get(key);
+        if (currentValue != null) {
+            return Result.ok(new UpdateResult<>(false, currentValue, currentValue));
+        } else {
+            Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
+            getTableMap(tableName).put(key, newValue);
+            return Result.ok(new UpdateResult<>(true, null, newValue));
+        }
+    }
+
+    @Override
     public Result<Versioned<byte[]>> remove(String tableName, String key) {
         return isLockedForUpdates(tableName, key)
                 ? Result.locked()
@@ -225,6 +256,23 @@
     }
 
     @Override
+    public Result<UpdateResult<Versioned<byte[]>>> replaceAndGet(
+            String tableName, String key, long oldVersion, byte[] newValue) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
+        }
+        boolean updated = false;
+        Versioned<byte[]> previous = get(tableName, key);
+        Versioned<byte[]> current = previous;
+        if (previous != null && previous.version() == oldVersion) {
+            current = new Versioned<>(newValue, ++nextVersion);
+            getTableMap(tableName).put(key, current);
+            updated = true;
+        }
+        return Result.ok(new UpdateResult<>(updated, previous, current));
+    }
+
+    @Override
     public Long counterAddAndGet(String counterName, long delta) {
         return getCounter(counterName).addAndGet(delta);
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index fd9561a..2959880 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -216,39 +216,24 @@
 
     private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
         try {
-            Versioned<List<NodeId>> candidates = candidateMap.get(path);
-            if (candidates != null) {
-                List<NodeId> candidateList = Lists.newArrayList(candidates.value());
-                if (!candidateList.contains(localNodeId)) {
-                    candidateList.add(localNodeId);
-                    if (candidateMap.replace(path, candidates.version(), candidateList)) {
-                        Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
-                        publish(new LeadershipEvent(
-                                LeadershipEvent.Type.CANDIDATES_CHANGED,
-                                new Leadership(path,
-                                    newCandidates.value(),
-                                    newCandidates.version(),
-                                    newCandidates.creationTime())));
-                    } else {
-                        rerunForLeadership(path, future);
-                        return;
-                    }
-                }
-            } else {
-                List<NodeId> candidateList = ImmutableList.of(localNodeId);
-                if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
-                    Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
-                    publish(new LeadershipEvent(
-                            LeadershipEvent.Type.CANDIDATES_CHANGED,
-                            new Leadership(path,
-                                newCandidates.value(),
-                                newCandidates.version(),
-                                newCandidates.creationTime())));
-                } else {
-                    rerunForLeadership(path, future);
-                    return;
-                }
-            }
+            Versioned<List<NodeId>> candidates = candidateMap.computeIf(path,
+                    currentList -> currentList == null || !currentList.contains(localNodeId),
+                    (topic, currentList) -> {
+                        if (currentList == null) {
+                            return ImmutableList.of(localNodeId);
+                        } else {
+                            List<NodeId> newList = Lists.newLinkedList();
+                            newList.addAll(currentList);
+                            newList.add(localNodeId);
+                            return newList;
+                        }
+                    });
+            publish(new LeadershipEvent(
+                    LeadershipEvent.Type.CANDIDATES_CHANGED,
+                    new Leadership(path,
+                            candidates.value(),
+                            candidates.version(),
+                            candidates.creationTime())));
             log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
             activeTopics.add(path);
             tryLeaderLock(path, future);
@@ -352,28 +337,22 @@
 
     @Override
     public boolean makeTopCandidate(String path, NodeId nodeId) {
-        Versioned<List<NodeId>> candidates = candidateMap.get(path);
-        if (candidates == null || !candidates.value().contains(nodeId)) {
-            return false;
-        }
-        List<NodeId> currentRoster = candidates.value();
-        if (nodeId.equals(currentRoster.get(LEADER_CANDIDATE_POS))) {
-            return true;
-        }
-        List<NodeId> newRoster = new ArrayList<>(currentRoster.size());
-        newRoster.add(nodeId);
-        currentRoster.stream().filter(id -> !nodeId.equals(id)).forEach(newRoster::add);
-        boolean updated = candidateMap.replace(path, candidates.version(), newRoster);
-        if (updated) {
-            Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
-            publish(new LeadershipEvent(
+        Versioned<List<NodeId>> newCandidates = candidateMap.computeIf(path,
+                candidates -> (candidates != null && candidates.contains(nodeId)) ||
+                              (candidates != null && Objects.equals(nodeId, candidates.get(LEADER_CANDIDATE_POS))),
+                (topic, candidates) -> {
+                    List<NodeId> updatedCandidates = new ArrayList<>(candidates.size());
+                    updatedCandidates.add(nodeId);
+                    candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
+                    return updatedCandidates;
+                });
+        publish(new LeadershipEvent(
                     LeadershipEvent.Type.CANDIDATES_CHANGED,
                     new Leadership(path,
                         newCandidates.value(),
                         newCandidates.version(),
                         newCandidates.creationTime())));
-        }
-        return updated;
+        return true;
     }
 
     private void tryLeaderLock(String path, CompletableFuture<Leadership> future) {
@@ -403,41 +382,19 @@
 
     private void leaderLockAttempt(String path, List<NodeId> candidates, CompletableFuture<Leadership> future) {
         try {
-            Versioned<NodeId> currentLeader = leaderMap.get(path);
-            if (currentLeader != null) {
-                if (localNodeId.equals(currentLeader.value())) {
-                    log.debug("Already has leadership for {}", path);
-                    // FIXME: candidates can get out of sync.
-                    Leadership leadership = new Leadership(path,
-                            localNodeId,
-                            currentLeader.version(),
-                            currentLeader.creationTime());
-                    future.complete(leadership);
-                    publish(new LeadershipEvent(
-                            LeadershipEvent.Type.LEADER_ELECTED,
-                            leadership));
-                } else {
-                    // someone else has leadership. will retry after sometime.
-                    retryLock(path, future);
-                }
+            Versioned<NodeId> leader = leaderMap.computeIfAbsent(path, p -> localNodeId);
+            if (Objects.equals(leader.value(), localNodeId)) {
+                log.debug("Assumed leadership for {}", path);
+                Leadership leadership = new Leadership(path,
+                        leader.value(),
+                        leader.version(),
+                        leader.creationTime());
+                future.complete(leadership);
+                publish(new LeadershipEvent(
+                        LeadershipEvent.Type.LEADER_ELECTED,
+                        leadership));
             } else {
-                if (leaderMap.putIfAbsent(path, localNodeId) == null) {
-                    log.debug("Assumed leadership for {}", path);
-                    // do a get again to get the version (epoch)
-                    Versioned<NodeId> newLeader = leaderMap.get(path);
-                    // FIXME: candidates can get out of sync
-                    Leadership leadership = new Leadership(path,
-                            newLeader.value(),
-                            newLeader.version(),
-                            newLeader.creationTime());
-                    future.complete(leadership);
-                    publish(new LeadershipEvent(
-                            LeadershipEvent.Type.LEADER_ELECTED,
-                            leadership));
-                } else {
-                    // someone beat us to it.
-                    retryLock(path, future);
-                }
+                retryLock(path, future);
             }
         } catch (Exception e) {
             log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 1239475..2903e0a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -152,6 +152,22 @@
     }
 
     @Override
+    public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putAndGet(String tableName,
+            String key,
+            byte[] value) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
+        return partitioner.getPartition(tableName, key).putAndGet(tableName, key, value);
+    }
+
+    @Override
+    public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putIfAbsentAndGet(String tableName,
+            String key,
+            byte[] value) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
+        return partitioner.getPartition(tableName, key).putIfAbsentAndGet(tableName, key, value);
+    }
+
+    @Override
     public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).remove(tableName, key);
@@ -235,6 +251,13 @@
     }
 
     @Override
+    public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> replaceAndGet(
+            String tableName, String key, long oldVersion, byte[] newValue) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
+        return partitioner.getPartition(tableName, key).replaceAndGet(tableName, key, oldVersion, newValue);
+    }
+
+    @Override
     public CompletableFuture<Long> counterGet(String counterName) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(counterName, counterName).counterGet(counterName);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateResult.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateResult.java
new file mode 100644
index 0000000..4f3c663
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateResult.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+/**
+ * Result of a update operation.
+ * <p>
+ * Both old and new values are accessible along with a flag that indicates if the
+ * the value was updated. If flag is false, oldValue and newValue both
+ * point to the same unmodified value.
+ * @param <V> result type
+ */
+public class UpdateResult<V> {
+
+    private final boolean updated;
+    private final V oldValue;
+    private final V newValue;
+
+    public UpdateResult(boolean updated, V oldValue, V newValue) {
+        this.updated = updated;
+        this.oldValue = oldValue;
+        this.newValue = newValue;
+    }
+
+    public boolean updated() {
+        return updated;
+    }
+
+    public V oldValue() {
+        return oldValue;
+    }
+
+    public V newValue() {
+        return newValue;
+    }
+}
\ No newline at end of file