Added an async version for AtomicValue and misc javadoc improvements

Change-Id: Idc401964a726d221c01ecda0cc42c4a92551113f
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicValue.java b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicValue.java
new file mode 100644
index 0000000..531721a
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicValue.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Distributed version of java.util.concurrent.atomic.AtomicReference.
+ * <p>
+ * All methods of this interface return a {@link CompletableFuture future} immediately
+ * after a successful invocation. The operation itself is executed asynchronous and
+ * the returned future will be {@link CompletableFuture#complete completed} when the
+ * operation finishes.
+ *
+ * @param <V> value type
+ */
+public interface AsyncAtomicValue<V> {
+
+    /**
+     * Atomically sets the value to the given updated value if the current value is equal to the expected value.
+     * <p>
+     * IMPORTANT: Equality is based on the equality of the serialized {code byte[]} representations.
+     * <p>
+     * @param expect  the expected value
+     * @param update  the new value
+     * @return CompletableFuture that will be completed with {@code true} if update was successful. Otherwise future
+     * will be completed with a value of {@code false}
+     */
+    CompletableFuture<Boolean> compareAndSet(V expect, V update);
+
+    /**
+     * Gets the current value.
+     * @return CompletableFuture that will be completed with the value
+     */
+    CompletableFuture<V> get();
+
+    /**
+     * Atomically sets to the given value and returns the old value.
+     * @param value the new value
+     * @return CompletableFuture that will be completed with the previous value
+     */
+    CompletableFuture<V> getAndSet(V value);
+
+    /**
+     * Sets to the given value.
+     * @param value value to set
+     * @return CompletableFuture that will be completed when the operation finishes
+     */
+    CompletableFuture<Void> set(V value);
+
+    /**
+     * Registers the specified listener to be notified whenever the atomic value is updated.
+     * @param listener listener to notify about events
+     * @return CompletableFuture that will be completed when the operation finishes
+     */
+    CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener);
+
+    /**
+     * Unregisters the specified listener such that it will no longer
+     * receive atomic value update notifications.
+     * @param listener listener to unregister
+     * @return CompletableFuture that will be completed when the operation finishes
+     */
+    CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index 5e54eec..c7b6eac 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -17,6 +17,7 @@
 package org.onosproject.store.service;
 
 import java.util.Collection;
+import java.util.Objects;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -42,7 +43,11 @@
  * </p><p>
  * This map does not allow null values. All methods can throw a ConsistentMapException
  * (which extends RuntimeException) to indicate failures.
- *
+ * <p>
+ * All methods of this interface return a {@link CompletableFuture future} immediately
+ * after a successful invocation. The operation itself is executed asynchronous and
+ * the returned future will be {@link CompletableFuture#complete completed} when the
+ * operation finishes.
  */
 public interface AsyncConsistentMap<K, V> {
 
@@ -58,7 +63,9 @@
      *
      * @return a future whose value will be true if map has no entries, false otherwise.
      */
-    CompletableFuture<Boolean> isEmpty();
+    default CompletableFuture<Boolean> isEmpty() {
+        return size().thenApply(s -> s == 0);
+    }
 
     /**
      * Returns true if this map contains a mapping for the specified key.
@@ -97,8 +104,10 @@
      * @return the current (existing or computed) value associated with the specified key,
      * or null if the computed value is null
      */
-    CompletableFuture<Versioned<V>> computeIfAbsent(K key,
-            Function<? super K, ? extends V> mappingFunction);
+    default CompletableFuture<Versioned<V>> computeIfAbsent(K key,
+            Function<? super K, ? extends V> mappingFunction) {
+        return computeIf(key, Objects::isNull, (k, v) -> mappingFunction.apply(k));
+    }
 
     /**
      * If the value for the specified key is present and non-null, attempts to compute a new
@@ -110,8 +119,10 @@
      * @param remappingFunction the function to compute a value
      * @return the new value associated with the specified key, or null if computed value is null
      */
-    CompletableFuture<Versioned<V>> computeIfPresent(K key,
-            BiFunction<? super K, ? super V, ? extends V> remappingFunction);
+    default CompletableFuture<Versioned<V>> computeIfPresent(K key,
+            BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+        return computeIf(key, Objects::nonNull, remappingFunction);
+    }
 
     /**
      * Attempts to compute a mapping for the specified key and its current mapped value (or
@@ -123,8 +134,10 @@
      * @param remappingFunction the function to compute a value
      * @return the new value associated with the specified key, or null if computed value is null
      */
-    CompletableFuture<Versioned<V>> compute(K key,
-            BiFunction<? super K, ? super V, ? extends V> remappingFunction);
+    default CompletableFuture<Versioned<V>> compute(K key,
+            BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+        return computeIf(key, v -> true, remappingFunction);
+    }
 
     /**
      * If the value for the specified key satisfies a condition, attempts to compute a new
@@ -280,14 +293,16 @@
      * Registers the specified listener to be notified whenever the map is updated.
      *
      * @param listener listener to notify about map events
+     * @return future that will be completed when the operation finishes
      */
-    void addListener(MapEventListener<K, V> listener);
+    CompletableFuture<Void> addListener(MapEventListener<K, V> listener);
 
     /**
      * Unregisters the specified listener such that it will no longer
      * receive map change notifications.
      *
      * @param listener listener to unregister
+     * @return future that will be completed when the operation finishes
      */
-    void removeListener(MapEventListener<K, V> listener);
+    CompletableFuture<Void> removeListener(MapEventListener<K, V> listener);
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java
index 3478ce0..d5f226f 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java
@@ -68,6 +68,15 @@
     AtomicValueBuilder<V> withMeteringDisabled();
 
     /**
+     * Builds a AsyncAtomicValue based on the configuration options
+     * supplied to this builder.
+     *
+     * @return new AsyncAtomicValue
+     * @throws java.lang.RuntimeException if a mandatory parameter is missing
+     */
+    AsyncAtomicValue<V> buildAsyncValue();
+
+    /**
      * Builds a AtomicValue based on the configuration options
      * supplied to this builder.
      *
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index 9a6d409..93abf78 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -298,4 +298,4 @@
      * @return java.util.Map
      */
     Map<K, V> asJavaMap();
-}
\ No newline at end of file
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapBuilder.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapBuilder.java
index 847adaf..466c8f1 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapBuilder.java
@@ -18,7 +18,7 @@
 import org.onosproject.core.ApplicationId;
 
 /**
- * Builder for consistent maps.
+ * Builder for {@link ConsistentMap} instances.
  *
  * @param <K> type for map key
  * @param <V> type for map value
@@ -28,19 +28,20 @@
     /**
      * Sets the name of the map.
      * <p>
-     * Each consistent map is identified by a unique map name.
+     * Each map is identified by a unique map name. Different instances with the same name are all backed by the
+     * same backend state.
      * </p>
      * <p>
-     * Note: This is a mandatory parameter.
+     * <b>Note:</b> This is a mandatory parameter.
      * </p>
      *
-     * @param name name of the consistent map
+     * @param name name of the map
      * @return this ConsistentMapBuilder
      */
     ConsistentMapBuilder<K, V> withName(String name);
 
     /**
-     * Sets the owner applicationId for the map.
+     * Sets the identifier of the application that owns this map instance.
      * <p>
      * Note: If {@code purgeOnUninstall} option is enabled, applicationId
      * must be specified.
diff --git a/core/api/src/main/java/org/onosproject/store/service/Versioned.java b/core/api/src/main/java/org/onosproject/store/service/Versioned.java
index 89bd302..0eec3ff 100644
--- a/core/api/src/main/java/org/onosproject/store/service/Versioned.java
+++ b/core/api/src/main/java/org/onosproject/store/service/Versioned.java
@@ -111,6 +111,16 @@
         return versioned == null ? defaultValue : versioned.value();
     }
 
+    /**
+     * Returns the value of the specified Versioned object if non-null or else returns null.
+     * @param versioned versioned object
+     * @param <U> type of the versioned value
+     * @return versioned value or null if versioned object is null
+     */
+    public static <U> U valueOrNull(Versioned<U> versioned) {
+        return valueOrElse(versioned, null);
+    }
+
     @Override
     public int hashCode() {
         return Objects.hashCode(value, version, creationTime);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java
new file mode 100644
index 0000000..454d46c
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AtomicValueEvent;
+import org.onosproject.store.service.AtomicValueEventListener;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Versioned;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * Default implementation of {@link AsyncAtomicValue}.
+ *
+ * @param <V> value type
+ */
+public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
+
+    private final Set<AtomicValueEventListener<V>> listeners = new CopyOnWriteArraySet<>();
+    private final AsyncConsistentMap<String, V> valueMap;
+    private final String name;
+    private final MapEventListener<String, V> mapEventListener = new InternalMapEventListener();
+    private final MeteringAgent monitor;
+
+    private static final String COMPONENT_NAME = "atomicValue";
+    private static final String GET = "get";
+    private static final String GET_AND_SET = "getAndSet";
+    private static final String SET = "set";
+    private static final String COMPARE_AND_SET = "compareAndSet";
+
+    public DefaultAsyncAtomicValue(AsyncConsistentMap<String, V> valueMap,
+                              String name,
+                              boolean meteringEnabled) {
+        this.valueMap = valueMap;
+        this.name = name;
+        this.monitor = new MeteringAgent(COMPONENT_NAME, name, meteringEnabled);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
+        final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
+        CompletableFuture<Boolean> response;
+        if (expect == null) {
+            if (update == null) {
+                response = CompletableFuture.completedFuture(true);
+            }
+            response = valueMap.putIfAbsent(name, update).thenApply(v -> v == null);
+        } else {
+             response = update == null
+                         ? valueMap.remove(name, expect)
+                         : valueMap.replace(name, expect, update);
+        }
+        return response.whenComplete((r, e) -> newTimer.stop(null));
+    }
+
+    @Override
+    public CompletableFuture<V> get() {
+        final MeteringAgent.Context newTimer = monitor.startTimer(GET);
+        return valueMap.get(name)
+                .thenApply(Versioned::valueOrNull)
+                .whenComplete((r, e) -> newTimer.stop(null));
+    }
+
+    @Override
+    public CompletableFuture<V> getAndSet(V value) {
+        final MeteringAgent.Context newTimer = monitor.startTimer(GET_AND_SET);
+        CompletableFuture<Versioned<V>> previousValue = value == null ?
+                valueMap.remove(name) : valueMap.put(name, value);
+        return previousValue.thenApply(Versioned::valueOrNull)
+                            .whenComplete((r, e) -> newTimer.stop(null));
+    }
+
+    @Override
+    public CompletableFuture<Void> set(V value) {
+        final MeteringAgent.Context newTimer = monitor.startTimer(SET);
+        CompletableFuture<Void> previousValue = value == null ?
+                valueMap.remove(name).thenApply(v -> null) : valueMap.put(name, value).thenApply(v -> null);
+        return previousValue.whenComplete((r, e) -> newTimer.stop(null));
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
+        synchronized (listeners) {
+            if (listeners.add(listener)) {
+                if (listeners.size() == 1) {
+                    return valueMap.addListener(mapEventListener);
+                }
+            }
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
+        synchronized (listeners) {
+            if (listeners.remove(listener)) {
+                if (listeners.size() == 0) {
+                    return valueMap.removeListener(mapEventListener);
+                }
+            }
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private class InternalMapEventListener implements MapEventListener<String, V> {
+
+        @Override
+        public void event(MapEvent<String, V> mapEvent) {
+            V newValue = mapEvent.type() == MapEvent.Type.REMOVE ? null : mapEvent.value().value();
+            AtomicValueEvent<V> atomicValueEvent = new AtomicValueEvent<>(name, AtomicValueEvent.Type.UPDATE, newValue);
+            listeners.forEach(l -> l.event(atomicValueEvent));
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index a7823a4..46a097c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -477,13 +477,15 @@
     }
 
     @Override
-    public void addListener(MapEventListener<K, V> listener) {
+    public CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
         listeners.add(listener);
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
-    public void removeListener(MapEventListener<K, V> listener) {
+    public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
         listeners.remove(listener);
+        return CompletableFuture.completedFuture(null);
     }
 
     protected void notifyListeners(MapEvent<K, V> event) {
@@ -498,5 +500,4 @@
             }
         });
     }
-
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
index e8c93f3..20bfd5f9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
@@ -15,124 +15,60 @@
  */
 package org.onosproject.store.consistent.impl;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.onosproject.store.service.AsyncAtomicValue;
 import org.onosproject.store.service.AtomicValue;
-import org.onosproject.store.service.AtomicValueEvent;
 import org.onosproject.store.service.AtomicValueEventListener;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.StorageException;
 
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
+import com.google.common.util.concurrent.Futures;
 
 /**
- * Default implementation of AtomicValue.
+ * Default implementation of {@link AtomicValue}.
  *
  * @param <V> value type
  */
 public class DefaultAtomicValue<V> implements AtomicValue<V> {
 
-    private final Set<AtomicValueEventListener<V>> listeners = new CopyOnWriteArraySet<>();
-    private final ConsistentMap<String, byte[]> valueMap;
-    private final String name;
-    private final Serializer serializer;
-    private final MapEventListener<String, byte[]> mapEventListener = new InternalMapEventListener();
-    private final MeteringAgent monitor;
+    private static final int OPERATION_TIMEOUT_MILLIS = 5000;
+    private final AsyncAtomicValue<V> asyncValue;
 
-    private static final String COMPONENT_NAME = "atomicValue";
-    private static final String GET = "get";
-    private static final String GET_AND_SET = "getAndSet";
-    private static final String COMPARE_AND_SET = "compareAndSet";
-
-    public DefaultAtomicValue(ConsistentMap<String, byte[]> valueMap,
-                              String name,
-                              boolean meteringEnabled,
-                              Serializer serializer) {
-        this.valueMap = valueMap;
-        this.name = name;
-        this.serializer = serializer;
-        this.monitor = new MeteringAgent(COMPONENT_NAME, name, meteringEnabled);
+    public DefaultAtomicValue(AsyncAtomicValue<V> asyncValue) {
+        this.asyncValue = asyncValue;
     }
 
     @Override
     public boolean compareAndSet(V expect, V update) {
-        final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
-        try {
-            if (expect == null) {
-                if (update == null) {
-                    return true;
-                }
-                return valueMap.putIfAbsent(name, serializer.encode(update)) == null;
-            } else {
-                if (update == null) {
-                    return valueMap.remove(name, serializer.encode(expect));
-                }
-                return valueMap.replace(name, serializer.encode(expect), serializer.encode(update));
-            }
-        } finally {
-            newTimer.stop(null);
-        }
+        return complete(asyncValue.compareAndSet(expect, update));
     }
 
     @Override
     public V get() {
-        final MeteringAgent.Context newTimer = monitor.startTimer(GET);
-        try {
-            Versioned<byte[]> rawValue = valueMap.get(name);
-            return rawValue == null ? null : serializer.decode(rawValue.value());
-        } finally {
-            newTimer.stop(null);
-        }
+        return complete(asyncValue.get());
     }
 
     @Override
     public V getAndSet(V value) {
-        final MeteringAgent.Context newTimer = monitor.startTimer(GET_AND_SET);
-        try {
-            Versioned<byte[]> previousValue = value == null ?
-                    valueMap.remove(name) : valueMap.put(name, serializer.encode(value));
-            return previousValue == null ? null : serializer.decode(previousValue.value());
-        } finally {
-            newTimer.stop(null);
-        }
+        return complete(asyncValue.getAndSet(value));
     }
 
     @Override
     public void set(V value) {
-        getAndSet(value);
+        complete(asyncValue.set(value));
     }
 
     @Override
     public void addListener(AtomicValueEventListener<V> listener) {
-        synchronized (listeners) {
-            if (listeners.add(listener)) {
-                if (listeners.size() == 1) {
-                    valueMap.addListener(mapEventListener);
-                }
-            }
-        }
+        complete(asyncValue.addListener(listener));
     }
 
     @Override
     public void removeListener(AtomicValueEventListener<V> listener) {
-        synchronized (listeners) {
-            if (listeners.remove(listener)) {
-                if (listeners.size() == 0) {
-                    valueMap.removeListener(mapEventListener);
-                }
-            }
-        }
+        complete(asyncValue.removeListener(listener));
     }
 
-    private class InternalMapEventListener implements MapEventListener<String, byte[]> {
-
-        @Override
-        public void event(MapEvent<String, byte[]> mapEvent) {
-            V newValue = mapEvent.type() == MapEvent.Type.REMOVE ? null : serializer.decode(mapEvent.value().value());
-            AtomicValueEvent<V> atomicValueEvent = new AtomicValueEvent<>(name, AtomicValueEvent.Type.UPDATE, newValue);
-            listeners.forEach(l -> l.event(atomicValueEvent));
-        }
+    private static <V> V complete(CompletableFuture<V> future) {
+        return Futures.getChecked(future, StorageException.class, OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
     }
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValueBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValueBuilder.java
index b39004b..7fc3e8d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValueBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValueBuilder.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.consistent.impl;
 
 import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncAtomicValue;
 import org.onosproject.store.service.AtomicValue;
 import org.onosproject.store.service.AtomicValueBuilder;
 import org.onosproject.store.service.ConsistentMapBuilder;
@@ -28,13 +29,12 @@
  */
 public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> {
 
-    private Serializer serializer;
     private String name;
-    private ConsistentMapBuilder<String, byte[]> mapBuilder;
+    private ConsistentMapBuilder<String, V> mapBuilder;
     private boolean metering = true;
 
     public DefaultAtomicValueBuilder(DatabaseManager manager) {
-        mapBuilder = manager.<String, byte[]>consistentMapBuilder()
+        mapBuilder = manager.<String, V>consistentMapBuilder()
                             .withName("onos-atomic-values")
                             .withMeteringDisabled()
                             .withSerializer(Serializer.using(KryoNamespaces.BASIC));
@@ -48,7 +48,7 @@
 
     @Override
     public AtomicValueBuilder<V> withSerializer(Serializer serializer) {
-        this.serializer = serializer;
+        mapBuilder.withSerializer(serializer);
         return this;
     }
 
@@ -65,7 +65,12 @@
     }
 
     @Override
+    public AsyncAtomicValue<V> buildAsyncValue() {
+        return new DefaultAsyncAtomicValue<>(mapBuilder.buildAsyncMap(), name, metering);
+    }
+
+    @Override
     public AtomicValue<V> build() {
-        return new DefaultAtomicValue<>(mapBuilder.build(), name, metering, serializer);
+        return new DefaultAtomicValue<>(buildAsyncValue());
     }
 }
\ No newline at end of file