AtomicValue updates
    - events now have old and new values
    - simplify implementation by using map.replace for compareAndSet
    - Removed option for disabling it from builder. Will make it a system wide option in a subsequent update

Change-Id: I7c1424f8671c0e54688172c273b9262f305b0920
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java
index d5f226f..d0ba25e 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java
@@ -60,14 +60,6 @@
     AtomicValueBuilder<V> withPartitionsDisabled();
 
     /**
-     * Instantiates Metering service to gather usage and performance metrics.
-     * By default, usage data will be stored.
-     *
-     * @return this AtomicValueBuilder for method chaining
-     */
-    AtomicValueBuilder<V> withMeteringDisabled();
-
-    /**
      * Builds a AsyncAtomicValue based on the configuration options
      * supplied to this builder.
      *
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValueEvent.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValueEvent.java
index 1bce136..e1916bc 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicValueEvent.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValueEvent.java
@@ -38,20 +38,20 @@
     }
 
     private final String name;
-    private final Type type;
-    private final V value;
+    private final V newValue;
+    private final V oldValue;
 
     /**
      * Creates a new event object.
      *
      * @param name AtomicValue name
-     * @param type the type of the event
-     * @param value the new value
+     * @param newValue the new value
+     * @param oldValue the old value
      */
-    public AtomicValueEvent(String name, Type type, V value) {
+    public AtomicValueEvent(String name, V newValue, V oldValue) {
         this.name = name;
-        this.type = type;
-        this.value = value;
+        this.newValue = newValue;
+        this.oldValue = oldValue;
     }
 
     /**
@@ -69,16 +69,25 @@
      * @return the type of the event
      */
     public Type type() {
-        return type;
+        return AtomicValueEvent.Type.UPDATE;
     }
 
     /**
-     * Returns the new updated value.
+     * Returns the newly set value.
      *
-     * @return the value
+     * @return the new value
      */
-    public V value() {
-        return value;
+    public V newValue() {
+        return newValue;
+    }
+
+    /**
+     * Returns the old replaced value.
+     *
+     * @return the old value
+     */
+    public V oldValue() {
+        return oldValue;
     }
 
     @Override
@@ -89,21 +98,21 @@
 
         AtomicValueEvent that = (AtomicValueEvent) o;
         return Objects.equals(this.name, that.name) &&
-                Objects.equals(this.type, that.type) &&
-                Objects.equals(this.value, that.value);
+                Objects.equals(this.newValue, that.newValue) &&
+                Objects.equals(this.oldValue, that.oldValue);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, type, value);
+        return Objects.hash(name, newValue, oldValue);
     }
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
                 .add("name", name)
-                .add("type", type)
-                .add("value", value)
+                .add("newValue", newValue)
+                .add("oldValue", oldValue)
                 .toString();
     }
 }
diff --git a/core/api/src/test/java/org/onosproject/store/service/AtomicValueEventTest.java b/core/api/src/test/java/org/onosproject/store/service/AtomicValueEventTest.java
index 39481ca..4e9ca1c 100644
--- a/core/api/src/test/java/org/onosproject/store/service/AtomicValueEventTest.java
+++ b/core/api/src/test/java/org/onosproject/store/service/AtomicValueEventTest.java
@@ -30,13 +30,13 @@
 public class AtomicValueEventTest {
 
     AtomicValueEvent<String> event1 =
-            new AtomicValueEvent<>("map1", UPDATE, "e1");
+            new AtomicValueEvent<>("map1", "e1", "e0");
     AtomicValueEvent<String> event2 =
-            new AtomicValueEvent<>("map1", UPDATE, "e2");
+            new AtomicValueEvent<>("map1", "e2", "e1");
     AtomicValueEvent<String> sameAsEvent2 =
-            new AtomicValueEvent<>("map1", UPDATE, "e2");
+            new AtomicValueEvent<>("map1", "e2", "e1");
     AtomicValueEvent<String> event3 =
-            new AtomicValueEvent<>("map2", UPDATE, "e2");
+            new AtomicValueEvent<>("map2", "e2", "e1");
 
     /**
      * Checks that the SetEvent class is immutable.
@@ -64,7 +64,8 @@
     @Test
     public void testConstruction() {
         assertThat(event1.type(), is(UPDATE));
-        assertThat(event1.value(), is("e1"));
+        assertThat(event1.newValue(), is("e1"));
+        assertThat(event1.oldValue(), is("e0"));
         assertThat(event1.name(), is("map1"));
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
index 912d6d0..7bbd9d3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
@@ -13,31 +13,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.onosproject.store.primitives.impl;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
 import org.onosproject.store.service.AsyncAtomicValue;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AtomicValueEvent;
 import org.onosproject.store.service.AtomicValueEventListener;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.Versioned;
 
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArraySet;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
 
-/**
- * Default implementation of {@link AsyncAtomicValue}.
- *
- * @param <V> value type
- */
+
 public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
 
-    private final Set<AtomicValueEventListener<V>> listeners = new CopyOnWriteArraySet<>();
-    private final AsyncConsistentMap<String, V> valueMap;
     private final String name;
-    private final MapEventListener<String, V> mapEventListener = new InternalMapEventListener();
+    private final Serializer serializer;
+    private final AsyncConsistentMap<String, byte[]> backingMap;
+    private final Map<AtomicValueEventListener<V>, MapEventListener<String, byte[]>> listeners =
+            Maps.newIdentityHashMap();
     private final MeteringAgent monitor;
 
     private static final String COMPONENT_NAME = "atomicValue";
@@ -45,13 +48,15 @@
     private static final String GET_AND_SET = "getAndSet";
     private static final String SET = "set";
     private static final String COMPARE_AND_SET = "compareAndSet";
+    private static final String ADD_LISTENER = "addListener";
+    private static final String REMOVE_LISTENER = "removeListener";
+    private static final String NOTIFY_LISTENER = "notifyListener";
 
-    public DefaultAsyncAtomicValue(AsyncConsistentMap<String, V> valueMap,
-                              String name,
-                              boolean meteringEnabled) {
-        this.valueMap = valueMap;
-        this.name = name;
-        this.monitor = new MeteringAgent(COMPONENT_NAME, name, meteringEnabled);
+    public DefaultAsyncAtomicValue(String name, Serializer serializer, AsyncConsistentMap<String, byte[]> backingMap) {
+        this.name = checkNotNull(name, "name must not be null");
+        this.serializer = checkNotNull(serializer, "serializer must not be null");
+        this.backingMap = checkNotNull(backingMap, "backingMap must not be null");
+        this.monitor = new MeteringAgent(COMPONENT_NAME, name, true);
     }
 
     @Override
@@ -62,76 +67,95 @@
     @Override
     public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
         final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
-        CompletableFuture<Boolean> response;
-        if (expect == null) {
-            if (update == null) {
-                response = CompletableFuture.completedFuture(true);
-            }
-            response = valueMap.putIfAbsent(name, update).thenApply(v -> v == null);
-        } else {
-             response = update == null
-                         ? valueMap.remove(name, expect)
-                         : valueMap.replace(name, expect, update);
-        }
-        return response.whenComplete((r, e) -> newTimer.stop(null));
+        return backingMap.replace(name, serializer.encode(expect), serializer.encode(update))
+                         .whenComplete((r, e) -> newTimer.stop(e));
     }
 
     @Override
     public CompletableFuture<V> get() {
         final MeteringAgent.Context newTimer = monitor.startTimer(GET);
-        return valueMap.get(name)
-                .thenApply(Versioned::valueOrNull)
-                .whenComplete((r, e) -> newTimer.stop(null));
+        return backingMap.get(name)
+                         .thenApply(Versioned::valueOrNull)
+                         .thenApply(v -> v == null ? null : serializer.<V>decode(v))
+                         .whenComplete((r, e) -> newTimer.stop(e));
     }
 
     @Override
     public CompletableFuture<V> getAndSet(V value) {
         final MeteringAgent.Context newTimer = monitor.startTimer(GET_AND_SET);
-        CompletableFuture<Versioned<V>> previousValue = value == null ?
-                valueMap.remove(name) : valueMap.put(name, value);
-        return previousValue.thenApply(Versioned::valueOrNull)
-                            .whenComplete((r, e) -> newTimer.stop(null));
+        if (value == null) {
+            return backingMap.remove(name)
+                             .thenApply(Versioned::valueOrNull)
+                             .thenApply(v -> v == null ? null : serializer.<V>decode(v))
+                             .whenComplete((r, e) -> newTimer.stop(e));
+        }
+        return backingMap.put(name, serializer.encode(value))
+                         .thenApply(Versioned::valueOrNull)
+                         .thenApply(v -> v == null ? null : serializer.<V>decode(v))
+                         .whenComplete((r, e) -> newTimer.stop(e));
     }
 
     @Override
     public CompletableFuture<Void> set(V value) {
         final MeteringAgent.Context newTimer = monitor.startTimer(SET);
-        CompletableFuture<Void> previousValue = value == null ?
-                valueMap.remove(name).thenApply(v -> null) : valueMap.put(name, value).thenApply(v -> null);
-        return previousValue.whenComplete((r, e) -> newTimer.stop(null));
+        if (value == null) {
+            return backingMap.remove(name)
+                             .whenComplete((r, e) -> newTimer.stop(e))
+                             .thenApply(v -> null);
+
+        }
+        return backingMap.put(name, serializer.encode(value))
+                         .whenComplete((r, e) -> newTimer.stop(e))
+                         .thenApply(v -> null);
     }
 
     @Override
     public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
-        synchronized (listeners) {
-            if (listeners.add(listener)) {
-                if (listeners.size() == 1) {
-                    return valueMap.addListener(mapEventListener);
-                }
-            }
-        }
-        return CompletableFuture.completedFuture(null);
+        checkNotNull(listener, "listener must not be null");
+        final MeteringAgent.Context newTimer = monitor.startTimer(ADD_LISTENER);
+        MapEventListener<String, byte[]> mapListener =
+                listeners.computeIfAbsent(listener, key -> new InternalMapValueEventListener(listener));
+        return backingMap.addListener(mapListener).whenComplete((r, e) -> newTimer.stop(e));
     }
 
     @Override
     public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
-        synchronized (listeners) {
-            if (listeners.remove(listener)) {
-                if (listeners.size() == 0) {
-                    return valueMap.removeListener(mapEventListener);
+        checkNotNull(listener, "listener must not be null");
+        final MeteringAgent.Context newTimer = monitor.startTimer(REMOVE_LISTENER);
+        MapEventListener<String, byte[]> mapListener = listeners.remove(listener);
+        if (mapListener != null) {
+            return backingMap.removeListener(mapListener)
+                             .whenComplete((r, e) -> newTimer.stop(e));
+        } else {
+            newTimer.stop(null);
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    private class InternalMapValueEventListener implements MapEventListener<String, byte[]> {
+
+        private final AtomicValueEventListener<V> listener;
+
+        InternalMapValueEventListener(AtomicValueEventListener<V> listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void event(MapEvent<String, byte[]> event) {
+            if (event.key().equals(name)) {
+                final MeteringAgent.Context newTimer = monitor.startTimer(NOTIFY_LISTENER);
+                byte[] rawNewValue = Versioned.valueOrNull(event.newValue());
+                byte[] rawOldValue = Versioned.valueOrNull(event.oldValue());
+                try {
+                    listener.event(new AtomicValueEvent<>(name,
+                            rawNewValue == null ? null : serializer.decode(rawNewValue),
+                                    rawOldValue == null ? null : serializer.decode(rawOldValue)));
+                    newTimer.stop(null);
+                } catch (Exception e) {
+                    newTimer.stop(e);
+                    Throwables.propagate(e);
                 }
             }
         }
-        return CompletableFuture.completedFuture(null);
     }
-
-    private class InternalMapEventListener implements MapEventListener<String, V> {
-
-        @Override
-        public void event(MapEvent<String, V> mapEvent) {
-            V newValue = mapEvent.type() == MapEvent.Type.REMOVE ? null : mapEvent.value().value();
-            AtomicValueEvent<V> atomicValueEvent = new AtomicValueEvent<>(name, AtomicValueEvent.Type.UPDATE, newValue);
-            listeners.forEach(l -> l.event(atomicValueEvent));
-        }
-    }
-}
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
index e2f4971..3c2243d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
@@ -30,11 +30,11 @@
 public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> {
 
     private String name;
-    private ConsistentMapBuilder<String, V> mapBuilder;
-    private boolean metering = true;
+    private Serializer serializer;
+    private ConsistentMapBuilder<String, byte[]> mapBuilder;
 
     public DefaultAtomicValueBuilder(DatabaseManager manager) {
-        mapBuilder = manager.<String, V>consistentMapBuilder()
+        mapBuilder = manager.<String, byte[]>consistentMapBuilder()
                             .withName("onos-atomic-values")
                             .withMeteringDisabled()
                             .withSerializer(Serializer.using(KryoNamespaces.BASIC));
@@ -59,14 +59,8 @@
     }
 
     @Override
-    public AtomicValueBuilder<V> withMeteringDisabled() {
-        metering = false;
-        return this;
-    }
-
-    @Override
     public AsyncAtomicValue<V> buildAsyncValue() {
-        return new DefaultAsyncAtomicValue<>(mapBuilder.buildAsyncMap(), name, metering);
+        return new DefaultAsyncAtomicValue<>(name, serializer, mapBuilder.buildAsyncMap());
     }
 
     @Override