Refactor AtomixConsistentMap to use separate operations per method call for better performance and control over operation semantics.

Change-Id: I948c5c73d4ab38c9c2b20f8c80ba01548f95dda6
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
index 6d8c1b0..fe5a2a9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -22,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Throwables;
@@ -42,12 +44,18 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsValue;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Get;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GetOrDefault;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Remove;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveValue;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveVersion;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Replace;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceValue;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceVersion;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionBegin;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionCommit;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepare;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepareAndCommit;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UpdateAndGet;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.Serializer;
@@ -69,14 +77,19 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_AND_GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_IF_ABSENT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VALUE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VERSION;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VALUE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VERSION;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ROLLBACK;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.SIZE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UPDATE_AND_GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.VALUES;
-import static org.onosproject.store.service.MapEvent.Type.INSERT;
-import static org.onosproject.store.service.MapEvent.Type.REMOVE;
-import static org.onosproject.store.service.MapEvent.Type.UPDATE;
 
 /**
  * State Machine for {@link AtomixConsistentMap} resource.
@@ -87,7 +100,7 @@
             .register(KryoNamespaces.BASIC)
             .register(AtomixConsistentMapOperations.NAMESPACE)
             .register(AtomixConsistentMapEvents.NAMESPACE)
-            .nextId(KryoNamespace.FLOATING_ID)
+            .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
             .register(TransactionScope.class)
             .register(TransactionLog.class)
             .register(TransactionId.class)
@@ -96,56 +109,80 @@
             .register(new HashMap().keySet().getClass())
             .build());
 
-    private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
-    private Map<String, MapEntryValue> mapEntries = new HashMap<>();
-    private Set<String> preparedKeys = Sets.newHashSet();
-    private Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
-    private long currentVersion;
+    protected Map<Long, RaftSession> listeners = new LinkedHashMap<>();
+    private Map<String, MapEntryValue> map;
+    protected Set<String> preparedKeys = Sets.newHashSet();
+    protected Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
+    protected long currentVersion;
+
+    public AtomixConsistentMapService() {
+        map = createMap();
+    }
+
+    protected Map<String, MapEntryValue> createMap() {
+        return Maps.newHashMap();
+    }
+
+    protected Map<String, MapEntryValue> entries() {
+        return map;
+    }
+
+    protected Serializer serializer() {
+        return SERIALIZER;
+    }
 
     @Override
     public void snapshot(SnapshotWriter writer) {
-        writer.writeObject(Sets.newHashSet(listeners.keySet()), SERIALIZER::encode);
-        writer.writeObject(preparedKeys, SERIALIZER::encode);
-        writer.writeObject(mapEntries, SERIALIZER::encode);
-        writer.writeObject(activeTransactions, SERIALIZER::encode);
+        writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer()::encode);
+        writer.writeObject(preparedKeys, serializer()::encode);
+        writer.writeObject(entries(), serializer()::encode);
+        writer.writeObject(activeTransactions, serializer()::encode);
         writer.writeLong(currentVersion);
     }
 
     @Override
     public void install(SnapshotReader reader) {
         listeners = new LinkedHashMap<>();
-        for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
+        for (Long sessionId : reader.<Set<Long>>readObject(serializer()::decode)) {
             listeners.put(sessionId, getSessions().getSession(sessionId));
         }
-        preparedKeys = reader.readObject(SERIALIZER::decode);
-        mapEntries = reader.readObject(SERIALIZER::decode);
-        activeTransactions = reader.readObject(SERIALIZER::decode);
+        preparedKeys = reader.readObject(serializer()::decode);
+        map = reader.readObject(serializer()::decode);
+        activeTransactions = reader.readObject(serializer()::decode);
         currentVersion = reader.readLong();
     }
 
     @Override
     protected void configure(RaftServiceExecutor executor) {
         // Listeners
-        executor.register(ADD_LISTENER, this::listen);
-        executor.register(REMOVE_LISTENER, this::unlisten);
+        executor.register(ADD_LISTENER, (Commit<Void> c) -> listen(c.session()));
+        executor.register(REMOVE_LISTENER, (Commit<Void> c) -> unlisten(c.session()));
         // Queries
-        executor.register(CONTAINS_KEY, SERIALIZER::decode, this::containsKey, SERIALIZER::encode);
-        executor.register(CONTAINS_VALUE, SERIALIZER::decode, this::containsValue, SERIALIZER::encode);
-        executor.register(ENTRY_SET, this::entrySet, SERIALIZER::encode);
-        executor.register(GET, SERIALIZER::decode, this::get, SERIALIZER::encode);
-        executor.register(GET_OR_DEFAULT, SERIALIZER::decode, this::getOrDefault, SERIALIZER::encode);
-        executor.register(IS_EMPTY, this::isEmpty, SERIALIZER::encode);
-        executor.register(KEY_SET, this::keySet, SERIALIZER::encode);
-        executor.register(SIZE, this::size, SERIALIZER::encode);
-        executor.register(VALUES, this::values, SERIALIZER::encode);
+        executor.register(CONTAINS_KEY, serializer()::decode, this::containsKey, serializer()::encode);
+        executor.register(CONTAINS_VALUE, serializer()::decode, this::containsValue, serializer()::encode);
+        executor.register(ENTRY_SET, (Commit<Void> c) -> entrySet(), serializer()::encode);
+        executor.register(GET, serializer()::decode, this::get, serializer()::encode);
+        executor.register(GET_OR_DEFAULT, serializer()::decode, this::getOrDefault, serializer()::encode);
+        executor.register(IS_EMPTY, (Commit<Void> c) -> isEmpty(), serializer()::encode);
+        executor.register(KEY_SET, (Commit<Void> c) -> keySet(), serializer()::encode);
+        executor.register(SIZE, (Commit<Void> c) -> size(), serializer()::encode);
+        executor.register(VALUES, (Commit<Void> c) -> values(), serializer()::encode);
         // Commands
-        executor.register(UPDATE_AND_GET, SERIALIZER::decode, this::updateAndGet, SERIALIZER::encode);
-        executor.register(CLEAR, this::clear, SERIALIZER::encode);
-        executor.register(BEGIN, SERIALIZER::decode, this::begin, SERIALIZER::encode);
-        executor.register(PREPARE, SERIALIZER::decode, this::prepare, SERIALIZER::encode);
-        executor.register(PREPARE_AND_COMMIT, SERIALIZER::decode, this::prepareAndCommit, SERIALIZER::encode);
-        executor.register(COMMIT, SERIALIZER::decode, this::commit, SERIALIZER::encode);
-        executor.register(ROLLBACK, SERIALIZER::decode, this::rollback, SERIALIZER::encode);
+        executor.register(PUT, serializer()::decode, this::put, serializer()::encode);
+        executor.register(PUT_IF_ABSENT, serializer()::decode, this::putIfAbsent, serializer()::encode);
+        executor.register(PUT_AND_GET, serializer()::decode, this::putAndGet, serializer()::encode);
+        executor.register(REMOVE, serializer()::decode, this::remove, serializer()::encode);
+        executor.register(REMOVE_VALUE, serializer()::decode, this::removeValue, serializer()::encode);
+        executor.register(REMOVE_VERSION, serializer()::decode, this::removeVersion, serializer()::encode);
+        executor.register(REPLACE, serializer()::decode, this::replace, serializer()::encode);
+        executor.register(REPLACE_VALUE, serializer()::decode, this::replaceValue, serializer()::encode);
+        executor.register(REPLACE_VERSION, serializer()::decode, this::replaceVersion, serializer()::encode);
+        executor.register(CLEAR, (Commit<Void> c) -> clear(), serializer()::encode);
+        executor.register(BEGIN, serializer()::decode, this::begin, serializer()::encode);
+        executor.register(PREPARE, serializer()::decode, this::prepare, serializer()::encode);
+        executor.register(PREPARE_AND_COMMIT, serializer()::decode, this::prepareAndCommit, serializer()::encode);
+        executor.register(COMMIT, serializer()::decode, this::commit, serializer()::encode);
+        executor.register(ROLLBACK, serializer()::decode, this::rollback, serializer()::encode);
     }
 
     /**
@@ -155,7 +192,7 @@
      * @return {@code true} if map contains key
      */
     protected boolean containsKey(Commit<? extends ContainsKey> commit) {
-        MapEntryValue value = mapEntries.get(commit.value().key());
+        MapEntryValue value = entries().get(commit.value().key());
         return value != null && value.type() != MapEntryValue.Type.TOMBSTONE;
     }
 
@@ -167,7 +204,7 @@
      */
     protected boolean containsValue(Commit<? extends ContainsValue> commit) {
         Match<byte[]> valueMatch = Match.ifValue(commit.value().value());
-        return mapEntries.values().stream()
+        return entries().values().stream()
                 .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
                 .anyMatch(value -> valueMatch.matches(value.value()));
     }
@@ -179,7 +216,7 @@
      * @return value mapped to key
      */
     protected Versioned<byte[]> get(Commit<? extends Get> commit) {
-        return toVersioned(mapEntries.get(commit.value().key()));
+        return toVersioned(entries().get(commit.value().key()));
     }
 
     /**
@@ -189,7 +226,7 @@
      * @return value mapped to key
      */
     protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
-        MapEntryValue value = mapEntries.get(commit.value().key());
+        MapEntryValue value = entries().get(commit.value().key());
         if (value == null) {
             return new Versioned<>(commit.value().defaultValue(), 0);
         } else if (value.type() == MapEntryValue.Type.TOMBSTONE) {
@@ -200,13 +237,12 @@
     }
 
     /**
-     * Handles a count commit.
+     * Handles a size commit.
      *
-     * @param commit size commit
      * @return number of entries in map
      */
-    protected int size(Commit<Void> commit) {
-        return (int) mapEntries.values().stream()
+    protected int size() {
+        return (int) entries().values().stream()
                 .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
                 .count();
     }
@@ -214,22 +250,20 @@
     /**
      * Handles an is empty commit.
      *
-     * @param commit isEmpty commit
      * @return {@code true} if map is empty
      */
-    protected boolean isEmpty(Commit<Void> commit) {
-        return mapEntries.values().stream()
+    protected boolean isEmpty() {
+        return entries().values().stream()
                 .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
     }
 
     /**
      * Handles a keySet commit.
      *
-     * @param commit keySet commit
      * @return set of keys in map
      */
-    protected Set<String> keySet(Commit<Void> commit) {
-        return mapEntries.entrySet().stream()
+    protected Set<String> keySet() {
+        return entries().entrySet().stream()
                 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
                 .map(Map.Entry::getKey)
                 .collect(Collectors.toSet());
@@ -238,11 +272,10 @@
     /**
      * Handles a values commit.
      *
-     * @param commit values commit
      * @return collection of values in map
      */
-    protected Collection<Versioned<byte[]>> values(Commit<Void> commit) {
-        return mapEntries.entrySet().stream()
+    protected Collection<Versioned<byte[]>> values() {
+        return entries().entrySet().stream()
                 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
                 .map(entry -> toVersioned(entry.getValue()))
                 .collect(Collectors.toList());
@@ -251,103 +284,311 @@
     /**
      * Handles a entry set commit.
      *
-     * @param commit entrySet commit
      * @return set of map entries
      */
-    protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<Void> commit) {
-        return mapEntries.entrySet().stream()
+    protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet() {
+        return entries().entrySet().stream()
                 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
                 .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
                 .collect(Collectors.toSet());
     }
 
     /**
-     * Handles a update and get commit.
+     * Returns a boolean indicating whether the given MapEntryValues are equal.
      *
-     * @param commit updateAndGet commit
-     * @return update result
+     * @param oldValue the first value to compare
+     * @param newValue the second value to compare
+     * @return indicates whether the two values are equal
      */
-    protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
-        try {
-            MapEntryUpdateResult.Status updateStatus = validate(commit.value());
-            String key = commit.value().key();
-            MapEntryValue oldCommitValue = mapEntries.get(commit.value().key());
-            Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
+    protected boolean valuesEqual(MapEntryValue oldValue, MapEntryValue newValue) {
+        return (oldValue == null && newValue == null)
+                || (oldValue != null && newValue != null && valuesEqual(oldValue.value(), newValue.value()));
+    }
 
-            if (updateStatus != MapEntryUpdateResult.Status.OK) {
-                return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue, oldMapValue);
-            }
+    /**
+     * Returns a boolean indicating whether the given entry values are equal.
+     *
+     * @param oldValue the first value to compare
+     * @param newValue the second value to compare
+     * @return indicates whether the two values are equal
+     */
+    protected boolean valuesEqual(byte[] oldValue, byte[] newValue) {
+        return (oldValue == null && newValue == null)
+                || (oldValue != null && newValue != null && Arrays.equals(oldValue, newValue));
+    }
 
-            byte[] newValue = commit.value().value();
-            currentVersion = commit.index();
-            Versioned<byte[]> newMapValue = newValue == null ? null
-                    : new Versioned<>(newValue, currentVersion);
+    /**
+     * Returns a boolean indicating whether the given MapEntryValue is null or a tombstone.
+     *
+     * @param value the value to check
+     * @return indicates whether the given value is null or is a tombstone
+     */
+    protected boolean valueIsNull(MapEntryValue value) {
+        return value == null || value.type() == MapEntryValue.Type.TOMBSTONE;
+    }
 
-            MapEvent.Type updateType = newValue == null ? REMOVE
-                    : oldCommitValue == null ? INSERT : UPDATE;
-
-            // If a value existed in the map, remove and discard the value to ensure disk can be freed.
-            if (updateType == REMOVE || updateType == UPDATE) {
-                mapEntries.remove(key);
-            }
-
-            // If this is an insert/update commit, add the commit to the map entries.
-            if (updateType == INSERT || updateType == UPDATE) {
-                mapEntries.put(key, new MapEntryValue(
-                        MapEntryValue.Type.VALUE,
+    /**
+     * Handles a put commit.
+     *
+     * @param commit put commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> put(Commit<? extends Put> commit) {
+        String key = commit.value().key();
+        MapEntryValue oldValue = entries().get(key);
+        MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
+        if (valueIsNull(oldValue)) {
+            if (preparedKeys.contains(key)) {
+                return new MapEntryUpdateResult<>(
+                        MapEntryUpdateResult.Status.WRITE_LOCK,
                         commit.index(),
-                        commit.value().value()));
-            } else if (!activeTransactions.isEmpty()) {
-                // If this is a delete but transactions are currently running, ensure tombstones are retained
-                // for version checks.
-                mapEntries.put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, commit.index(), null));
+                        key,
+                        toVersioned(oldValue));
             }
-
-            publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
-            return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue, newMapValue);
-        } catch (Exception e) {
-            getLogger().error("State machine operation failed", e);
-            throw Throwables.propagate(e);
+            entries().put(commit.value().key(),
+                    new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+            Versioned<byte[]> result = toVersioned(oldValue);
+            publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(newValue), result));
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
+        } else if (!valuesEqual(oldValue, newValue)) {
+            if (preparedKeys.contains(key)) {
+                return new MapEntryUpdateResult<>(
+                        MapEntryUpdateResult.Status.WRITE_LOCK,
+                        commit.index(),
+                        key,
+                        toVersioned(oldValue));
+            }
+            entries().put(commit.value().key(),
+                    new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+            Versioned<byte[]> result = toVersioned(oldValue);
+            publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
         }
+        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
+    }
+
+    /**
+     * Handles a putIfAbsent commit.
+     *
+     * @param commit putIfAbsent commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> putIfAbsent(Commit<? extends Put> commit) {
+        String key = commit.value().key();
+        MapEntryValue oldValue = entries().get(key);
+        if (valueIsNull(oldValue)) {
+            if (preparedKeys.contains(key)) {
+                return new MapEntryUpdateResult<>(
+                        MapEntryUpdateResult.Status.WRITE_LOCK,
+                        commit.index(),
+                        key,
+                        toVersioned(oldValue));
+            }
+            MapEntryValue newValue = new MapEntryValue(
+                    MapEntryValue.Type.VALUE,
+                    commit.index(),
+                    commit.value().value());
+            entries().put(commit.value().key(),
+                    new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+            Versioned<byte[]> result = toVersioned(newValue);
+            publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, null);
+        }
+        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
+    }
+
+    /**
+     * Handles a putAndGet commit.
+     *
+     * @param commit putAndGet commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> putAndGet(Commit<? extends Put> commit) {
+        String key = commit.value().key();
+        MapEntryValue oldValue = entries().get(key);
+        MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
+        if (valueIsNull(oldValue)) {
+            if (preparedKeys.contains(key)) {
+                return new MapEntryUpdateResult<>(
+                        MapEntryUpdateResult.Status.WRITE_LOCK,
+                        commit.index(),
+                        key,
+                        toVersioned(oldValue));
+            }
+            entries().put(commit.value().key(), newValue);
+            Versioned<byte[]> result = toVersioned(newValue);
+            publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
+        } else if (!valuesEqual(oldValue, newValue)) {
+            if (preparedKeys.contains(key)) {
+                return new MapEntryUpdateResult<>(
+                        MapEntryUpdateResult.Status.WRITE_LOCK,
+                        commit.index(),
+                        key,
+                        toVersioned(oldValue));
+            }
+            entries().put(commit.value().key(), newValue);
+            Versioned<byte[]> result = toVersioned(newValue);
+            publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, result, toVersioned(oldValue)));
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
+        }
+        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
+    }
+
+    /**
+     * Handles a remove commit.
+     *
+     * @param index the commit index
+     * @param key the key to remove
+     * @param predicate predicate to determine whether to remove the entry
+     * @return map entry update result
+     */
+    private MapEntryUpdateResult<String, byte[]> removeIf(long index, String key, Predicate<MapEntryValue> predicate) {
+        MapEntryValue value = entries().get(key);
+        if (value == null || !predicate.test(value)) {
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null);
+        }
+        entries().remove(key);
+        if (!activeTransactions.isEmpty()) {
+            entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
+        }
+        Versioned<byte[]> result = toVersioned(value);
+        publish(Lists.newArrayList(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result)));
+        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
+    }
+
+    /**
+     * Handles a remove commit.
+     *
+     * @param commit remove commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> remove(Commit<? extends Remove> commit) {
+        return removeIf(commit.index(), commit.value().key(), v -> true);
+    }
+
+    /**
+     * Handles a removeValue commit.
+     *
+     * @param commit removeValue commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> removeValue(Commit<? extends RemoveValue> commit) {
+        return removeIf(commit.index(), commit.value().key(), v ->
+                valuesEqual(v, new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value())));
+    }
+
+    /**
+     * Handles a removeVersion commit.
+     *
+     * @param commit removeVersion commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> removeVersion(Commit<? extends RemoveVersion> commit) {
+        return removeIf(commit.index(), commit.value().key(), v -> v.version() == commit.value().version());
+    }
+
+    /**
+     * Handles a replace commit.
+     *
+     * @param index the commit index
+     * @param key the key to replace
+     * @param newValue the value with which to replace the key
+     * @param predicate a predicate to determine whether to replace the key
+     * @return map entry update result
+     */
+    private MapEntryUpdateResult<String, byte[]> replaceIf(
+            long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
+        MapEntryValue oldValue = entries().get(key);
+        if (oldValue == null) {
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null);
+        }
+        if (!predicate.test(oldValue)) {
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, toVersioned(oldValue));
+        }
+        entries().put(key, newValue);
+        Versioned<byte[]> result = toVersioned(oldValue);
+        publish(Lists.newArrayList(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result)));
+        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
+    }
+
+    /**
+     * Handles a replace commit.
+     *
+     * @param commit replace commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> replace(Commit<? extends Replace> commit) {
+        MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
+        return replaceIf(commit.index(), commit.value().key(), value, v -> true);
+    }
+
+    /**
+     * Handles a replaceValue commit.
+     *
+     * @param commit replaceValue commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> replaceValue(Commit<? extends ReplaceValue> commit) {
+        MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
+        return replaceIf(commit.index(), commit.value().key(), value,
+                v -> valuesEqual(v.value(), commit.value().oldValue()));
+    }
+
+    /**
+     * Handles a replaceVersion commit.
+     *
+     * @param commit replaceVersion commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> replaceVersion(Commit<? extends ReplaceVersion> commit) {
+        MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
+        return replaceIf(commit.index(), commit.value().key(), value,
+                v -> v.version() == commit.value().oldVersion());
     }
 
     /**
      * Handles a clear commit.
      *
-     * @param commit clear commit
      * @return clear result
      */
-    protected MapEntryUpdateResult.Status clear(Commit<Void> commit) {
-        Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
-                .entrySet().iterator();
+    protected MapEntryUpdateResult.Status clear() {
+        Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
+        Map<String, MapEntryValue> entriesToAdd = new HashMap<>();
         while (iterator.hasNext()) {
             Map.Entry<String, MapEntryValue> entry = iterator.next();
             String key = entry.getKey();
             MapEntryValue value = entry.getValue();
-            Versioned<byte[]> removedValue = new Versioned<>(value.value(),
-                    value.version());
-            publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
-            iterator.remove();
+            if (!valueIsNull(value)) {
+                Versioned<byte[]> removedValue = new Versioned<>(value.value(), value.version());
+                publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, removedValue));
+                if (activeTransactions.isEmpty()) {
+                    iterator.remove();
+                } else {
+                    entriesToAdd.put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
+                }
+            }
         }
+        entries().putAll(entriesToAdd);
         return MapEntryUpdateResult.Status.OK;
     }
 
     /**
      * Handles a listen commit.
      *
-     * @param commit listen commit
+     * @param session listen session
      */
-    protected void listen(Commit<Void> commit) {
-        listeners.put(commit.session().sessionId().id(), commit.session());
+    protected void listen(RaftSession session) {
+        listeners.put(session.sessionId().id(), session);
     }
 
     /**
      * Handles an unlisten commit.
      *
-     * @param commit unlisten commit
+     * @param session unlisten session
      */
-    protected void unlisten(Commit<Void> commit) {
-        listeners.remove(commit.session().sessionId().id());
+    protected void unlisten(RaftSession session) {
+        listeners.remove(session.sessionId().id());
     }
 
     /**
@@ -412,7 +653,7 @@
                 }
 
                 // Read the existing value from the map.
-                MapEntryValue existingValue = mapEntries.get(key);
+                MapEntryValue existingValue = entries().get(key);
 
                 // Note: if the existing value is null, that means the key has not changed during the transaction,
                 // otherwise a tombstone would have been retained.
@@ -501,7 +742,7 @@
                 continue;
             }
 
-            MapEntryValue previousValue = mapEntries.remove(key);
+            MapEntryValue previousValue = entries().remove(key);
             MapEntryValue newValue = null;
 
             // If the record is not a delete, create a transactional commit.
@@ -512,11 +753,42 @@
                 newValue = new MapEntryValue(MapEntryValue.Type.TOMBSTONE, currentVersion, null);
             }
 
-            eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
-
+            MapEvent<String, byte[]> event;
             if (newValue != null) {
-                mapEntries.put(key, newValue);
+                entries().put(key, newValue);
+                if (!valueIsNull(newValue)) {
+                    if (!valueIsNull(previousValue)) {
+                        event = new MapEvent<>(
+                                MapEvent.Type.UPDATE,
+                                "",
+                                key,
+                                toVersioned(newValue),
+                                toVersioned(previousValue));
+                    } else {
+                        event = new MapEvent<>(
+                                MapEvent.Type.INSERT,
+                                "",
+                                key,
+                                toVersioned(newValue),
+                                null);
+                    }
+                } else {
+                    event = new MapEvent<>(
+                            MapEvent.Type.REMOVE,
+                            "",
+                            key,
+                            null,
+                            toVersioned(previousValue));
+                }
+            } else {
+                event = new MapEvent<>(
+                        MapEvent.Type.REMOVE,
+                        "",
+                        key,
+                        null,
+                        toVersioned(previousValue));
             }
+            eventsToPublish.add(event);
         }
         publish(eventsToPublish);
         return CommitResult.OK;
@@ -557,7 +829,7 @@
      */
     private void discardTombstones() {
         if (activeTransactions.isEmpty()) {
-            Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
+            Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
             while (iterator.hasNext()) {
                 MapEntryValue value = iterator.next().getValue();
                 if (value.type() == MapEntryValue.Type.TOMBSTONE) {
@@ -568,7 +840,7 @@
             long lowWaterMark = activeTransactions.values().stream()
                     .mapToLong(TransactionScope::version)
                     .min().getAsLong();
-            Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
+            Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
             while (iterator.hasNext()) {
                 MapEntryValue value = iterator.next().getValue();
                 if (value.type() == MapEntryValue.Type.TOMBSTONE && value.version < lowWaterMark) {
@@ -579,46 +851,32 @@
     }
 
     /**
-     * Computes the update status that would result if the specified update were to applied to
-     * the state machine.
-     *
-     * @param update update
-     * @return status
-     */
-    private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
-        MapEntryValue existingValue = mapEntries.get(update.key());
-        boolean isEmpty = existingValue == null || existingValue.type() == MapEntryValue.Type.TOMBSTONE;
-        if (isEmpty && update.value() == null) {
-            return MapEntryUpdateResult.Status.NOOP;
-        }
-        if (preparedKeys.contains(update.key())) {
-            return MapEntryUpdateResult.Status.WRITE_LOCK;
-        }
-        byte[] existingRawValue = isEmpty ? null : existingValue.value();
-        Long existingVersion = isEmpty ? null : existingValue.version();
-        return update.valueMatch().matches(existingRawValue)
-                && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
-                : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
-    }
-
-    /**
      * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
      * @param value map entry value
      * @return versioned instance
      */
-    private Versioned<byte[]> toVersioned(MapEntryValue value) {
+    protected Versioned<byte[]> toVersioned(MapEntryValue value) {
         return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
                 ? new Versioned<>(value.value(), value.version()) : null;
     }
 
     /**
+     * Publishes an event to listeners.
+     *
+     * @param event event to publish
+     */
+    private void publish(MapEvent<String, byte[]> event) {
+        publish(Lists.newArrayList(event));
+    }
+
+    /**
      * Publishes events to listeners.
      *
      * @param events list of map event to publish
      */
     private void publish(List<MapEvent<String, byte[]>> events) {
         listeners.values().forEach(session -> {
-            session.publish(CHANGE, SERIALIZER::encode, events);
+            session.publish(CHANGE, serializer()::encode, events);
         });
     }
 
@@ -639,7 +897,7 @@
     /**
      * Interface implemented by map values.
      */
-    private static class MapEntryValue {
+    protected static class MapEntryValue {
         protected final Type type;
         protected final long version;
         protected final byte[] value;
@@ -689,7 +947,7 @@
     /**
      * Map transaction scope.
      */
-    private static final class TransactionScope {
+    protected static final class TransactionScope {
         private final long version;
         private final TransactionLog<MapUpdate<String, byte[]>> transactionLog;