Added a compute method to ECMap to simplify map interactions following a read-modify-write template.

Change-Id: If8c791ce1f49a7b5b3d04941b6e03a10261c6f6f
diff --git a/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
index 6414d6e..06395b8 100644
--- a/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
@@ -18,6 +18,7 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.BiFunction;
 
 /**
  * A distributed, eventually consistent map.
@@ -130,6 +131,17 @@
     void remove(K key, V value);
 
     /**
+     * Attempts to compute a mapping for the specified key and its current mapped
+     * value (or null if there is no current mapping).
+     * <p>
+     * If the function returns null, the mapping is removed (or remains absent if initially absent).
+     * @param key map key
+     * @param recomputeFunction function to recompute a new value
+     * @return new value
+     */
+    V compute(K key, BiFunction<K, V, V> recomputeFunction);
+
+    /**
      * Adds mappings for all key-value pairs in the specified map to this map.
      * <p>
      * This will be more efficient in communication than calling individual put
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index b91df44..d1f4f2a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -381,6 +381,38 @@
     }
 
     @Override
+    public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
+        checkState(!destroyed, destroyedMessage);
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(recomputeFunction, "Recompute function cannot be null");
+
+        AtomicBoolean updated = new AtomicBoolean(false);
+        AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
+        MapValue<V> computedValue = items.compute(key, (k, mv) -> {
+            previousValue.set(mv);
+            V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
+            MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
+            if (mv == null || newValue.isNewerThan(mv)) {
+                updated.set(true);
+                return newValue;
+            } else {
+                return mv;
+            }
+        });
+        if (updated.get()) {
+            notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
+            EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
+            V value = computedValue.isTombstone()
+                    ? previousValue.get() == null ? null : previousValue.get().get()
+                    : computedValue.get();
+            if (value != null) {
+                notifyListeners(new EventuallyConsistentMapEvent<>(updateType, key, value));
+            }
+        }
+        return computedValue.get();
+    }
+
+    @Override
     public void putAll(Map<? extends K, ? extends V> m) {
         checkState(!destroyed, destroyedMessage);
         m.forEach(this::put);
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 9a65630..c6ad750 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -382,6 +382,68 @@
     }
 
     @Test
+    public void testCompute() throws Exception {
+        // Set up expectations of external events to be sent to listeners during
+        // the test. These don't use timestamps so we can set them all up at once.
+        EventuallyConsistentMapListener<String, String> listener
+                = getListener();
+        listener.event(new EventuallyConsistentMapEvent<>(
+                EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
+        listener.event(new EventuallyConsistentMapEvent<>(
+                EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
+        listener.event(new EventuallyConsistentMapEvent<>(
+                EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
+        replay(listener);
+
+        ecMap.addListener(listener);
+
+        // Put in an initial value
+        expectPeerMessage(clusterCommunicator);
+        ecMap.compute(KEY1, (k, v) -> VALUE1);
+        assertEquals(VALUE1, ecMap.get(KEY1));
+
+        // Remove the value and check the correct internal cluster messages
+        // are sent
+        expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
+                UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
+
+        ecMap.compute(KEY1, (k, v) -> null);
+        assertNull(ecMap.get(KEY1));
+
+        verify(clusterCommunicator);
+
+        // Remove the same value again. Even though the value is no longer in
+        // the map, we expect that the tombstone is updated and another remove
+        // event is sent to the cluster and external listeners.
+        expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
+                UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
+
+        ecMap.compute(KEY1, (k, v) -> null);
+        assertNull(ecMap.get(KEY1));
+
+        verify(clusterCommunicator);
+
+        // Put in a new value for us to try and remove
+        expectPeerMessage(clusterCommunicator);
+
+        ecMap.compute(KEY2, (k, v) -> VALUE2);
+
+        clockService.turnBackTime();
+
+        // Remove should have no effect, since it has an older timestamp than
+        // the put. Expect no notifications to be sent out
+        reset(clusterCommunicator);
+        replay(clusterCommunicator);
+
+        ecMap.compute(KEY2, (k, v) -> null);
+
+        verify(clusterCommunicator);
+
+        // Check that our listener received the correct events during the test
+        verify(listener);
+    }
+
+    @Test
     public void testPutAll() throws Exception {
         // putAll() with an empty map is a no-op - no messages will be sent
         reset(clusterCommunicator);