Throw ConcurrentModification exception when ConsistentMap compute call conflicts with concurrent operation(s)

Change-Id: Id07868873929f4f413878961c154b62933f3a3f2
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 5b59fb2..ad3bbb6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -29,6 +29,7 @@
 
 import io.atomix.protocols.raft.proxy.RaftProxy;
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsKey;
@@ -49,6 +50,7 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
@@ -296,9 +298,7 @@
             try {
                 computedValue = remappingFunction.apply(key, existingValue);
             } catch (Exception e) {
-                CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
-                future.completeExceptionally(e);
-                return future;
+                return Tools.exceptionalFuture(e);
             }
 
             if (computedValue == null && r1 == null) {
@@ -312,9 +312,17 @@
                         new Put(key, computedValue),
                         serializer()::decode)
                         .whenComplete((r, e) -> throwIfLocked(r))
+                        .thenCompose(r -> checkLocked(r))
                         .thenApply(result -> new Versioned<>(computedValue, result.version()));
             } else if (computedValue == null) {
-                return remove(key, r1.version()).thenApply(v -> null);
+                return proxy.<RemoveVersion, MapEntryUpdateResult<String, byte[]>>invoke(
+                        REMOVE_VERSION,
+                        serializer()::encode,
+                        new RemoveVersion(key, r1.version()),
+                        serializer()::decode)
+                        .whenComplete((r, e) -> throwIfLocked(r))
+                        .thenCompose(r -> checkLocked(r))
+                        .thenApply(v -> null);
             } else {
                 return proxy.<ReplaceVersion, MapEntryUpdateResult<String, byte[]>>invoke(
                         REPLACE_VERSION,
@@ -322,12 +330,22 @@
                         new ReplaceVersion(key, r1.version(), computedValue),
                         serializer()::decode)
                         .whenComplete((r, e) -> throwIfLocked(r))
+                        .thenCompose(r -> checkLocked(r))
                         .thenApply(result -> result.status() == MapEntryUpdateResult.Status.OK
                                 ? new Versioned(computedValue, result.version()) : result.result());
             }
         });
     }
 
+    private CompletableFuture<MapEntryUpdateResult<String, byte[]>> checkLocked(
+            MapEntryUpdateResult<String, byte[]> result) {
+        if (result.status() == MapEntryUpdateResult.Status.PRECONDITION_FAILED ||
+                result.status() == MapEntryUpdateResult.Status.WRITE_LOCK) {
+            return Tools.exceptionalFuture(new ConsistentMapException.ConcurrentModification());
+        }
+        return CompletableFuture.completedFuture(result);
+    }
+
     @Override
     public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
             Executor executor) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
index fe5a2a9..9d72be5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
@@ -337,7 +337,11 @@
         String key = commit.value().key();
         MapEntryValue oldValue = entries().get(key);
         MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
+
+        // If the value is null or a tombstone, this is an insert.
+        // Otherwise, only update the value if it has changed to reduce the number of events.
         if (valueIsNull(oldValue)) {
+            // If the key has been locked by a transaction, return a WRITE_LOCK error.
             if (preparedKeys.contains(key)) {
                 return new MapEntryUpdateResult<>(
                         MapEntryUpdateResult.Status.WRITE_LOCK,
@@ -351,6 +355,7 @@
             publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(newValue), result));
             return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
         } else if (!valuesEqual(oldValue, newValue)) {
+            // If the key has been locked by a transaction, return a WRITE_LOCK error.
             if (preparedKeys.contains(key)) {
                 return new MapEntryUpdateResult<>(
                         MapEntryUpdateResult.Status.WRITE_LOCK,
@@ -364,6 +369,7 @@
             publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
             return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
         }
+        // If the value hasn't changed, return a NOOP result.
         return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
     }
 
@@ -376,7 +382,10 @@
     protected MapEntryUpdateResult<String, byte[]> putIfAbsent(Commit<? extends Put> commit) {
         String key = commit.value().key();
         MapEntryValue oldValue = entries().get(key);
+
+        // If the value is null, this is an INSERT.
         if (valueIsNull(oldValue)) {
+            // If the key has been locked by a transaction, return a WRITE_LOCK error.
             if (preparedKeys.contains(key)) {
                 return new MapEntryUpdateResult<>(
                         MapEntryUpdateResult.Status.WRITE_LOCK,
@@ -388,13 +397,16 @@
                     MapEntryValue.Type.VALUE,
                     commit.index(),
                     commit.value().value());
-            entries().put(commit.value().key(),
-                    new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+            entries().put(commit.value().key(), newValue);
             Versioned<byte[]> result = toVersioned(newValue);
             publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
             return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, null);
         }
-        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
+        return new MapEntryUpdateResult<>(
+                MapEntryUpdateResult.Status.PRECONDITION_FAILED,
+                commit.index(),
+                key,
+                toVersioned(oldValue));
     }
 
     /**
@@ -407,7 +419,11 @@
         String key = commit.value().key();
         MapEntryValue oldValue = entries().get(key);
         MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
+
+        // If the value is null or a tombstone, this is an insert.
+        // Otherwise, only update the value if it has changed to reduce the number of events.
         if (valueIsNull(oldValue)) {
+            // If the key has been locked by a transaction, return a WRITE_LOCK error.
             if (preparedKeys.contains(key)) {
                 return new MapEntryUpdateResult<>(
                         MapEntryUpdateResult.Status.WRITE_LOCK,
@@ -420,6 +436,7 @@
             publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
             return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
         } else if (!valuesEqual(oldValue, newValue)) {
+            // If the key has been locked by a transaction, return a WRITE_LOCK error.
             if (preparedKeys.contains(key)) {
                 return new MapEntryUpdateResult<>(
                         MapEntryUpdateResult.Status.WRITE_LOCK,
@@ -445,15 +462,26 @@
      */
     private MapEntryUpdateResult<String, byte[]> removeIf(long index, String key, Predicate<MapEntryValue> predicate) {
         MapEntryValue value = entries().get(key);
-        if (value == null || !predicate.test(value)) {
-            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null);
+
+        // If the value does not exist or doesn't match the predicate, return a PRECONDITION_FAILED error.
+        if (valueIsNull(value) || !predicate.test(value)) {
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.PRECONDITION_FAILED, index, key, null);
         }
-        entries().remove(key);
-        if (!activeTransactions.isEmpty()) {
-            entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
+
+        // If the key has been locked by a transaction, return a WRITE_LOCK error.
+        if (preparedKeys.contains(key)) {
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.WRITE_LOCK, index, key, null);
         }
+
+        // If no transactions are active, remove the key. Otherwise, replace it with a tombstone.
+        if (activeTransactions.isEmpty()) {
+            entries().remove(key);
+        } else {
+            entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, index, null));
+        }
+
         Versioned<byte[]> result = toVersioned(value);
-        publish(Lists.newArrayList(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result)));
+        publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result));
         return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
     }
 
@@ -500,15 +528,24 @@
     private MapEntryUpdateResult<String, byte[]> replaceIf(
             long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
         MapEntryValue oldValue = entries().get(key);
-        if (oldValue == null) {
-            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null);
+
+        // If the key is not set or the current value doesn't match the predicate, return a PRECONDITION_FAILED error.
+        if (valueIsNull(oldValue) || !predicate.test(oldValue)) {
+            return new MapEntryUpdateResult<>(
+                    MapEntryUpdateResult.Status.PRECONDITION_FAILED,
+                    index,
+                    key,
+                    toVersioned(oldValue));
         }
-        if (!predicate.test(oldValue)) {
-            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, toVersioned(oldValue));
+
+        // If the key has been locked by a transaction, return a WRITE_LOCK error.
+        if (preparedKeys.contains(key)) {
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.WRITE_LOCK, index, key, null);
         }
+
         entries().put(key, newValue);
         Versioned<byte[]> result = toVersioned(oldValue);
-        publish(Lists.newArrayList(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result)));
+        publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
         return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
     }