Throw ConcurrentModification exception when ConsistentMap compute call conflicts with concurrent operation(s)
Change-Id: Id07868873929f4f413878961c154b62933f3a3f2
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
index fe5a2a9..9d72be5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
@@ -337,7 +337,11 @@
String key = commit.value().key();
MapEntryValue oldValue = entries().get(key);
MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
+
+ // If the value is null or a tombstone, this is an insert.
+ // Otherwise, only update the value if it has changed to reduce the number of events.
if (valueIsNull(oldValue)) {
+ // If the key has been locked by a transaction, return a WRITE_LOCK error.
if (preparedKeys.contains(key)) {
return new MapEntryUpdateResult<>(
MapEntryUpdateResult.Status.WRITE_LOCK,
@@ -351,6 +355,7 @@
publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(newValue), result));
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
} else if (!valuesEqual(oldValue, newValue)) {
+ // If the key has been locked by a transaction, return a WRITE_LOCK error.
if (preparedKeys.contains(key)) {
return new MapEntryUpdateResult<>(
MapEntryUpdateResult.Status.WRITE_LOCK,
@@ -364,6 +369,7 @@
publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
}
+ // If the value hasn't changed, return a NOOP result.
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
}
@@ -376,7 +382,10 @@
protected MapEntryUpdateResult<String, byte[]> putIfAbsent(Commit<? extends Put> commit) {
String key = commit.value().key();
MapEntryValue oldValue = entries().get(key);
+
+ // If the value is null or a tombstone, this is an INSERT.
if (valueIsNull(oldValue)) {
+ // If the key has been locked by a transaction, return a WRITE_LOCK error.
if (preparedKeys.contains(key)) {
return new MapEntryUpdateResult<>(
MapEntryUpdateResult.Status.WRITE_LOCK,
@@ -388,13 +397,16 @@
MapEntryValue.Type.VALUE,
commit.index(),
commit.value().value());
- entries().put(commit.value().key(),
- new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+ entries().put(commit.value().key(), newValue);
Versioned<byte[]> result = toVersioned(newValue);
publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, null);
}
- return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
+ return new MapEntryUpdateResult<>(
+ MapEntryUpdateResult.Status.PRECONDITION_FAILED,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
}
/**
@@ -407,7 +419,11 @@
String key = commit.value().key();
MapEntryValue oldValue = entries().get(key);
MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
+
+ // If the value is null or a tombstone, this is an insert.
+ // Otherwise, only update the value if it has changed to reduce the number of events.
if (valueIsNull(oldValue)) {
+ // If the key has been locked by a transaction, return a WRITE_LOCK error.
if (preparedKeys.contains(key)) {
return new MapEntryUpdateResult<>(
MapEntryUpdateResult.Status.WRITE_LOCK,
@@ -420,6 +436,7 @@
publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
} else if (!valuesEqual(oldValue, newValue)) {
+ // If the key has been locked by a transaction, return a WRITE_LOCK error.
if (preparedKeys.contains(key)) {
return new MapEntryUpdateResult<>(
MapEntryUpdateResult.Status.WRITE_LOCK,
@@ -445,15 +462,26 @@
*/
private MapEntryUpdateResult<String, byte[]> removeIf(long index, String key, Predicate<MapEntryValue> predicate) {
MapEntryValue value = entries().get(key);
- if (value == null || !predicate.test(value)) {
- return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null);
+
+ // If the value does not exist or doesn't match the predicate, return a PRECONDITION_FAILED error.
+ if (valueIsNull(value) || !predicate.test(value)) {
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.PRECONDITION_FAILED, index, key, null);
}
- entries().remove(key);
- if (!activeTransactions.isEmpty()) {
- entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
+
+ // If the key has been locked by a transaction, return a WRITE_LOCK error.
+ if (preparedKeys.contains(key)) {
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.WRITE_LOCK, index, key, null);
}
+
+ // If no transactions are active, remove the key. Otherwise, replace it with a tombstone.
+ if (activeTransactions.isEmpty()) {
+ entries().remove(key);
+ } else {
+ entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, index, null));
+ }
+
Versioned<byte[]> result = toVersioned(value);
- publish(Lists.newArrayList(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result)));
+ publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result));
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
}
@@ -500,15 +528,24 @@
private MapEntryUpdateResult<String, byte[]> replaceIf(
long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
MapEntryValue oldValue = entries().get(key);
- if (oldValue == null) {
- return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null);
+
+ // If the key is not set or the current value doesn't match the predicate, return a PRECONDITION_FAILED error.
+ if (valueIsNull(oldValue) || !predicate.test(oldValue)) {
+ return new MapEntryUpdateResult<>(
+ MapEntryUpdateResult.Status.PRECONDITION_FAILED,
+ index,
+ key,
+ toVersioned(oldValue));
}
- if (!predicate.test(oldValue)) {
- return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, toVersioned(oldValue));
+
+ // If the key has been locked by a transaction, return a WRITE_LOCK error.
+ if (preparedKeys.contains(key)) {
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.WRITE_LOCK, index, key, null);
}
+
entries().put(key, newValue);
Versioned<byte[]> result = toVersioned(oldValue);
- publish(Lists.newArrayList(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result)));
+ publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
}