Added a compute method to ECMap to simplify map interactions following a read-modify-write template.
Change-Id: If8c791ce1f49a7b5b3d04941b6e03a10261c6f6f
diff --git a/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
index 6414d6e..06395b8 100644
--- a/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
@@ -18,6 +18,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.Set;
+import java.util.function.BiFunction;
/**
* A distributed, eventually consistent map.
@@ -130,6 +131,17 @@
void remove(K key, V value);
/**
+ * Attempts to compute a mapping for the specified key and its current mapped
+ * value (or null if there is no current mapping).
+ * <p>
+ * If the function returns null, the mapping is removed (or remains absent if initially absent).
+ * @param key map key
+ * @param recomputeFunction function to recompute a new value
+ * @return new value
+ */
+ V compute(K key, BiFunction<K, V, V> recomputeFunction);
+
+ /**
* Adds mappings for all key-value pairs in the specified map to this map.
* <p>
* This will be more efficient in communication than calling individual put
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index b91df44..d1f4f2a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -381,6 +381,38 @@
}
@Override
+ public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
+ checkState(!destroyed, destroyedMessage);
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(recomputeFunction, "Recompute function cannot be null");
+
+ AtomicBoolean updated = new AtomicBoolean(false);
+ AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
+ MapValue<V> computedValue = items.compute(key, (k, mv) -> {
+ previousValue.set(mv);
+ V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
+ MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
+ if (mv == null || newValue.isNewerThan(mv)) {
+ updated.set(true);
+ return newValue;
+ } else {
+ return mv;
+ }
+ });
+ if (updated.get()) {
+ notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
+ EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
+ V value = computedValue.isTombstone()
+ ? previousValue.get() == null ? null : previousValue.get().get()
+ : computedValue.get();
+ if (value != null) {
+ notifyListeners(new EventuallyConsistentMapEvent<>(updateType, key, value));
+ }
+ }
+ return computedValue.get();
+ }
+
+ @Override
public void putAll(Map<? extends K, ? extends V> m) {
checkState(!destroyed, destroyedMessage);
m.forEach(this::put);
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 9a65630..c6ad750 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -382,6 +382,68 @@
}
@Test
+ public void testCompute() throws Exception {
+ // Set up expectations of external events to be sent to listeners during
+ // the test. These don't use timestamps so we can set them all up at once.
+ EventuallyConsistentMapListener<String, String> listener
+ = getListener();
+ listener.event(new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
+ listener.event(new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
+ listener.event(new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
+ replay(listener);
+
+ ecMap.addListener(listener);
+
+ // Put in an initial value
+ expectPeerMessage(clusterCommunicator);
+ ecMap.compute(KEY1, (k, v) -> VALUE1);
+ assertEquals(VALUE1, ecMap.get(KEY1));
+
+ // Remove the value and check the correct internal cluster messages
+ // are sent
+ expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
+ UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
+
+ ecMap.compute(KEY1, (k, v) -> null);
+ assertNull(ecMap.get(KEY1));
+
+ verify(clusterCommunicator);
+
+ // Remove the same value again. Even though the value is no longer in
+ // the map, we expect that the tombstone is updated and another remove
+ // event is sent to the cluster and external listeners.
+ expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
+ UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
+
+ ecMap.compute(KEY1, (k, v) -> null);
+ assertNull(ecMap.get(KEY1));
+
+ verify(clusterCommunicator);
+
+ // Put in a new value for us to try and remove
+ expectPeerMessage(clusterCommunicator);
+
+ ecMap.compute(KEY2, (k, v) -> VALUE2);
+
+ clockService.turnBackTime();
+
+ // Remove should have no effect, since it has an older timestamp than
+ // the put. Expect no notifications to be sent out
+ reset(clusterCommunicator);
+ replay(clusterCommunicator);
+
+ ecMap.compute(KEY2, (k, v) -> null);
+
+ verify(clusterCommunicator);
+
+ // Check that our listener received the correct events during the test
+ verify(listener);
+ }
+
+ @Test
public void testPutAll() throws Exception {
// putAll() with an empty map is a no-op - no messages will be sent
reset(clusterCommunicator);