Concurrently update EventuallyConsistentMap
- Removed synchronized block on Map updates
which may result in anti-entropy AD sent to the peer containing out-of-sync update/remove,
such as update and remove for the same key, but stale information will be ignored
on the remote peer by timestamp if timestamps are properly generated.
Change-Id: Id4f993eb44b7858d37486be0d4baaff1f9025efa
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index d0e6733..1069f63 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -16,6 +16,7 @@
package org.onosproject.store.ecmap;
import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
@@ -42,6 +43,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -64,8 +66,8 @@
private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
- private final Map<K, Timestamped<V>> items;
- private final Map<K, Timestamp> removedItems;
+ private final ConcurrentMap<K, Timestamped<V>> items;
+ private final ConcurrentMap<K, Timestamp> removedItems;
private final ClusterService clusterService;
private final ClusterCommunicationService clusterCommunicator;
@@ -267,16 +269,21 @@
return false;
}
- boolean success;
- synchronized (this) {
- Timestamped<V> existing = items.get(key);
+ final MutableBoolean updated = new MutableBoolean(false);
+
+ items.compute(key, (k, existing) -> {
if (existing != null && existing.isNewerThan(timestamp)) {
- log.debug("ecmap - existing was newer {}", value);
- success = false;
+ updated.setFalse();
+ return existing;
} else {
- items.put(key, new Timestamped<>(value, timestamp));
- success = true;
+ updated.setTrue();
+ return new Timestamped<>(value, timestamp);
}
+ });
+
+ boolean success = updated.booleanValue();
+ if (!success) {
+ log.debug("ecmap - existing was newer {}", value);
}
if (success && removed != null) {
@@ -303,13 +310,21 @@
}
private boolean removeInternal(K key, Timestamp timestamp) {
- Timestamped<V> value = items.get(key);
- if (value != null) {
- if (value.isNewerThan(timestamp)) {
- return false;
+ final MutableBoolean updated = new MutableBoolean(false);
+
+ items.compute(key, (k, existing) -> {
+ if (existing != null && existing.isNewerThan(timestamp)) {
+ updated.setFalse();
+ return existing;
} else {
- items.remove(key, value);
+ updated.setTrue();
+ // remove from items map
+ return null;
}
+ });
+
+ if (updated.isFalse()) {
+ return false;
}
Timestamp removedTimestamp = removedItems.get(key);
@@ -554,23 +569,21 @@
List<EventuallyConsistentMapEvent<K, V>> externalEvents;
boolean sync = false;
- synchronized (this) {
- externalEvents = antiEntropyCheckLocalItems(ad);
+ externalEvents = antiEntropyCheckLocalItems(ad);
- antiEntropyCheckLocalRemoved(ad);
+ antiEntropyCheckLocalRemoved(ad);
- externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
+ externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
- // if remote ad has something unknown, actively sync
- for (K key : ad.timestamps().keySet()) {
- if (!items.containsKey(key)) {
- sync = true;
- break;
- }
+ // if remote ad has something unknown, actively sync
+ for (K key : ad.timestamps().keySet()) {
+ if (!items.containsKey(key)) {
+ sync = true;
+ break;
}
- } // synchronized (this)
+ }
- // Send the advertisement outside the synchronized block
+ // Send the advertisement back if this peer is out-of-sync
if (sync) {
final NodeId sender = ad.sender();
AntiEntropyAdvertisement<K> myAd = createAdvertisement();
@@ -596,7 +609,6 @@
* @param ad remote anti-entropy advertisement
* @return list of external events relating to local operations performed
*/
- // Guarded by synchronized (this)
private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
AntiEntropyAdvertisement<K> ad) {
final List<EventuallyConsistentMapEvent<K, V>> externalEvents
@@ -652,7 +664,6 @@
*
* @param ad remote anti-entropy advertisement
*/
- // Guarded by synchronized (this)
private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
final NodeId sender = ad.sender();
@@ -690,7 +701,6 @@
* @param ad remote anti-entropy advertisement
* @return list of external events relating to local operations performed
*/
- // Guarded by synchronized (this)
private List<EventuallyConsistentMapEvent<K, V>>
antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
final List<EventuallyConsistentMapEvent<K, V>> externalEvents