Added a map API to transform Versioned<byte[]> to Versioned<V>
Fix bug where ConsistentMap.{putIfAbsent,remove} do not publish MapEvents
Change-Id: Ib7a9e01cad2b9099e6872916ae392351b68299ef
diff --git a/core/api/src/main/java/org/onosproject/store/service/Versioned.java b/core/api/src/main/java/org/onosproject/store/service/Versioned.java
index 4541f2c..fb6a78b 100644
--- a/core/api/src/main/java/org/onosproject/store/service/Versioned.java
+++ b/core/api/src/main/java/org/onosproject/store/service/Versioned.java
@@ -16,6 +16,8 @@
package org.onosproject.store.service;
+import java.util.function.Function;
+
import org.joda.time.DateTime;
import com.google.common.base.MoreObjects;
@@ -85,6 +87,16 @@
return creationTime;
}
+ /**
+ * Maps this instance into another after transforming its
+ * value while retaining the same version and creationTime.
+ * @param transformer function to mapping the value
+ * @return mapped instance
+ */
+ public <U> Versioned<U> map(Function<V, U> transformer) {
+ return new Versioned<>(transformer.apply(value), version, creationTime);
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 4bc93bc..9ef33d5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -276,8 +276,12 @@
checkIfUnmodifiable();
return database.remove(name, keyCache.getUnchecked(key))
.thenApply(this::unwrapResult)
- .thenApply(v -> v != null
- ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
+ .thenApply(v -> v != null ? v.<V>map(serializer::decode) : null)
+ .whenComplete((r, e) -> {
+ if (r != null) {
+ notifyListeners(new MapEvent<>(name, MapEvent.Type.REMOVE, key, r));
+ }
+ });
}
@Override
@@ -316,12 +320,19 @@
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
checkIfUnmodifiable();
- return database.putIfAbsent(name,
- keyCache.getUnchecked(key),
- serializer.encode(value))
+ AtomicReference<MapEvent<K, V>> event = new AtomicReference<>();
+ return database.putIfAbsentAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
.thenApply(this::unwrapResult)
- .thenApply(v -> v != null ?
- new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
+ .whenComplete((r, e) -> {
+ if (r != null && r.updated()) {
+ event.set(new MapEvent<K, V>(name,
+ MapEvent.Type.INSERT,
+ key,
+ r.newValue().<V>map(serializer::decode)));
+ }
+ })
+ .thenApply(v -> v.updated() ? null : v.oldValue().<V>map(serializer::decode))
+ .whenComplete((r, e) -> notifyListeners(event.get()));
}
@Override