Refactor AtomixConsistentMap to use separate operations per method call for better performance and control over operation semantics.
Change-Id: I948c5c73d4ab38c9c2b20f8c80ba01548f95dda6
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
index 6d8c1b0..fe5a2a9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.primitives.resources.impl;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -22,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import com.google.common.base.Throwables;
@@ -42,12 +44,18 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsValue;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Get;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GetOrDefault;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Remove;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveValue;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveVersion;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Replace;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceValue;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceVersion;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionBegin;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepare;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepareAndCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UpdateAndGet;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Serializer;
@@ -69,14 +77,19 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_AND_GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_IF_ABSENT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VALUE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VERSION;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VALUE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VERSION;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ROLLBACK;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.SIZE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UPDATE_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.VALUES;
-import static org.onosproject.store.service.MapEvent.Type.INSERT;
-import static org.onosproject.store.service.MapEvent.Type.REMOVE;
-import static org.onosproject.store.service.MapEvent.Type.UPDATE;
/**
* State Machine for {@link AtomixConsistentMap} resource.
@@ -87,7 +100,7 @@
.register(KryoNamespaces.BASIC)
.register(AtomixConsistentMapOperations.NAMESPACE)
.register(AtomixConsistentMapEvents.NAMESPACE)
- .nextId(KryoNamespace.FLOATING_ID)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
.register(TransactionScope.class)
.register(TransactionLog.class)
.register(TransactionId.class)
@@ -96,56 +109,80 @@
.register(new HashMap().keySet().getClass())
.build());
- private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
- private Map<String, MapEntryValue> mapEntries = new HashMap<>();
- private Set<String> preparedKeys = Sets.newHashSet();
- private Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
- private long currentVersion;
+ protected Map<Long, RaftSession> listeners = new LinkedHashMap<>();
+ private Map<String, MapEntryValue> map;
+ protected Set<String> preparedKeys = Sets.newHashSet();
+ protected Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
+ protected long currentVersion;
+
+ public AtomixConsistentMapService() {
+ map = createMap();
+ }
+
+ protected Map<String, MapEntryValue> createMap() {
+ return Maps.newHashMap();
+ }
+
+ protected Map<String, MapEntryValue> entries() {
+ return map;
+ }
+
+ protected Serializer serializer() {
+ return SERIALIZER;
+ }
@Override
public void snapshot(SnapshotWriter writer) {
- writer.writeObject(Sets.newHashSet(listeners.keySet()), SERIALIZER::encode);
- writer.writeObject(preparedKeys, SERIALIZER::encode);
- writer.writeObject(mapEntries, SERIALIZER::encode);
- writer.writeObject(activeTransactions, SERIALIZER::encode);
+ writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer()::encode);
+ writer.writeObject(preparedKeys, serializer()::encode);
+ writer.writeObject(entries(), serializer()::encode);
+ writer.writeObject(activeTransactions, serializer()::encode);
writer.writeLong(currentVersion);
}
@Override
public void install(SnapshotReader reader) {
listeners = new LinkedHashMap<>();
- for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
+ for (Long sessionId : reader.<Set<Long>>readObject(serializer()::decode)) {
listeners.put(sessionId, getSessions().getSession(sessionId));
}
- preparedKeys = reader.readObject(SERIALIZER::decode);
- mapEntries = reader.readObject(SERIALIZER::decode);
- activeTransactions = reader.readObject(SERIALIZER::decode);
+ preparedKeys = reader.readObject(serializer()::decode);
+ map = reader.readObject(serializer()::decode);
+ activeTransactions = reader.readObject(serializer()::decode);
currentVersion = reader.readLong();
}
@Override
protected void configure(RaftServiceExecutor executor) {
// Listeners
- executor.register(ADD_LISTENER, this::listen);
- executor.register(REMOVE_LISTENER, this::unlisten);
+ executor.register(ADD_LISTENER, (Commit<Void> c) -> listen(c.session()));
+ executor.register(REMOVE_LISTENER, (Commit<Void> c) -> unlisten(c.session()));
// Queries
- executor.register(CONTAINS_KEY, SERIALIZER::decode, this::containsKey, SERIALIZER::encode);
- executor.register(CONTAINS_VALUE, SERIALIZER::decode, this::containsValue, SERIALIZER::encode);
- executor.register(ENTRY_SET, this::entrySet, SERIALIZER::encode);
- executor.register(GET, SERIALIZER::decode, this::get, SERIALIZER::encode);
- executor.register(GET_OR_DEFAULT, SERIALIZER::decode, this::getOrDefault, SERIALIZER::encode);
- executor.register(IS_EMPTY, this::isEmpty, SERIALIZER::encode);
- executor.register(KEY_SET, this::keySet, SERIALIZER::encode);
- executor.register(SIZE, this::size, SERIALIZER::encode);
- executor.register(VALUES, this::values, SERIALIZER::encode);
+ executor.register(CONTAINS_KEY, serializer()::decode, this::containsKey, serializer()::encode);
+ executor.register(CONTAINS_VALUE, serializer()::decode, this::containsValue, serializer()::encode);
+ executor.register(ENTRY_SET, (Commit<Void> c) -> entrySet(), serializer()::encode);
+ executor.register(GET, serializer()::decode, this::get, serializer()::encode);
+ executor.register(GET_OR_DEFAULT, serializer()::decode, this::getOrDefault, serializer()::encode);
+ executor.register(IS_EMPTY, (Commit<Void> c) -> isEmpty(), serializer()::encode);
+ executor.register(KEY_SET, (Commit<Void> c) -> keySet(), serializer()::encode);
+ executor.register(SIZE, (Commit<Void> c) -> size(), serializer()::encode);
+ executor.register(VALUES, (Commit<Void> c) -> values(), serializer()::encode);
// Commands
- executor.register(UPDATE_AND_GET, SERIALIZER::decode, this::updateAndGet, SERIALIZER::encode);
- executor.register(CLEAR, this::clear, SERIALIZER::encode);
- executor.register(BEGIN, SERIALIZER::decode, this::begin, SERIALIZER::encode);
- executor.register(PREPARE, SERIALIZER::decode, this::prepare, SERIALIZER::encode);
- executor.register(PREPARE_AND_COMMIT, SERIALIZER::decode, this::prepareAndCommit, SERIALIZER::encode);
- executor.register(COMMIT, SERIALIZER::decode, this::commit, SERIALIZER::encode);
- executor.register(ROLLBACK, SERIALIZER::decode, this::rollback, SERIALIZER::encode);
+ executor.register(PUT, serializer()::decode, this::put, serializer()::encode);
+ executor.register(PUT_IF_ABSENT, serializer()::decode, this::putIfAbsent, serializer()::encode);
+ executor.register(PUT_AND_GET, serializer()::decode, this::putAndGet, serializer()::encode);
+ executor.register(REMOVE, serializer()::decode, this::remove, serializer()::encode);
+ executor.register(REMOVE_VALUE, serializer()::decode, this::removeValue, serializer()::encode);
+ executor.register(REMOVE_VERSION, serializer()::decode, this::removeVersion, serializer()::encode);
+ executor.register(REPLACE, serializer()::decode, this::replace, serializer()::encode);
+ executor.register(REPLACE_VALUE, serializer()::decode, this::replaceValue, serializer()::encode);
+ executor.register(REPLACE_VERSION, serializer()::decode, this::replaceVersion, serializer()::encode);
+ executor.register(CLEAR, (Commit<Void> c) -> clear(), serializer()::encode);
+ executor.register(BEGIN, serializer()::decode, this::begin, serializer()::encode);
+ executor.register(PREPARE, serializer()::decode, this::prepare, serializer()::encode);
+ executor.register(PREPARE_AND_COMMIT, serializer()::decode, this::prepareAndCommit, serializer()::encode);
+ executor.register(COMMIT, serializer()::decode, this::commit, serializer()::encode);
+ executor.register(ROLLBACK, serializer()::decode, this::rollback, serializer()::encode);
}
/**
@@ -155,7 +192,7 @@
* @return {@code true} if map contains key
*/
protected boolean containsKey(Commit<? extends ContainsKey> commit) {
- MapEntryValue value = mapEntries.get(commit.value().key());
+ MapEntryValue value = entries().get(commit.value().key());
return value != null && value.type() != MapEntryValue.Type.TOMBSTONE;
}
@@ -167,7 +204,7 @@
*/
protected boolean containsValue(Commit<? extends ContainsValue> commit) {
Match<byte[]> valueMatch = Match.ifValue(commit.value().value());
- return mapEntries.values().stream()
+ return entries().values().stream()
.filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
.anyMatch(value -> valueMatch.matches(value.value()));
}
@@ -179,7 +216,7 @@
* @return value mapped to key
*/
protected Versioned<byte[]> get(Commit<? extends Get> commit) {
- return toVersioned(mapEntries.get(commit.value().key()));
+ return toVersioned(entries().get(commit.value().key()));
}
/**
@@ -189,7 +226,7 @@
* @return value mapped to key
*/
protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
- MapEntryValue value = mapEntries.get(commit.value().key());
+ MapEntryValue value = entries().get(commit.value().key());
if (value == null) {
return new Versioned<>(commit.value().defaultValue(), 0);
} else if (value.type() == MapEntryValue.Type.TOMBSTONE) {
@@ -200,13 +237,12 @@
}
/**
- * Handles a count commit.
+ * Handles a size commit.
*
- * @param commit size commit
* @return number of entries in map
*/
- protected int size(Commit<Void> commit) {
- return (int) mapEntries.values().stream()
+ protected int size() {
+ return (int) entries().values().stream()
.filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
.count();
}
@@ -214,22 +250,20 @@
/**
* Handles an is empty commit.
*
- * @param commit isEmpty commit
* @return {@code true} if map is empty
*/
- protected boolean isEmpty(Commit<Void> commit) {
- return mapEntries.values().stream()
+ protected boolean isEmpty() {
+ return entries().values().stream()
.noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
}
/**
* Handles a keySet commit.
*
- * @param commit keySet commit
* @return set of keys in map
*/
- protected Set<String> keySet(Commit<Void> commit) {
- return mapEntries.entrySet().stream()
+ protected Set<String> keySet() {
+ return entries().entrySet().stream()
.filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
@@ -238,11 +272,10 @@
/**
* Handles a values commit.
*
- * @param commit values commit
* @return collection of values in map
*/
- protected Collection<Versioned<byte[]>> values(Commit<Void> commit) {
- return mapEntries.entrySet().stream()
+ protected Collection<Versioned<byte[]>> values() {
+ return entries().entrySet().stream()
.filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
.map(entry -> toVersioned(entry.getValue()))
.collect(Collectors.toList());
@@ -251,103 +284,311 @@
/**
* Handles a entry set commit.
*
- * @param commit entrySet commit
* @return set of map entries
*/
- protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<Void> commit) {
- return mapEntries.entrySet().stream()
+ protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet() {
+ return entries().entrySet().stream()
.filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
.map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
.collect(Collectors.toSet());
}
/**
- * Handles a update and get commit.
+ * Returns a boolean indicating whether the given MapEntryValues are equal.
*
- * @param commit updateAndGet commit
- * @return update result
+ * @param oldValue the first value to compare
+ * @param newValue the second value to compare
+ * @return indicates whether the two values are equal
*/
- protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
- try {
- MapEntryUpdateResult.Status updateStatus = validate(commit.value());
- String key = commit.value().key();
- MapEntryValue oldCommitValue = mapEntries.get(commit.value().key());
- Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
+ protected boolean valuesEqual(MapEntryValue oldValue, MapEntryValue newValue) {
+ return (oldValue == null && newValue == null)
+ || (oldValue != null && newValue != null && valuesEqual(oldValue.value(), newValue.value()));
+ }
- if (updateStatus != MapEntryUpdateResult.Status.OK) {
- return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue, oldMapValue);
- }
+    /**
+     * Compares two raw entry values byte-for-byte, treating two {@code null} arrays as equal.
+     *
+     * @param oldValue the first value to compare
+     * @param newValue the second value to compare
+     * @return indicates whether the two values are equal
+     */
+    protected boolean valuesEqual(byte[] oldValue, byte[] newValue) {
+        // Arrays.equals already returns true for two nulls and false for a null/non-null pair.
+        return Arrays.equals(oldValue, newValue);
+    }
- byte[] newValue = commit.value().value();
- currentVersion = commit.index();
- Versioned<byte[]> newMapValue = newValue == null ? null
- : new Versioned<>(newValue, currentVersion);
+ /**
+ * Returns a boolean indicating whether the given MapEntryValue is null or a tombstone.
+ *
+ * @param value the value to check
+ * @return indicates whether the given value is null or is a tombstone
+ */
+ protected boolean valueIsNull(MapEntryValue value) {
+ return value == null || value.type() == MapEntryValue.Type.TOMBSTONE;
+ }
- MapEvent.Type updateType = newValue == null ? REMOVE
- : oldCommitValue == null ? INSERT : UPDATE;
-
- // If a value existed in the map, remove and discard the value to ensure disk can be freed.
- if (updateType == REMOVE || updateType == UPDATE) {
- mapEntries.remove(key);
- }
-
- // If this is an insert/update commit, add the commit to the map entries.
- if (updateType == INSERT || updateType == UPDATE) {
- mapEntries.put(key, new MapEntryValue(
- MapEntryValue.Type.VALUE,
+ /**
+ * Handles a put commit.
+ *
+ * @param commit put commit
+ * @return map entry update result
+ */
+ protected MapEntryUpdateResult<String, byte[]> put(Commit<? extends Put> commit) {
+ String key = commit.value().key();
+ MapEntryValue oldValue = entries().get(key);
+ MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
+ if (valueIsNull(oldValue)) {
+ if (preparedKeys.contains(key)) {
+ return new MapEntryUpdateResult<>(
+ MapEntryUpdateResult.Status.WRITE_LOCK,
commit.index(),
- commit.value().value()));
- } else if (!activeTransactions.isEmpty()) {
- // If this is a delete but transactions are currently running, ensure tombstones are retained
- // for version checks.
- mapEntries.put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, commit.index(), null));
+ key,
+ toVersioned(oldValue));
}
-
- publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
- return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue, newMapValue);
- } catch (Exception e) {
- getLogger().error("State machine operation failed", e);
- throw Throwables.propagate(e);
+ entries().put(commit.value().key(),
+ new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+ Versioned<byte[]> result = toVersioned(oldValue);
+ publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(newValue), result));
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
+ } else if (!valuesEqual(oldValue, newValue)) {
+ if (preparedKeys.contains(key)) {
+ return new MapEntryUpdateResult<>(
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
+ }
+ entries().put(commit.value().key(),
+ new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+ Versioned<byte[]> result = toVersioned(oldValue);
+ publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
}
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
+ }
+
+    /**
+     * Handles a putIfAbsent commit: the value is written only when the key has no live (non-tombstone) entry.
+     *
+     * @param commit putIfAbsent commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> putIfAbsent(Commit<? extends Put> commit) {
+        String key = commit.value().key();
+        MapEntryValue existing = entries().get(key);
+        Versioned<byte[]> current = toVersioned(existing);
+        if (!valueIsNull(existing)) {
+            // A live value is already present: leave it untouched and report it back.
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, current);
+        }
+        if (preparedKeys.contains(key)) {
+            // The key is listed in preparedKeys, so the write is refused.
+            return new MapEntryUpdateResult<>(
+                    MapEntryUpdateResult.Status.WRITE_LOCK, commit.index(), key, current);
+        }
+        // Store a fresh VALUE entry whose version is the commit index.
+        MapEntryValue inserted = new MapEntryValue(
+                MapEntryValue.Type.VALUE,
+                commit.index(),
+                commit.value().value());
+        entries().put(key, inserted);
+        publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(inserted), null));
+        // The result slot carries the previous value, which is null for a successful insert.
+        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, null);
+    }
+
+ /**
+ * Handles a putAndGet commit: stores the value unless identical bytes are already mapped to the key.
+ *
+ * @param commit putAndGet commit
+ * @return map entry update result carrying the new value on OK, or the existing value on WRITE_LOCK/NOOP
+ */
+ protected MapEntryUpdateResult<String, byte[]> putAndGet(Commit<? extends Put> commit) {
+ String key = commit.value().key();
+ MapEntryValue oldValue = entries().get(key);
+ MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
+ if (valueIsNull(oldValue)) {
+ if (preparedKeys.contains(key)) {
+ return new MapEntryUpdateResult<>(
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
+ }
+ entries().put(commit.value().key(), newValue);
+ Versioned<byte[]> result = toVersioned(newValue);
+ publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
+ } else if (!valuesEqual(oldValue, newValue)) {
+ if (preparedKeys.contains(key)) {
+ return new MapEntryUpdateResult<>(
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
+ }
+ entries().put(commit.value().key(), newValue);
+ Versioned<byte[]> result = toVersioned(newValue);
+ publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, result, toVersioned(oldValue)));
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
+ }
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
+ }
+
+    /**
+     * Removes the live entry for a key if it satisfies the given predicate.
+     *
+     * @param index the commit index
+     * @param key the key to remove
+     * @param predicate predicate to determine whether to remove the entry
+     * @return map entry update result; {@code NOOP} when the key has no live value or the predicate fails
+     */
+    private MapEntryUpdateResult<String, byte[]> removeIf(long index, String key, Predicate<MapEntryValue> predicate) {
+        MapEntryValue value = entries().get(key);
+        if (valueIsNull(value) || !predicate.test(value)) { // a tombstone counts as an absent key
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null);
+        }
+        entries().remove(key); // NOTE(review): preparedKeys is not consulted here, unlike put - confirm
+        if (!activeTransactions.isEmpty()) { // retain a tombstone while transactions are active
+            entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
+        }
+        Versioned<byte[]> result = toVersioned(value);
+        publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result));
+        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
+    }
+
+ /**
+ * Handles a remove commit.
+ *
+ * @param commit remove commit
+ * @return map entry update result
+ */
+ protected MapEntryUpdateResult<String, byte[]> remove(Commit<? extends Remove> commit) {
+ return removeIf(commit.index(), commit.value().key(), v -> true);
+ }
+
+    /**
+     * Handles a removeValue commit: the entry is removed only if its current bytes equal the commit's value.
+     *
+     * @param commit removeValue commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> removeValue(Commit<? extends RemoveValue> commit) {
+        byte[] expected = commit.value().value();
+        return removeIf(commit.index(), commit.value().key(), v -> valuesEqual(v.value(), expected));
+    }
+
+ /**
+ * Handles a removeVersion commit.
+ *
+ * @param commit removeVersion commit
+ * @return map entry update result
+ */
+ protected MapEntryUpdateResult<String, byte[]> removeVersion(Commit<? extends RemoveVersion> commit) {
+ return removeIf(commit.index(), commit.value().key(), v -> v.version() == commit.value().version());
+ }
+
+    /**
+     * Replaces the live value for a key if it satisfies the given predicate.
+     *
+     * @param index the commit index
+     * @param key the key to replace
+     * @param newValue the value with which to replace the key
+     * @param predicate a predicate to determine whether to replace the key
+     * @return map entry update result; {@code NOOP} when the key has no live value or the predicate fails
+     */
+    private MapEntryUpdateResult<String, byte[]> replaceIf(
+            long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
+        MapEntryValue oldValue = entries().get(key);
+        if (valueIsNull(oldValue)) { // a tombstone counts as an absent key, so there is nothing to replace
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null);
+        }
+        if (!predicate.test(oldValue)) {
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, toVersioned(oldValue));
+        }
+        entries().put(key, newValue); // NOTE(review): preparedKeys is not consulted here, unlike put - confirm
+        Versioned<byte[]> result = toVersioned(oldValue);
+        publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
+        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
+    }
+
+ /**
+ * Handles a replace commit.
+ *
+ * @param commit replace commit
+ * @return map entry update result
+ */
+ protected MapEntryUpdateResult<String, byte[]> replace(Commit<? extends Replace> commit) {
+ MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
+ return replaceIf(commit.index(), commit.value().key(), value, v -> true);
+ }
+
+ /**
+ * Handles a replaceValue commit.
+ *
+ * @param commit replaceValue commit
+ * @return map entry update result
+ */
+ protected MapEntryUpdateResult<String, byte[]> replaceValue(Commit<? extends ReplaceValue> commit) {
+ MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
+ return replaceIf(commit.index(), commit.value().key(), value,
+ v -> valuesEqual(v.value(), commit.value().oldValue()));
+ }
+
+ /**
+ * Handles a replaceVersion commit.
+ *
+ * @param commit replaceVersion commit
+ * @return map entry update result
+ */
+ protected MapEntryUpdateResult<String, byte[]> replaceVersion(Commit<? extends ReplaceVersion> commit) {
+ MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
+ return replaceIf(commit.index(), commit.value().key(), value,
+ v -> v.version() == commit.value().oldVersion());
}
/**
* Handles a clear commit.
*
- * @param commit clear commit
* @return clear result
*/
- protected MapEntryUpdateResult.Status clear(Commit<Void> commit) {
- Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
- .entrySet().iterator();
+ protected MapEntryUpdateResult.Status clear() {
+ Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
+ Map<String, MapEntryValue> entriesToAdd = new HashMap<>();
while (iterator.hasNext()) {
Map.Entry<String, MapEntryValue> entry = iterator.next();
String key = entry.getKey();
MapEntryValue value = entry.getValue();
- Versioned<byte[]> removedValue = new Versioned<>(value.value(),
- value.version());
- publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
- iterator.remove();
+ if (!valueIsNull(value)) {
+ Versioned<byte[]> removedValue = new Versioned<>(value.value(), value.version());
+ publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, removedValue));
+ if (activeTransactions.isEmpty()) {
+ iterator.remove();
+ } else {
+ entriesToAdd.put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
+ }
+ }
}
+ entries().putAll(entriesToAdd);
return MapEntryUpdateResult.Status.OK;
}
/**
* Handles a listen commit.
*
- * @param commit listen commit
+ * @param session listen session
*/
- protected void listen(Commit<Void> commit) {
- listeners.put(commit.session().sessionId().id(), commit.session());
+ protected void listen(RaftSession session) {
+ listeners.put(session.sessionId().id(), session);
}
/**
* Handles an unlisten commit.
*
- * @param commit unlisten commit
+ * @param session unlisten session
*/
- protected void unlisten(Commit<Void> commit) {
- listeners.remove(commit.session().sessionId().id());
+ protected void unlisten(RaftSession session) {
+ listeners.remove(session.sessionId().id());
}
/**
@@ -412,7 +653,7 @@
}
// Read the existing value from the map.
- MapEntryValue existingValue = mapEntries.get(key);
+ MapEntryValue existingValue = entries().get(key);
// Note: if the existing value is null, that means the key has not changed during the transaction,
// otherwise a tombstone would have been retained.
@@ -501,7 +742,7 @@
continue;
}
- MapEntryValue previousValue = mapEntries.remove(key);
+ MapEntryValue previousValue = entries().remove(key);
MapEntryValue newValue = null;
// If the record is not a delete, create a transactional commit.
@@ -512,11 +753,42 @@
newValue = new MapEntryValue(MapEntryValue.Type.TOMBSTONE, currentVersion, null);
}
- eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
-
+ MapEvent<String, byte[]> event;
if (newValue != null) {
- mapEntries.put(key, newValue);
+ entries().put(key, newValue);
+ if (!valueIsNull(newValue)) {
+ if (!valueIsNull(previousValue)) {
+ event = new MapEvent<>(
+ MapEvent.Type.UPDATE,
+ "",
+ key,
+ toVersioned(newValue),
+ toVersioned(previousValue));
+ } else {
+ event = new MapEvent<>(
+ MapEvent.Type.INSERT,
+ "",
+ key,
+ toVersioned(newValue),
+ null);
+ }
+ } else {
+ event = new MapEvent<>(
+ MapEvent.Type.REMOVE,
+ "",
+ key,
+ null,
+ toVersioned(previousValue));
+ }
+ } else {
+ event = new MapEvent<>(
+ MapEvent.Type.REMOVE,
+ "",
+ key,
+ null,
+ toVersioned(previousValue));
}
+ eventsToPublish.add(event);
}
publish(eventsToPublish);
return CommitResult.OK;
@@ -557,7 +829,7 @@
*/
private void discardTombstones() {
if (activeTransactions.isEmpty()) {
- Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
+ Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
while (iterator.hasNext()) {
MapEntryValue value = iterator.next().getValue();
if (value.type() == MapEntryValue.Type.TOMBSTONE) {
@@ -568,7 +840,7 @@
long lowWaterMark = activeTransactions.values().stream()
.mapToLong(TransactionScope::version)
.min().getAsLong();
- Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
+ Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
while (iterator.hasNext()) {
MapEntryValue value = iterator.next().getValue();
if (value.type() == MapEntryValue.Type.TOMBSTONE && value.version < lowWaterMark) {
@@ -579,46 +851,32 @@
}
/**
- * Computes the update status that would result if the specified update were to applied to
- * the state machine.
- *
- * @param update update
- * @return status
- */
- private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
- MapEntryValue existingValue = mapEntries.get(update.key());
- boolean isEmpty = existingValue == null || existingValue.type() == MapEntryValue.Type.TOMBSTONE;
- if (isEmpty && update.value() == null) {
- return MapEntryUpdateResult.Status.NOOP;
- }
- if (preparedKeys.contains(update.key())) {
- return MapEntryUpdateResult.Status.WRITE_LOCK;
- }
- byte[] existingRawValue = isEmpty ? null : existingValue.value();
- Long existingVersion = isEmpty ? null : existingValue.version();
- return update.valueMatch().matches(existingRawValue)
- && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
- : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
- }
-
- /**
* Utility for turning a {@code MapEntryValue} to {@code Versioned}.
* @param value map entry value
* @return versioned instance
*/
- private Versioned<byte[]> toVersioned(MapEntryValue value) {
+ protected Versioned<byte[]> toVersioned(MapEntryValue value) {
return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
? new Versioned<>(value.value(), value.version()) : null;
}
/**
+ * Publishes an event to listeners.
+ *
+ * @param event event to publish
+ */
+ private void publish(MapEvent<String, byte[]> event) {
+ publish(Lists.newArrayList(event));
+ }
+
+ /**
* Publishes events to listeners.
*
* @param events list of map event to publish
*/
private void publish(List<MapEvent<String, byte[]>> events) {
listeners.values().forEach(session -> {
- session.publish(CHANGE, SERIALIZER::encode, events);
+ session.publish(CHANGE, serializer()::encode, events);
});
}
@@ -639,7 +897,7 @@
/**
* Interface implemented by map values.
*/
- private static class MapEntryValue {
+ protected static class MapEntryValue {
protected final Type type;
protected final long version;
protected final byte[] value;
@@ -689,7 +947,7 @@
/**
* Map transaction scope.
*/
- private static final class TransactionScope {
+ protected static final class TransactionScope {
private final long version;
private final TransactionLog<MapUpdate<String, byte[]>> transactionLog;