Added a map API to transform Versioned<byte[]> to Versioned<V>
Fix bug where ConsistentMap.{putIfAbsent,remove} do not publish MapEvents

Change-Id: Ib7a9e01cad2b9099e6872916ae392351b68299ef
diff --git a/core/api/src/main/java/org/onosproject/store/service/Versioned.java b/core/api/src/main/java/org/onosproject/store/service/Versioned.java
index 4541f2c..fb6a78b 100644
--- a/core/api/src/main/java/org/onosproject/store/service/Versioned.java
+++ b/core/api/src/main/java/org/onosproject/store/service/Versioned.java
@@ -16,6 +16,8 @@
 
 package org.onosproject.store.service;
 
+import java.util.function.Function;
+
 import org.joda.time.DateTime;
 
 import com.google.common.base.MoreObjects;
@@ -85,6 +87,16 @@
         return creationTime;
     }
 
+    /**
+     * Maps this instance into another after transforming its
+     * value while retaining the same version and creationTime.
+     * @param transformer function to mapping the value
+     * @return mapped instance
+     */
+    public <U> Versioned<U> map(Function<V, U> transformer) {
+        return new Versioned<>(transformer.apply(value), version, creationTime);
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 4bc93bc..9ef33d5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -276,8 +276,12 @@
         checkIfUnmodifiable();
         return database.remove(name, keyCache.getUnchecked(key))
                 .thenApply(this::unwrapResult)
-                .thenApply(v -> v != null
-                ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
+                .thenApply(v -> v != null ? v.<V>map(serializer::decode) : null)
+                .whenComplete((r, e) -> {
+                    if (r != null) {
+                        notifyListeners(new MapEvent<>(name, MapEvent.Type.REMOVE, key, r));
+                    }
+                });
     }
 
     @Override
@@ -316,12 +320,19 @@
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
         checkIfUnmodifiable();
-        return database.putIfAbsent(name,
-                                    keyCache.getUnchecked(key),
-                                    serializer.encode(value))
+        AtomicReference<MapEvent<K, V>> event = new AtomicReference<>();
+        return database.putIfAbsentAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
                 .thenApply(this::unwrapResult)
-                .thenApply(v -> v != null ?
-                        new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
+                .whenComplete((r, e) -> {
+                    if (r != null && r.updated()) {
+                        event.set(new MapEvent<K, V>(name,
+                                                 MapEvent.Type.INSERT,
+                                                 key,
+                                                 r.newValue().<V>map(serializer::decode)));
+                    }
+                })
+                .thenApply(v -> v.updated() ? null : v.oldValue().<V>map(serializer::decode))
+                .whenComplete((r, e) -> notifyListeners(event.get()));
     }
 
     @Override