Concurrently update EventuallyConsistentMap

- Removed synchronized block on Map updates
  which may result in anti-entropy AD sent to the peer containing out-of-sync update/remove,
  such as update and remove for the same key, but stale information will be ignored
  on the remote peer by timestamp if timestamps are properly generated.

Change-Id: Id4f993eb44b7858d37486be0d4baaff1f9025efa
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index d0e6733..1069f63 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.ecmap;
 
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.tuple.Pair;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
@@ -42,6 +43,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -64,8 +66,8 @@
 
     private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
 
-    private final Map<K, Timestamped<V>> items;
-    private final Map<K, Timestamp> removedItems;
+    private final ConcurrentMap<K, Timestamped<V>> items;
+    private final ConcurrentMap<K, Timestamp> removedItems;
 
     private final ClusterService clusterService;
     private final ClusterCommunicationService clusterCommunicator;
@@ -267,16 +269,21 @@
             return false;
         }
 
-        boolean success;
-        synchronized (this) {
-            Timestamped<V> existing = items.get(key);
+        final MutableBoolean updated = new MutableBoolean(false);
+
+        items.compute(key, (k, existing) -> {
             if (existing != null && existing.isNewerThan(timestamp)) {
-                log.debug("ecmap - existing was newer {}", value);
-                success = false;
+                updated.setFalse();
+                return existing;
             } else {
-                items.put(key, new Timestamped<>(value, timestamp));
-                success = true;
+                updated.setTrue();
+                return new Timestamped<>(value, timestamp);
             }
+            });
+
+        boolean success = updated.booleanValue();
+        if (!success) {
+            log.debug("ecmap - existing was newer {}", value);
         }
 
         if (success && removed != null) {
@@ -303,13 +310,21 @@
     }
 
     private boolean removeInternal(K key, Timestamp timestamp) {
-        Timestamped<V> value = items.get(key);
-        if (value != null) {
-            if (value.isNewerThan(timestamp)) {
-                return false;
+        final MutableBoolean updated = new MutableBoolean(false);
+
+        items.compute(key, (k, existing) -> {
+            if (existing != null && existing.isNewerThan(timestamp)) {
+                updated.setFalse();
+                return existing;
             } else {
-                items.remove(key, value);
+                updated.setTrue();
+                // remove from items map
+                return null;
             }
+            });
+
+        if (updated.isFalse()) {
+            return false;
         }
 
         Timestamp removedTimestamp = removedItems.get(key);
@@ -554,23 +569,21 @@
         List<EventuallyConsistentMapEvent<K, V>> externalEvents;
         boolean sync = false;
 
-        synchronized (this) {
-            externalEvents = antiEntropyCheckLocalItems(ad);
+        externalEvents = antiEntropyCheckLocalItems(ad);
 
-            antiEntropyCheckLocalRemoved(ad);
+        antiEntropyCheckLocalRemoved(ad);
 
-            externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
+        externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
 
-            // if remote ad has something unknown, actively sync
-            for (K key : ad.timestamps().keySet()) {
-                if (!items.containsKey(key)) {
-                    sync = true;
-                    break;
-                }
+        // if remote ad has something unknown, actively sync
+        for (K key : ad.timestamps().keySet()) {
+            if (!items.containsKey(key)) {
+                sync = true;
+                break;
             }
-        } // synchronized (this)
+        }
 
-        // Send the advertisement outside the synchronized block
+        // Send the advertisement back if this peer is out-of-sync
         if (sync) {
             final NodeId sender = ad.sender();
             AntiEntropyAdvertisement<K> myAd = createAdvertisement();
@@ -596,7 +609,6 @@
      * @param ad remote anti-entropy advertisement
      * @return list of external events relating to local operations performed
      */
-    // Guarded by synchronized (this)
     private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
             AntiEntropyAdvertisement<K> ad) {
         final List<EventuallyConsistentMapEvent<K, V>> externalEvents
@@ -652,7 +664,6 @@
      *
      * @param ad remote anti-entropy advertisement
      */
-    // Guarded by synchronized (this)
     private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
         final NodeId sender = ad.sender();
 
@@ -690,7 +701,6 @@
      * @param ad remote anti-entropy advertisement
      * @return list of external events relating to local operations performed
      */
-    // Guarded by synchronized (this)
     private List<EventuallyConsistentMapEvent<K, V>>
     antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
         final List<EventuallyConsistentMapEvent<K, V>> externalEvents