AtomicValue updates
    - events now have old and new values
    - simplify implementation by using map.replace for compareAndSet
    - remove the builder option for disabling metering; it will become a system-wide option in a subsequent update

Change-Id: I7c1424f8671c0e54688172c273b9262f305b0920
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
index 912d6d0..7bbd9d3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
@@ -13,31 +13,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.onosproject.store.primitives.impl;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
 import org.onosproject.store.service.AsyncAtomicValue;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AtomicValueEvent;
 import org.onosproject.store.service.AtomicValueEventListener;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.Versioned;
 
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArraySet;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
 
-/**
- * Default implementation of {@link AsyncAtomicValue}.
- *
- * @param <V> value type
- */
+
 public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
 
-    private final Set<AtomicValueEventListener<V>> listeners = new CopyOnWriteArraySet<>();
-    private final AsyncConsistentMap<String, V> valueMap;
     private final String name;
-    private final MapEventListener<String, V> mapEventListener = new InternalMapEventListener();
+    private final Serializer serializer; // converts V to and from the byte[] held in backingMap
+    private final AsyncConsistentMap<String, byte[]> backingMap; // the value lives under the key `name`
+    private final Map<AtomicValueEventListener<V>, MapEventListener<String, byte[]>> listeners =
+            java.util.Collections.synchronizedMap(Maps.newIdentityHashMap()); // keeps registration thread-safe
     private final MeteringAgent monitor;
 
     private static final String COMPONENT_NAME = "atomicValue";
@@ -45,13 +48,15 @@
     private static final String GET_AND_SET = "getAndSet";
     private static final String SET = "set";
     private static final String COMPARE_AND_SET = "compareAndSet";
+    private static final String ADD_LISTENER = "addListener";
+    private static final String REMOVE_LISTENER = "removeListener";
+    private static final String NOTIFY_LISTENER = "notifyListener";
 
-    public DefaultAsyncAtomicValue(AsyncConsistentMap<String, V> valueMap,
-                              String name,
-                              boolean meteringEnabled) {
-        this.valueMap = valueMap;
-        this.name = name;
-        this.monitor = new MeteringAgent(COMPONENT_NAME, name, meteringEnabled);
+    public DefaultAsyncAtomicValue(String name, Serializer serializer, AsyncConsistentMap<String, byte[]> backingMap) {
+        this.name = checkNotNull(name, "name must not be null"); // every argument is mandatory
+        this.serializer = checkNotNull(serializer, "serializer must not be null");
+        this.backingMap = checkNotNull(backingMap, "backingMap must not be null");
+        this.monitor = new MeteringAgent(COMPONENT_NAME, name, true); // was the caller-supplied meteringEnabled flag
     }
 
     @Override
@@ -62,76 +67,106 @@
     @Override
     public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
         final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
-        CompletableFuture<Boolean> response;
-        if (expect == null) {
-            if (update == null) {
-                response = CompletableFuture.completedFuture(true);
-            }
-            response = valueMap.putIfAbsent(name, update).thenApply(v -> v == null);
-        } else {
-             response = update == null
-                         ? valueMap.remove(name, expect)
-                         : valueMap.replace(name, expect, update);
-        }
-        return response.whenComplete((r, e) -> newTimer.stop(null));
+        // An unset value is kept as an absent key (set(null) removes it), and replace() is documented
+        // to succeed only when the key is currently mapped, so null arguments need their own operations.
+        final CompletableFuture<Boolean> response;
+        if (expect == null) {
+            response = update == null
+                    ? backingMap.get(name).thenApply(v -> Versioned.valueOrNull(v) == null)
+                    : backingMap.putIfAbsent(name, serializer.encode(update)).thenApply(v -> v == null);
+        } else if (update == null) {
+            response = backingMap.remove(name, serializer.encode(expect));
+        } else {
+            response = backingMap.replace(name, serializer.encode(expect), serializer.encode(update));
+        }
+        return response.whenComplete((r, e) -> newTimer.stop(e));
     }
 
     @Override
     public CompletableFuture<V> get() {
         final MeteringAgent.Context newTimer = monitor.startTimer(GET);
-        return valueMap.get(name)
-                .thenApply(Versioned::valueOrNull)
-                .whenComplete((r, e) -> newTimer.stop(null));
+        // A null raw value is handed back as null instead of being passed to the serializer.
+        final CompletableFuture<byte[]> raw = backingMap.get(name).thenApply(Versioned::valueOrNull);
+        return raw.thenApply(bytes -> bytes == null ? null : serializer.<V>decode(bytes))
+                  .whenComplete((r, e) -> newTimer.stop(e));
     }
 
     @Override
     public CompletableFuture<V> getAndSet(V value) {
         final MeteringAgent.Context newTimer = monitor.startTimer(GET_AND_SET);
-        CompletableFuture<Versioned<V>> previousValue = value == null ?
-                valueMap.remove(name) : valueMap.put(name, value);
-        return previousValue.thenApply(Versioned::valueOrNull)
-                            .whenComplete((r, e) -> newTimer.stop(null));
+        // A null value removes the stored entry; any other value is encoded and written.
+        final CompletableFuture<Versioned<byte[]>> previous = value == null
+                ? backingMap.remove(name)
+                : backingMap.put(name, serializer.encode(value));
+        // The map's reply is unwrapped and decoded for the caller; a null raw value is
+        // passed through as null rather than handed to the serializer.
+        final CompletableFuture<byte[]> rawPrevious = previous.thenApply(Versioned::valueOrNull);
+        return rawPrevious
+                .thenApply(bytes -> bytes == null ? null : serializer.<V>decode(bytes))
+                .whenComplete((r, e) -> newTimer.stop(e));
     }
 
     @Override
     public CompletableFuture<Void> set(V value) {
         final MeteringAgent.Context newTimer = monitor.startTimer(SET);
-        CompletableFuture<Void> previousValue = value == null ?
-                valueMap.remove(name).thenApply(v -> null) : valueMap.put(name, value).thenApply(v -> null);
-        return previousValue.whenComplete((r, e) -> newTimer.stop(null));
+        // A null value removes the stored entry; any other value is encoded and written.
+        final CompletableFuture<Versioned<byte[]>> update = value == null
+                ? backingMap.remove(name)
+                : backingMap.put(name, serializer.encode(value));
+        // The timer is stopped on the map operation itself; the map's reply is then
+        // discarded so callers only observe completion.
+        return update
+                .whenComplete((r, e) -> newTimer.stop(e))
+                .thenApply(v -> null);
     }
 
     @Override
     public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
-        synchronized (listeners) {
-            if (listeners.add(listener)) {
-                if (listeners.size() == 1) {
-                    return valueMap.addListener(mapEventListener);
-                }
-            }
-        }
-        return CompletableFuture.completedFuture(null);
+        checkNotNull(listener, "listener must not be null");
+        final MeteringAgent.Context timer = monitor.startTimer(ADD_LISTENER);
+        // Each listener gets one adapter; a repeated registration reuses the adapter already on record.
+        return backingMap.addListener(listeners.computeIfAbsent(listener, InternalMapValueEventListener::new))
+                         .whenComplete((r, e) -> timer.stop(e));
     }
 
     @Override
     public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
-        synchronized (listeners) {
-            if (listeners.remove(listener)) {
-                if (listeners.size() == 0) {
-                    return valueMap.removeListener(mapEventListener);
+        checkNotNull(listener, "listener must not be null");
+        final MeteringAgent.Context timer = monitor.startTimer(REMOVE_LISTENER);
+        final MapEventListener<String, byte[]> adapter = listeners.remove(listener);
+        if (adapter == null) {
+            // No adapter on record for this listener: nothing to detach from the backing map.
+            timer.stop(null);
+            return CompletableFuture.completedFuture(null);
+        }
+        // Detach the adapter that was recorded for this listener.
+        return backingMap.removeListener(adapter).whenComplete((r, e) -> timer.stop(e));
+    }
+
+    /** Adapts backing-map events for this value's key into {@link AtomicValueEvent}s for one listener. */
+    private class InternalMapValueEventListener implements MapEventListener<String, byte[]> {
+        private final AtomicValueEventListener<V> listener;
+
+        InternalMapValueEventListener(AtomicValueEventListener<V> listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void event(MapEvent<String, byte[]> event) {
+            if (event.key().equals(name)) { // events for other keys are ignored
+                final MeteringAgent.Context timer = monitor.startTimer(NOTIFY_LISTENER);
+                final byte[] newBytes = Versioned.valueOrNull(event.newValue());
+                final byte[] oldBytes = Versioned.valueOrNull(event.oldValue());
+                try { // decoding stays inside the try so its failures reach timer.stop(e) too
+                    final V newValue = newBytes == null ? null : serializer.decode(newBytes);
+                    final V oldValue = oldBytes == null ? null : serializer.decode(oldBytes);
+                    listener.event(new AtomicValueEvent<>(name, newValue, oldValue));
+                    timer.stop(null);
+                } catch (Exception e) {
+                    timer.stop(e);
+                    throw Throwables.propagate(e); // propagate() always throws; `throw` makes that explicit
                 }
             }
         }
-        return CompletableFuture.completedFuture(null);
     }
-
-    private class InternalMapEventListener implements MapEventListener<String, V> {
-
-        @Override
-        public void event(MapEvent<String, V> mapEvent) {
-            V newValue = mapEvent.type() == MapEvent.Type.REMOVE ? null : mapEvent.value().value();
-            AtomicValueEvent<V> atomicValueEvent = new AtomicValueEvent<>(name, AtomicValueEvent.Type.UPDATE, newValue);
-            listeners.forEach(l -> l.event(atomicValueEvent));
-        }
-    }
-}
+}
\ No newline at end of file