Refactor AtomixConsistentMap to use separate operations per method call for better performance and control over operation semantics.

Change-Id: I948c5c73d4ab38c9c2b20f8c80ba01548f95dda6
diff --git a/core/api/src/main/java/org/onosproject/store/service/MapEvent.java b/core/api/src/main/java/org/onosproject/store/service/MapEvent.java
index 818e749..2e8aa61 100644
--- a/core/api/src/main/java/org/onosproject/store/service/MapEvent.java
+++ b/core/api/src/main/java/org/onosproject/store/service/MapEvent.java
@@ -62,12 +62,25 @@
      * @param previousValue value that was replaced
      */
     public MapEvent(String name, K key, Versioned<V> currentValue, Versioned<V> previousValue) {
+        this(currentValue != null ? previousValue != null ? Type.UPDATE : Type.INSERT : Type.REMOVE,
+                name, key, currentValue, previousValue);
+    }
+
+    /**
+     * Creates a new event object.
+     *
+     * @param type event type
+     * @param name map name
+     * @param key key the event concerns
+     * @param currentValue new value key is mapped to
+     * @param previousValue value that was replaced
+     */
+    public MapEvent(Type type, String name, K key, Versioned<V> currentValue, Versioned<V> previousValue) {
+        this.type = type;
         this.name = name;
         this.key = key;
         this.newValue = currentValue;
         this.oldValue = previousValue;
-        this.type = currentValue != null ?
-                    previousValue != null ? Type.UPDATE : Type.INSERT : Type.REMOVE;
     }
 
     /**
diff --git a/core/api/src/test/java/org/onosproject/store/service/MapEventTest.java b/core/api/src/test/java/org/onosproject/store/service/MapEventTest.java
index 698498e..10311a9 100644
--- a/core/api/src/test/java/org/onosproject/store/service/MapEventTest.java
+++ b/core/api/src/test/java/org/onosproject/store/service/MapEventTest.java
@@ -30,11 +30,14 @@
     private final Versioned<Integer> vStatsNew = new Versioned<>(2, 2);
     private final Versioned<Integer> vStatsOld = new Versioned<>(1, 1);
 
-    private final MapEvent<String, Integer> stats1 = new MapEvent<>("a", "1", vStatsNew, null);
+    private final MapEvent<String, Integer> stats1 =
+            new MapEvent<>(MapEvent.Type.INSERT, "a", "1", vStatsNew, null);
 
-    private final MapEvent<String, Integer> stats2 = new MapEvent<>("a", "1", null, vStatsOld);
+    private final MapEvent<String, Integer> stats2 =
+            new MapEvent<>(MapEvent.Type.REMOVE, "a", "1", null, vStatsOld);
 
-    private final MapEvent<String, Integer> stats3 = new MapEvent<>("a", "1", vStatsNew, vStatsOld);
+    private final MapEvent<String, Integer> stats3 =
+            new MapEvent<>(MapEvent.Type.UPDATE, "a", "1", vStatsNew, vStatsOld);
 
     /**
      * Tests the creation of the MapEvent object.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
index e0e3ad7..d6e5e91 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
@@ -79,7 +79,6 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapEvents;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations;
 import org.onosproject.store.primitives.resources.impl.AtomixCounterOperations;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents;
@@ -165,7 +164,6 @@
             .register(AtomixConsistentMapOperations.class)
             .register(AtomixConsistentSetMultimapOperations.class)
             .register(AtomixConsistentSetMultimapEvents.class)
-            .register(AtomixConsistentTreeMapEvents.class)
             .register(AtomixConsistentTreeMapOperations.class)
             .register(AtomixCounterOperations.class)
             .register(AtomixDocumentTreeEvents.class)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index 52cf210..03e2f6d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -338,7 +338,9 @@
 
         @Override
         public void event(MapEvent<K2, V2> event) {
-            listener.event(new MapEvent<K1, V1>(event.name(),
+            listener.event(new MapEvent<K1, V1>(
+                    event.type(),
+                    event.name(),
                     keyDecoder.apply(event.key()),
                     event.newValue() != null ? event.newValue().map(valueDecoder) : null,
                     event.oldValue() != null ? event.oldValue().map(valueDecoder) : null));
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
index 2f0683e..2db8382 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
@@ -400,6 +400,7 @@
         @Override
         public void event(MapEvent<String, V2> event) {
             listener.event(new MapEvent<String, V1>(
+                    event.type(),
                     event.name(),
                     event.key(),
                     event.newValue() != null ?
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index f3a1ea2..5b59fb2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -24,29 +24,31 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.function.Predicate;
 
 import io.atomix.protocols.raft.proxy.RaftProxy;
 import org.onlab.util.KryoNamespace;
-import org.onlab.util.Match;
-import org.onlab.util.Tools;
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsKey;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsValue;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Get;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GetOrDefault;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Remove;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveValue;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveVersion;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Replace;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceValue;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceVersion;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionBegin;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionCommit;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepare;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepareAndCommit;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UpdateAndGet;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
@@ -68,10 +70,18 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_AND_GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_IF_ABSENT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VALUE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VERSION;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VALUE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VERSION;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ROLLBACK;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.SIZE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UPDATE_AND_GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.VALUES;
 
 /**
@@ -82,6 +92,7 @@
             .register(KryoNamespaces.BASIC)
             .register(AtomixConsistentMapOperations.NAMESPACE)
             .register(AtomixConsistentMapEvents.NAMESPACE)
+            .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
             .build());
 
     private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
@@ -96,6 +107,10 @@
         });
     }
 
+    protected Serializer serializer() {
+        return SERIALIZER;
+    }
+
     private void handleEvent(List<MapEvent<String, byte[]>> events) {
         events.forEach(event ->
                 mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
@@ -103,164 +118,164 @@
 
     @Override
     public CompletableFuture<Boolean> isEmpty() {
-        return proxy.invoke(IS_EMPTY, SERIALIZER::decode);
+        return proxy.invoke(IS_EMPTY, serializer()::decode);
     }
 
     @Override
     public CompletableFuture<Integer> size() {
-        return proxy.invoke(SIZE, SERIALIZER::decode);
+        return proxy.invoke(SIZE, serializer()::decode);
     }
 
     @Override
     public CompletableFuture<Boolean> containsKey(String key) {
-        return proxy.invoke(CONTAINS_KEY, SERIALIZER::encode, new ContainsKey(key), SERIALIZER::decode);
+        return proxy.invoke(CONTAINS_KEY, serializer()::encode, new ContainsKey(key), serializer()::decode);
     }
 
     @Override
     public CompletableFuture<Boolean> containsValue(byte[] value) {
-        return proxy.invoke(CONTAINS_VALUE, SERIALIZER::encode, new ContainsValue(value), SERIALIZER::decode);
+        return proxy.invoke(CONTAINS_VALUE, serializer()::encode, new ContainsValue(value), serializer()::decode);
     }
 
     @Override
     public CompletableFuture<Versioned<byte[]>> get(String key) {
-        return proxy.invoke(GET, SERIALIZER::encode, new Get(key), SERIALIZER::decode);
+        return proxy.invoke(GET, serializer()::encode, new Get(key), serializer()::decode);
     }
 
     @Override
     public CompletableFuture<Versioned<byte[]>> getOrDefault(String key, byte[] defaultValue) {
         return proxy.invoke(
                 GET_OR_DEFAULT,
-                SERIALIZER::encode,
+                serializer()::encode,
                 new GetOrDefault(key, defaultValue),
-                SERIALIZER::decode);
+                serializer()::decode);
     }
 
     @Override
     public CompletableFuture<Set<String>> keySet() {
-        return proxy.invoke(KEY_SET, SERIALIZER::decode);
+        return proxy.invoke(KEY_SET, serializer()::decode);
     }
 
     @Override
     public CompletableFuture<Collection<Versioned<byte[]>>> values() {
-        return proxy.invoke(VALUES, SERIALIZER::decode);
+        return proxy.invoke(VALUES, serializer()::decode);
     }
 
     @Override
     public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
-        return proxy.invoke(ENTRY_SET, SERIALIZER::decode);
+        return proxy.invoke(ENTRY_SET, serializer()::decode);
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, value, Match.ANY, Match.ANY),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
-                .thenApply(v -> v.oldValue());
+        return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
+                PUT,
+                serializer()::encode,
+                new Put(key, value),
+                serializer()::decode)
+                .whenComplete((r, e) -> throwIfLocked(r))
+                .thenApply(v -> v.result());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, value, Match.ANY, Match.ANY),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
-                .thenApply(v -> v.newValue());
+        return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
+                PUT_AND_GET,
+                serializer()::encode,
+                new Put(key, value),
+                serializer()::decode)
+                .whenComplete((r, e) -> throwIfLocked(r))
+                .thenApply(v -> v.result());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, value, Match.NULL, Match.ANY),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
-                .thenApply(v -> v.oldValue());
+        return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
+                PUT_IF_ABSENT,
+                serializer()::encode,
+                new Put(key, value),
+                serializer()::decode)
+                .whenComplete((r, e) -> throwIfLocked(r))
+                .thenApply(v -> v.result());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> remove(String key) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, null, Match.ANY, Match.ANY),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
-                .thenApply(v -> v.oldValue());
+        return proxy.<Remove, MapEntryUpdateResult<String, byte[]>>invoke(
+                REMOVE,
+                serializer()::encode,
+                new Remove(key),
+                serializer()::decode)
+                .whenComplete((r, e) -> throwIfLocked(r))
+                .thenApply(v -> v.result());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> remove(String key, byte[] value) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
+        return proxy.<RemoveValue, MapEntryUpdateResult<String, byte[]>>invoke(
+                REMOVE_VALUE,
+                serializer()::encode,
+                new RemoveValue(key, value),
+                serializer()::decode)
+                .whenComplete((r, e) -> throwIfLocked(r))
                 .thenApply(v -> v.updated());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> remove(String key, long version) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
+        return proxy.<RemoveVersion, MapEntryUpdateResult<String, byte[]>>invoke(
+                REMOVE_VERSION,
+                serializer()::encode,
+                new RemoveVersion(key, version),
+                serializer()::decode)
+                .whenComplete((r, e) -> throwIfLocked(r))
                 .thenApply(v -> v.updated());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
-                .thenApply(v -> v.oldValue());
+        return proxy.<Replace, MapEntryUpdateResult<String, byte[]>>invoke(
+                REPLACE,
+                serializer()::encode,
+                new Replace(key, value),
+                serializer()::decode)
+                .whenComplete((r, e) -> throwIfLocked(r))
+                .thenApply(v -> v.result());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
+        return proxy.<ReplaceValue, MapEntryUpdateResult<String, byte[]>>invoke(
+                REPLACE_VALUE,
+                serializer()::encode,
+                new ReplaceValue(key, oldValue, newValue),
+                serializer()::decode)
+                .whenComplete((r, e) -> throwIfLocked(r))
                 .thenApply(v -> v.updated());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
+        return proxy.<ReplaceVersion, MapEntryUpdateResult<String, byte[]>>invoke(
+                REPLACE_VERSION,
+                serializer()::encode,
+                new ReplaceVersion(key, oldVersion, newValue),
+                serializer()::decode)
+                .whenComplete((r, e) -> throwIfLocked(r))
                 .thenApply(v -> v.updated());
     }
 
     @Override
     public CompletableFuture<Void> clear() {
-        return proxy.<MapEntryUpdateResult.Status>invoke(CLEAR, SERIALIZER::decode)
+        return proxy.<MapEntryUpdateResult.Status>invoke(CLEAR, serializer()::decode)
                 .whenComplete((r, e) -> throwIfLocked(r))
                 .thenApply(v -> null);
     }
@@ -277,37 +292,39 @@
                 return CompletableFuture.completedFuture(r1);
             }
 
-            AtomicReference<byte[]> computedValue = new AtomicReference<>();
-            // if remappingFunction throws an exception, return the exception.
+            byte[] computedValue;
             try {
-                computedValue.set(remappingFunction.apply(key, existingValue));
+                computedValue = remappingFunction.apply(key, existingValue);
             } catch (Exception e) {
                 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
                 future.completeExceptionally(e);
                 return future;
             }
-            if (computedValue.get() == null && r1 == null) {
+
+            if (computedValue == null && r1 == null) {
                 return CompletableFuture.completedFuture(null);
             }
-            Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
-            Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
-            return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                    UPDATE_AND_GET,
-                    SERIALIZER::encode,
-                    new UpdateAndGet(key,
-                            computedValue.get(),
-                            valueMatch,
-                            versionMatch),
-                    SERIALIZER::decode)
-                    .whenComplete((r, e) -> throwIfLocked(r.status()))
-                    .thenCompose(r -> {
-                        if (r.status() == MapEntryUpdateResult.Status.PRECONDITION_FAILED ||
-                                r.status() == MapEntryUpdateResult.Status.WRITE_LOCK) {
-                            return Tools.exceptionalFuture(new ConsistentMapException.ConcurrentModification());
-                        }
-                        return CompletableFuture.completedFuture(r);
-                    })
-                    .thenApply(v -> v.newValue());
+
+            if (r1 == null) {
+                return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
+                        PUT_IF_ABSENT,
+                        serializer()::encode,
+                        new Put(key, computedValue),
+                        serializer()::decode)
+                        .whenComplete((r, e) -> throwIfLocked(r))
+                        .thenApply(result -> new Versioned<>(computedValue, result.version()));
+            } else if (computedValue == null) {
+                return remove(key, r1.version()).thenApply(v -> null);
+            } else {
+                return proxy.<ReplaceVersion, MapEntryUpdateResult<String, byte[]>>invoke(
+                        REPLACE_VERSION,
+                        serializer()::encode,
+                        new ReplaceVersion(key, r1.version(), computedValue),
+                        serializer()::decode)
+                        .whenComplete((r, e) -> throwIfLocked(r))
+                        .thenApply(result -> result.status() == MapEntryUpdateResult.Status.OK
+                                ? new Versioned(computedValue, result.version()) : result.result());
+            }
         });
     }
 
@@ -330,6 +347,12 @@
         return CompletableFuture.completedFuture(null);
     }
 
+    private void throwIfLocked(MapEntryUpdateResult<String, byte[]> result) {
+        if (result != null) {
+            throwIfLocked(result.status());
+        }
+    }
+
     private void throwIfLocked(MapEntryUpdateResult.Status status) {
         if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
             throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
@@ -340,9 +363,9 @@
     public CompletableFuture<Version> begin(TransactionId transactionId) {
         return proxy.<TransactionBegin, Long>invoke(
                 BEGIN,
-                SERIALIZER::encode,
+                serializer()::encode,
                 new TransactionBegin(transactionId),
-                SERIALIZER::decode)
+                serializer()::decode)
                 .thenApply(Version::new);
     }
 
@@ -350,9 +373,9 @@
     public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
         return proxy.<TransactionPrepare, PrepareResult>invoke(
                 PREPARE,
-                SERIALIZER::encode,
+                serializer()::encode,
                 new TransactionPrepare(transactionLog),
-                SERIALIZER::decode)
+                serializer()::decode)
                 .thenApply(v -> v == PrepareResult.OK);
     }
 
@@ -360,9 +383,9 @@
     public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
         return proxy.<TransactionPrepareAndCommit, PrepareResult>invoke(
                 PREPARE_AND_COMMIT,
-                SERIALIZER::encode,
+                serializer()::encode,
                 new TransactionPrepareAndCommit(transactionLog),
-                SERIALIZER::decode)
+                serializer()::decode)
                 .thenApply(v -> v == PrepareResult.OK);
     }
 
@@ -370,9 +393,9 @@
     public CompletableFuture<Void> commit(TransactionId transactionId) {
         return proxy.<TransactionCommit, CommitResult>invoke(
                 COMMIT,
-                SERIALIZER::encode,
+                serializer()::encode,
                 new TransactionCommit(transactionId),
-                SERIALIZER::decode)
+                serializer()::decode)
                 .thenApply(v -> null);
     }
 
@@ -380,9 +403,9 @@
     public CompletableFuture<Void> rollback(TransactionId transactionId) {
         return proxy.invoke(
                 ROLLBACK,
-                SERIALIZER::encode,
+                serializer()::encode,
                 new TransactionRollback(transactionId),
-                SERIALIZER::decode)
+                serializer()::decode)
                 .thenApply(v -> null);
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
index 2d81787..fb6ee9f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
@@ -15,9 +15,9 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
-import com.google.common.base.MoreObjects;
 import io.atomix.protocols.raft.operation.OperationId;
 import io.atomix.protocols.raft.operation.OperationType;
+import io.atomix.utils.ArraySizeHashPrinter;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Match;
 import org.onosproject.store.primitives.MapUpdate;
@@ -26,6 +26,7 @@
 import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Versioned;
 
+import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
@@ -41,7 +42,15 @@
     KEY_SET("keySet", OperationType.QUERY),
     VALUES("values", OperationType.QUERY),
     ENTRY_SET("entrySet", OperationType.QUERY),
-    UPDATE_AND_GET("updateAndGet", OperationType.COMMAND),
+    PUT("put", OperationType.COMMAND),
+    PUT_IF_ABSENT("putIfAbsent", OperationType.COMMAND),
+    PUT_AND_GET("putAndGet", OperationType.COMMAND),
+    REMOVE("remove", OperationType.COMMAND),
+    REMOVE_VALUE("removeValue", OperationType.COMMAND),
+    REMOVE_VERSION("removeVersion", OperationType.COMMAND),
+    REPLACE("replace", OperationType.COMMAND),
+    REPLACE_VALUE("replaceValue", OperationType.COMMAND),
+    REPLACE_VERSION("replaceVersion", OperationType.COMMAND),
     CLEAR("clear", OperationType.COMMAND),
     ADD_LISTENER("addListener", OperationType.COMMAND),
     REMOVE_LISTENER("removeListener", OperationType.COMMAND),
@@ -76,7 +85,13 @@
             .register(ContainsValue.class)
             .register(Get.class)
             .register(GetOrDefault.class)
-            .register(UpdateAndGet.class)
+            .register(Put.class)
+            .register(Remove.class)
+            .register(RemoveValue.class)
+            .register(RemoveVersion.class)
+            .register(Replace.class)
+            .register(ReplaceValue.class)
+            .register(ReplaceVersion.class)
             .register(TransactionBegin.class)
             .register(TransactionPrepare.class)
             .register(TransactionPrepareAndCommit.class)
@@ -103,7 +118,7 @@
     public abstract static class MapOperation {
         @Override
         public String toString() {
-            return MoreObjects.toStringHelper(getClass())
+            return toStringHelper(getClass())
                     .toString();
         }
     }
@@ -132,7 +147,7 @@
 
         @Override
         public String toString() {
-            return MoreObjects.toStringHelper(getClass())
+            return toStringHelper(getClass())
                     .add("key", key)
                     .toString();
         }
@@ -162,13 +177,77 @@
 
         @Override
         public String toString() {
-            return MoreObjects.toStringHelper(getClass())
+            return toStringHelper(getClass())
                     .add("value", value)
                     .toString();
         }
     }
 
     /**
+     * Abstract key/value operation.
+     */
+    @SuppressWarnings("serial")
+    public abstract static class KeyValueOperation extends KeyOperation {
+        protected byte[] value;
+
+        public KeyValueOperation() {
+        }
+
+        public KeyValueOperation(String key, byte[] value) {
+            super(key);
+            this.value = value;
+        }
+
+        /**
+         * Returns the value.
+         * @return value
+         */
+        public byte[] value() {
+            return value;
+        }
+
+        @Override
+        public String toString() {
+            return toStringHelper(getClass())
+                    .add("key", key)
+                    .add("value", ArraySizeHashPrinter.of(value))
+                    .toString();
+        }
+    }
+
+    /**
+     * Abstract key/version operation.
+     */
+    @SuppressWarnings("serial")
+    public abstract static class KeyVersionOperation extends KeyOperation {
+        protected long version;
+
+        public KeyVersionOperation() {
+        }
+
+        public KeyVersionOperation(String key, long version) {
+            super(key);
+            this.version = version;
+        }
+
+        /**
+         * Returns the version.
+         * @return version
+         */
+        public long version() {
+            return version;
+        }
+
+        @Override
+        public String toString() {
+            return toStringHelper(getClass())
+                    .add("key", key)
+                    .add("version", version)
+                    .toString();
+        }
+    }
+
+    /**
      * Contains key command.
      */
     @SuppressWarnings("serial")
@@ -195,6 +274,134 @@
     }
 
     /**
+     * Map put operation.
+     */
+    public static class Put extends KeyValueOperation {
+        public Put() {
+        }
+
+        public Put(String key, byte[] value) {
+            super(key, value);
+        }
+    }
+
+    /**
+     * Remove operation.
+     */
+    public static class Remove extends KeyOperation {
+        public Remove() {
+        }
+
+        public Remove(String key) {
+            super(key);
+        }
+    }
+
+    /**
+     * Remove if value match operation.
+     */
+    public static class RemoveValue extends KeyValueOperation {
+        public RemoveValue() {
+        }
+
+        public RemoveValue(String key, byte[] value) {
+            super(key, value);
+        }
+    }
+
+    /**
+     * Remove if version match operation.
+     */
+    public static class RemoveVersion extends KeyVersionOperation {
+        public RemoveVersion() {
+        }
+
+        public RemoveVersion(String key, long version) {
+            super(key, version);
+        }
+    }
+
+    /**
+     * Replace operation.
+     */
+    public static class Replace extends KeyValueOperation {
+        public Replace() {
+        }
+
+        public Replace(String key, byte[] value) {
+            super(key, value);
+        }
+    }
+
+    /**
+     * Replace by value operation.
+     */
+    public static class ReplaceValue extends KeyOperation {
+        private byte[] oldValue;
+        private byte[] newValue;
+
+        public ReplaceValue() {
+        }
+
+        public ReplaceValue(String key, byte[] oldValue, byte[] newValue) {
+            super(key);
+            this.oldValue = oldValue;
+            this.newValue = newValue;
+        }
+
+        public byte[] oldValue() {
+            return oldValue;
+        }
+
+        public byte[] newValue() {
+            return newValue;
+        }
+
+        @Override
+        public String toString() {
+            return toStringHelper(this)
+                    .add("key", key)
+                    .add("oldValue", ArraySizeHashPrinter.of(oldValue))
+                    .add("newValue", ArraySizeHashPrinter.of(newValue))
+                    .toString();
+        }
+    }
+
+    /**
+     * Replace by version operation.
+     */
+    public static class ReplaceVersion extends KeyOperation {
+        private long oldVersion;
+        private byte[] newValue;
+
+        public ReplaceVersion() {
+        }
+
+        public ReplaceVersion(String key, long oldVersion, byte[] newValue) {
+            super(key);
+            this.oldVersion = oldVersion;
+            this.newValue = newValue;
+        }
+
+        public long oldVersion() {
+            return oldVersion;
+        }
+
+        public byte[] newValue() {
+            return newValue;
+        }
+
+        @Override
+        public String toString() {
+            return toStringHelper(this)
+                    .add("key", key)
+                    .add("oldVersion", oldVersion)
+                    .add("newValue", ArraySizeHashPrinter.of(newValue))
+                    .toString();
+        }
+    }
+
+    /**
      * Transaction begin command.
      */
     public static class TransactionBegin extends MapOperation {
@@ -232,7 +439,7 @@
 
         @Override
         public String toString() {
-            return MoreObjects.toStringHelper(getClass())
+            return toStringHelper(getClass())
                     .add("transactionLog", transactionLog)
                     .toString();
         }
@@ -275,7 +482,7 @@
 
         @Override
         public String toString() {
-            return MoreObjects.toStringHelper(getClass())
+            return toStringHelper(getClass())
                     .add("transactionId", transactionId)
                     .toString();
         }
@@ -305,79 +512,13 @@
 
         @Override
         public String toString() {
-            return MoreObjects.toStringHelper(getClass())
+            return toStringHelper(getClass())
                     .add("transactionId", transactionId)
                     .toString();
         }
     }
 
     /**
-     * Map update command.
-     */
-    @SuppressWarnings("serial")
-    public static class UpdateAndGet extends MapOperation {
-        private String key;
-        private byte[] value;
-        private Match<byte[]> valueMatch;
-        private Match<Long> versionMatch;
-
-        public UpdateAndGet() {
-        }
-
-        public UpdateAndGet(String key,
-                        byte[] value,
-                        Match<byte[]> valueMatch,
-                        Match<Long> versionMatch) {
-            this.key = key;
-            this.value = value;
-            this.valueMatch = valueMatch;
-            this.versionMatch = versionMatch;
-        }
-
-        /**
-         * Returns the key.
-         * @return key
-         */
-        public String key() {
-            return this.key;
-        }
-
-        /**
-         * Returns the value.
-         * @return value
-         */
-        public byte[] value() {
-            return this.value;
-        }
-
-        /**
-         * Returns the value match.
-         * @return value match
-         */
-        public Match<byte[]> valueMatch() {
-            return this.valueMatch;
-        }
-
-        /**
-         * Returns the version match.
-         * @return version match
-         */
-        public Match<Long> versionMatch() {
-            return this.versionMatch;
-        }
-
-        @Override
-        public String toString() {
-            return MoreObjects.toStringHelper(getClass())
-                    .add("key", key)
-                    .add("value", value)
-                    .add("valueMatch", valueMatch)
-                    .add("versionMatch", versionMatch)
-                    .toString();
-        }
-    }
-
-    /**
      * Get query.
      */
     @SuppressWarnings("serial")
@@ -413,5 +554,13 @@
         public byte[] defaultValue() {
             return defaultValue;
         }
+
+        @Override
+        public String toString() {
+            return toStringHelper(this)
+                    .add("key", key)
+                    .add("defaultValue", ArraySizeHashPrinter.of(defaultValue))
+                    .toString();
+        }
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
index 6d8c1b0..fe5a2a9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -22,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Throwables;
@@ -42,12 +44,18 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsValue;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Get;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GetOrDefault;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Remove;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveValue;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveVersion;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Replace;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceValue;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceVersion;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionBegin;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionCommit;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepare;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepareAndCommit;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UpdateAndGet;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.Serializer;
@@ -69,14 +77,19 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_AND_GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_IF_ABSENT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VALUE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VERSION;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VALUE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VERSION;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ROLLBACK;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.SIZE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UPDATE_AND_GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.VALUES;
-import static org.onosproject.store.service.MapEvent.Type.INSERT;
-import static org.onosproject.store.service.MapEvent.Type.REMOVE;
-import static org.onosproject.store.service.MapEvent.Type.UPDATE;
 
 /**
  * State Machine for {@link AtomixConsistentMap} resource.
@@ -87,7 +100,7 @@
             .register(KryoNamespaces.BASIC)
             .register(AtomixConsistentMapOperations.NAMESPACE)
             .register(AtomixConsistentMapEvents.NAMESPACE)
-            .nextId(KryoNamespace.FLOATING_ID)
+            .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
             .register(TransactionScope.class)
             .register(TransactionLog.class)
             .register(TransactionId.class)
@@ -96,56 +109,80 @@
             .register(new HashMap().keySet().getClass())
             .build());
 
-    private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
-    private Map<String, MapEntryValue> mapEntries = new HashMap<>();
-    private Set<String> preparedKeys = Sets.newHashSet();
-    private Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
-    private long currentVersion;
+    protected Map<Long, RaftSession> listeners = new LinkedHashMap<>();
+    private Map<String, MapEntryValue> map;
+    protected Set<String> preparedKeys = Sets.newHashSet();
+    protected Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
+    protected long currentVersion;
+
+    public AtomixConsistentMapService() {
+        map = createMap();
+    }
+
+    protected Map<String, MapEntryValue> createMap() {
+        return Maps.newHashMap();
+    }
+
+    protected Map<String, MapEntryValue> entries() {
+        return map;
+    }
+
+    protected Serializer serializer() {
+        return SERIALIZER;
+    }
 
     @Override
     public void snapshot(SnapshotWriter writer) {
-        writer.writeObject(Sets.newHashSet(listeners.keySet()), SERIALIZER::encode);
-        writer.writeObject(preparedKeys, SERIALIZER::encode);
-        writer.writeObject(mapEntries, SERIALIZER::encode);
-        writer.writeObject(activeTransactions, SERIALIZER::encode);
+        writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer()::encode);
+        writer.writeObject(preparedKeys, serializer()::encode);
+        writer.writeObject(entries(), serializer()::encode);
+        writer.writeObject(activeTransactions, serializer()::encode);
         writer.writeLong(currentVersion);
     }
 
     @Override
     public void install(SnapshotReader reader) {
         listeners = new LinkedHashMap<>();
-        for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
+        for (Long sessionId : reader.<Set<Long>>readObject(serializer()::decode)) {
             listeners.put(sessionId, getSessions().getSession(sessionId));
         }
-        preparedKeys = reader.readObject(SERIALIZER::decode);
-        mapEntries = reader.readObject(SERIALIZER::decode);
-        activeTransactions = reader.readObject(SERIALIZER::decode);
+        preparedKeys = reader.readObject(serializer()::decode);
+        map = reader.readObject(serializer()::decode);
+        activeTransactions = reader.readObject(serializer()::decode);
         currentVersion = reader.readLong();
     }
 
     @Override
     protected void configure(RaftServiceExecutor executor) {
         // Listeners
-        executor.register(ADD_LISTENER, this::listen);
-        executor.register(REMOVE_LISTENER, this::unlisten);
+        executor.register(ADD_LISTENER, (Commit<Void> c) -> listen(c.session()));
+        executor.register(REMOVE_LISTENER, (Commit<Void> c) -> unlisten(c.session()));
         // Queries
-        executor.register(CONTAINS_KEY, SERIALIZER::decode, this::containsKey, SERIALIZER::encode);
-        executor.register(CONTAINS_VALUE, SERIALIZER::decode, this::containsValue, SERIALIZER::encode);
-        executor.register(ENTRY_SET, this::entrySet, SERIALIZER::encode);
-        executor.register(GET, SERIALIZER::decode, this::get, SERIALIZER::encode);
-        executor.register(GET_OR_DEFAULT, SERIALIZER::decode, this::getOrDefault, SERIALIZER::encode);
-        executor.register(IS_EMPTY, this::isEmpty, SERIALIZER::encode);
-        executor.register(KEY_SET, this::keySet, SERIALIZER::encode);
-        executor.register(SIZE, this::size, SERIALIZER::encode);
-        executor.register(VALUES, this::values, SERIALIZER::encode);
+        executor.register(CONTAINS_KEY, serializer()::decode, this::containsKey, serializer()::encode);
+        executor.register(CONTAINS_VALUE, serializer()::decode, this::containsValue, serializer()::encode);
+        executor.register(ENTRY_SET, (Commit<Void> c) -> entrySet(), serializer()::encode);
+        executor.register(GET, serializer()::decode, this::get, serializer()::encode);
+        executor.register(GET_OR_DEFAULT, serializer()::decode, this::getOrDefault, serializer()::encode);
+        executor.register(IS_EMPTY, (Commit<Void> c) -> isEmpty(), serializer()::encode);
+        executor.register(KEY_SET, (Commit<Void> c) -> keySet(), serializer()::encode);
+        executor.register(SIZE, (Commit<Void> c) -> size(), serializer()::encode);
+        executor.register(VALUES, (Commit<Void> c) -> values(), serializer()::encode);
         // Commands
-        executor.register(UPDATE_AND_GET, SERIALIZER::decode, this::updateAndGet, SERIALIZER::encode);
-        executor.register(CLEAR, this::clear, SERIALIZER::encode);
-        executor.register(BEGIN, SERIALIZER::decode, this::begin, SERIALIZER::encode);
-        executor.register(PREPARE, SERIALIZER::decode, this::prepare, SERIALIZER::encode);
-        executor.register(PREPARE_AND_COMMIT, SERIALIZER::decode, this::prepareAndCommit, SERIALIZER::encode);
-        executor.register(COMMIT, SERIALIZER::decode, this::commit, SERIALIZER::encode);
-        executor.register(ROLLBACK, SERIALIZER::decode, this::rollback, SERIALIZER::encode);
+        executor.register(PUT, serializer()::decode, this::put, serializer()::encode);
+        executor.register(PUT_IF_ABSENT, serializer()::decode, this::putIfAbsent, serializer()::encode);
+        executor.register(PUT_AND_GET, serializer()::decode, this::putAndGet, serializer()::encode);
+        executor.register(REMOVE, serializer()::decode, this::remove, serializer()::encode);
+        executor.register(REMOVE_VALUE, serializer()::decode, this::removeValue, serializer()::encode);
+        executor.register(REMOVE_VERSION, serializer()::decode, this::removeVersion, serializer()::encode);
+        executor.register(REPLACE, serializer()::decode, this::replace, serializer()::encode);
+        executor.register(REPLACE_VALUE, serializer()::decode, this::replaceValue, serializer()::encode);
+        executor.register(REPLACE_VERSION, serializer()::decode, this::replaceVersion, serializer()::encode);
+        executor.register(CLEAR, (Commit<Void> c) -> clear(), serializer()::encode);
+        executor.register(BEGIN, serializer()::decode, this::begin, serializer()::encode);
+        executor.register(PREPARE, serializer()::decode, this::prepare, serializer()::encode);
+        executor.register(PREPARE_AND_COMMIT, serializer()::decode, this::prepareAndCommit, serializer()::encode);
+        executor.register(COMMIT, serializer()::decode, this::commit, serializer()::encode);
+        executor.register(ROLLBACK, serializer()::decode, this::rollback, serializer()::encode);
     }
 
     /**
@@ -155,7 +192,7 @@
      * @return {@code true} if map contains key
      */
     protected boolean containsKey(Commit<? extends ContainsKey> commit) {
-        MapEntryValue value = mapEntries.get(commit.value().key());
+        MapEntryValue value = entries().get(commit.value().key());
         return value != null && value.type() != MapEntryValue.Type.TOMBSTONE;
     }
 
@@ -167,7 +204,7 @@
      */
     protected boolean containsValue(Commit<? extends ContainsValue> commit) {
         Match<byte[]> valueMatch = Match.ifValue(commit.value().value());
-        return mapEntries.values().stream()
+        return entries().values().stream()
                 .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
                 .anyMatch(value -> valueMatch.matches(value.value()));
     }
@@ -179,7 +216,7 @@
      * @return value mapped to key
      */
     protected Versioned<byte[]> get(Commit<? extends Get> commit) {
-        return toVersioned(mapEntries.get(commit.value().key()));
+        return toVersioned(entries().get(commit.value().key()));
     }
 
     /**
@@ -189,7 +226,7 @@
      * @return value mapped to key
      */
     protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
-        MapEntryValue value = mapEntries.get(commit.value().key());
+        MapEntryValue value = entries().get(commit.value().key());
         if (value == null) {
             return new Versioned<>(commit.value().defaultValue(), 0);
         } else if (value.type() == MapEntryValue.Type.TOMBSTONE) {
@@ -200,13 +237,12 @@
     }
 
     /**
-     * Handles a count commit.
+     * Handles a size commit.
      *
-     * @param commit size commit
      * @return number of entries in map
      */
-    protected int size(Commit<Void> commit) {
-        return (int) mapEntries.values().stream()
+    protected int size() {
+        return (int) entries().values().stream()
                 .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
                 .count();
     }
@@ -214,22 +250,20 @@
     /**
      * Handles an is empty commit.
      *
-     * @param commit isEmpty commit
      * @return {@code true} if map is empty
      */
-    protected boolean isEmpty(Commit<Void> commit) {
-        return mapEntries.values().stream()
+    protected boolean isEmpty() {
+        return entries().values().stream()
                 .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
     }
 
     /**
      * Handles a keySet commit.
      *
-     * @param commit keySet commit
      * @return set of keys in map
      */
-    protected Set<String> keySet(Commit<Void> commit) {
-        return mapEntries.entrySet().stream()
+    protected Set<String> keySet() {
+        return entries().entrySet().stream()
                 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
                 .map(Map.Entry::getKey)
                 .collect(Collectors.toSet());
@@ -238,11 +272,10 @@
     /**
      * Handles a values commit.
      *
-     * @param commit values commit
      * @return collection of values in map
      */
-    protected Collection<Versioned<byte[]>> values(Commit<Void> commit) {
-        return mapEntries.entrySet().stream()
+    protected Collection<Versioned<byte[]>> values() {
+        return entries().entrySet().stream()
                 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
                 .map(entry -> toVersioned(entry.getValue()))
                 .collect(Collectors.toList());
@@ -251,103 +284,311 @@
     /**
      * Handles a entry set commit.
      *
-     * @param commit entrySet commit
      * @return set of map entries
      */
-    protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<Void> commit) {
-        return mapEntries.entrySet().stream()
+    protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet() {
+        return entries().entrySet().stream()
                 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
                 .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
                 .collect(Collectors.toSet());
     }
 
     /**
-     * Handles a update and get commit.
+     * Returns a boolean indicating whether the given MapEntryValues are equal.
      *
-     * @param commit updateAndGet commit
-     * @return update result
+     * @param oldValue the first value to compare
+     * @param newValue the second value to compare
+     * @return indicates whether the two values are equal
      */
-    protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
-        try {
-            MapEntryUpdateResult.Status updateStatus = validate(commit.value());
-            String key = commit.value().key();
-            MapEntryValue oldCommitValue = mapEntries.get(commit.value().key());
-            Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
+    protected boolean valuesEqual(MapEntryValue oldValue, MapEntryValue newValue) {
+        return (oldValue == null && newValue == null)
+                || (oldValue != null && newValue != null && valuesEqual(oldValue.value(), newValue.value()));
+    }
 
-            if (updateStatus != MapEntryUpdateResult.Status.OK) {
-                return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue, oldMapValue);
-            }
+    /**
+     * Returns a boolean indicating whether the given entry values are equal.
+     *
+     * @param oldValue the first value to compare
+     * @param newValue the second value to compare
+     * @return indicates whether the two values are equal
+     */
+    protected boolean valuesEqual(byte[] oldValue, byte[] newValue) {
+        return (oldValue == null && newValue == null)
+                || (oldValue != null && newValue != null && Arrays.equals(oldValue, newValue));
+    }
 
-            byte[] newValue = commit.value().value();
-            currentVersion = commit.index();
-            Versioned<byte[]> newMapValue = newValue == null ? null
-                    : new Versioned<>(newValue, currentVersion);
+    /**
+     * Returns a boolean indicating whether the given MapEntryValue is null or a tombstone.
+     *
+     * @param value the value to check
+     * @return indicates whether the given value is null or is a tombstone
+     */
+    protected boolean valueIsNull(MapEntryValue value) {
+        return value == null || value.type() == MapEntryValue.Type.TOMBSTONE;
+    }
 
-            MapEvent.Type updateType = newValue == null ? REMOVE
-                    : oldCommitValue == null ? INSERT : UPDATE;
-
-            // If a value existed in the map, remove and discard the value to ensure disk can be freed.
-            if (updateType == REMOVE || updateType == UPDATE) {
-                mapEntries.remove(key);
-            }
-
-            // If this is an insert/update commit, add the commit to the map entries.
-            if (updateType == INSERT || updateType == UPDATE) {
-                mapEntries.put(key, new MapEntryValue(
-                        MapEntryValue.Type.VALUE,
+    /**
+     * Handles a put commit.
+     *
+     * @param commit put commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> put(Commit<? extends Put> commit) {
+        String key = commit.value().key();
+        MapEntryValue oldValue = entries().get(key);
+        MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
+        if (valueIsNull(oldValue)) {
+            if (preparedKeys.contains(key)) {
+                return new MapEntryUpdateResult<>(
+                        MapEntryUpdateResult.Status.WRITE_LOCK,
                         commit.index(),
-                        commit.value().value()));
-            } else if (!activeTransactions.isEmpty()) {
-                // If this is a delete but transactions are currently running, ensure tombstones are retained
-                // for version checks.
-                mapEntries.put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, commit.index(), null));
+                        key,
+                        toVersioned(oldValue));
             }
-
-            publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
-            return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue, newMapValue);
-        } catch (Exception e) {
-            getLogger().error("State machine operation failed", e);
-            throw Throwables.propagate(e);
+            entries().put(commit.value().key(),
+                    new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+            Versioned<byte[]> result = toVersioned(oldValue);
+            publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(newValue), result));
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
+        } else if (!valuesEqual(oldValue, newValue)) {
+            if (preparedKeys.contains(key)) {
+                return new MapEntryUpdateResult<>(
+                        MapEntryUpdateResult.Status.WRITE_LOCK,
+                        commit.index(),
+                        key,
+                        toVersioned(oldValue));
+            }
+            entries().put(commit.value().key(),
+                    new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+            Versioned<byte[]> result = toVersioned(oldValue);
+            publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
         }
+        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
+    }
+
+    /**
+     * Handles a putIfAbsent commit.
+     *
+     * @param commit putIfAbsent commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> putIfAbsent(Commit<? extends Put> commit) {
+        String key = commit.value().key();
+        MapEntryValue oldValue = entries().get(key);
+        if (valueIsNull(oldValue)) {
+            if (preparedKeys.contains(key)) {
+                return new MapEntryUpdateResult<>(
+                        MapEntryUpdateResult.Status.WRITE_LOCK,
+                        commit.index(),
+                        key,
+                        toVersioned(oldValue));
+            }
+            MapEntryValue newValue = new MapEntryValue(
+                    MapEntryValue.Type.VALUE,
+                    commit.index(),
+                    commit.value().value());
+            entries().put(commit.value().key(),
+                    new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+            Versioned<byte[]> result = toVersioned(newValue);
+            publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, null);
+        }
+        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
+    }
+
+    /**
+     * Handles a putAndGet commit.
+     *
+     * @param commit putAndGet commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> putAndGet(Commit<? extends Put> commit) {
+        String key = commit.value().key();
+        MapEntryValue oldValue = entries().get(key);
+        MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
+        if (valueIsNull(oldValue)) {
+            if (preparedKeys.contains(key)) {
+                return new MapEntryUpdateResult<>(
+                        MapEntryUpdateResult.Status.WRITE_LOCK,
+                        commit.index(),
+                        key,
+                        toVersioned(oldValue));
+            }
+            entries().put(commit.value().key(), newValue);
+            Versioned<byte[]> result = toVersioned(newValue);
+            publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
+        } else if (!valuesEqual(oldValue, newValue)) {
+            if (preparedKeys.contains(key)) {
+                return new MapEntryUpdateResult<>(
+                        MapEntryUpdateResult.Status.WRITE_LOCK,
+                        commit.index(),
+                        key,
+                        toVersioned(oldValue));
+            }
+            entries().put(commit.value().key(), newValue);
+            Versioned<byte[]> result = toVersioned(newValue);
+            publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, result, toVersioned(oldValue)));
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
+        }
+        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
+    }
+
+    /**
+     * Handles a remove commit.
+     *
+     * @param index the commit index
+     * @param key the key to remove
+     * @param predicate predicate to determine whether to remove the entry
+     * @return map entry update result
+     */
+    private MapEntryUpdateResult<String, byte[]> removeIf(long index, String key, Predicate<MapEntryValue> predicate) {
+        MapEntryValue value = entries().get(key);
+        if (value == null || !predicate.test(value)) {
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null);
+        }
+        entries().remove(key);
+        if (!activeTransactions.isEmpty()) {
+            entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
+        }
+        Versioned<byte[]> result = toVersioned(value);
+        publish(Lists.newArrayList(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result)));
+        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
+    }
+
+    /**
+     * Handles a remove commit.
+     *
+     * @param commit remove commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> remove(Commit<? extends Remove> commit) {
+        return removeIf(commit.index(), commit.value().key(), v -> true);
+    }
+
+    /**
+     * Handles a removeValue commit.
+     *
+     * @param commit removeValue commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> removeValue(Commit<? extends RemoveValue> commit) {
+        return removeIf(commit.index(), commit.value().key(), v ->
+                valuesEqual(v, new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value())));
+    }
+
+    /**
+     * Handles a removeVersion commit.
+     *
+     * @param commit removeVersion commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> removeVersion(Commit<? extends RemoveVersion> commit) {
+        return removeIf(commit.index(), commit.value().key(), v -> v.version() == commit.value().version());
+    }
+
+    /**
+     * Handles a replace commit.
+     *
+     * @param index the commit index
+     * @param key the key to replace
+     * @param newValue the value with which to replace the key
+     * @param predicate a predicate to determine whether to replace the key
+     * @return map entry update result
+     */
+    private MapEntryUpdateResult<String, byte[]> replaceIf(
+            long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
+        MapEntryValue oldValue = entries().get(key);
+        if (oldValue == null) {
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null);
+        }
+        if (!predicate.test(oldValue)) {
+            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, toVersioned(oldValue));
+        }
+        entries().put(key, newValue);
+        Versioned<byte[]> result = toVersioned(oldValue);
+        publish(Lists.newArrayList(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result)));
+        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
+    }
+
+    /**
+     * Handles a replace commit.
+     *
+     * @param commit replace commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> replace(Commit<? extends Replace> commit) {
+        MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
+        return replaceIf(commit.index(), commit.value().key(), value, v -> true);
+    }
+
+    /**
+     * Handles a replaceValue commit.
+     *
+     * @param commit replaceValue commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> replaceValue(Commit<? extends ReplaceValue> commit) {
+        MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
+        return replaceIf(commit.index(), commit.value().key(), value,
+                v -> valuesEqual(v.value(), commit.value().oldValue()));
+    }
+
+    /**
+     * Handles a replaceVersion commit.
+     *
+     * @param commit replaceVersion commit
+     * @return map entry update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> replaceVersion(Commit<? extends ReplaceVersion> commit) {
+        MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
+        return replaceIf(commit.index(), commit.value().key(), value,
+                v -> v.version() == commit.value().oldVersion());
     }
 
     /**
      * Handles a clear commit.
      *
-     * @param commit clear commit
      * @return clear result
      */
-    protected MapEntryUpdateResult.Status clear(Commit<Void> commit) {
-        Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
-                .entrySet().iterator();
+    protected MapEntryUpdateResult.Status clear() {
+        Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
+        Map<String, MapEntryValue> entriesToAdd = new HashMap<>();
         while (iterator.hasNext()) {
             Map.Entry<String, MapEntryValue> entry = iterator.next();
             String key = entry.getKey();
             MapEntryValue value = entry.getValue();
-            Versioned<byte[]> removedValue = new Versioned<>(value.value(),
-                    value.version());
-            publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
-            iterator.remove();
+            if (!valueIsNull(value)) {
+                Versioned<byte[]> removedValue = new Versioned<>(value.value(), value.version());
+                publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, removedValue));
+                if (activeTransactions.isEmpty()) {
+                    iterator.remove();
+                } else {
+                    entriesToAdd.put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
+                }
+            }
         }
+        entries().putAll(entriesToAdd);
         return MapEntryUpdateResult.Status.OK;
     }
 
     /**
      * Handles a listen commit.
      *
-     * @param commit listen commit
+     * @param session listen session
      */
-    protected void listen(Commit<Void> commit) {
-        listeners.put(commit.session().sessionId().id(), commit.session());
+    protected void listen(RaftSession session) {
+        listeners.put(session.sessionId().id(), session);
     }
 
     /**
      * Handles an unlisten commit.
      *
-     * @param commit unlisten commit
+     * @param session unlisten session
      */
-    protected void unlisten(Commit<Void> commit) {
-        listeners.remove(commit.session().sessionId().id());
+    protected void unlisten(RaftSession session) {
+        listeners.remove(session.sessionId().id());
     }
 
     /**
@@ -412,7 +653,7 @@
                 }
 
                 // Read the existing value from the map.
-                MapEntryValue existingValue = mapEntries.get(key);
+                MapEntryValue existingValue = entries().get(key);
 
                 // Note: if the existing value is null, that means the key has not changed during the transaction,
                 // otherwise a tombstone would have been retained.
@@ -501,7 +742,7 @@
                 continue;
             }
 
-            MapEntryValue previousValue = mapEntries.remove(key);
+            MapEntryValue previousValue = entries().remove(key);
             MapEntryValue newValue = null;
 
             // If the record is not a delete, create a transactional commit.
@@ -512,11 +753,42 @@
                 newValue = new MapEntryValue(MapEntryValue.Type.TOMBSTONE, currentVersion, null);
             }
 
-            eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
-
+            MapEvent<String, byte[]> event;
             if (newValue != null) {
-                mapEntries.put(key, newValue);
+                entries().put(key, newValue);
+                if (!valueIsNull(newValue)) {
+                    if (!valueIsNull(previousValue)) {
+                        event = new MapEvent<>(
+                                MapEvent.Type.UPDATE,
+                                "",
+                                key,
+                                toVersioned(newValue),
+                                toVersioned(previousValue));
+                    } else {
+                        event = new MapEvent<>(
+                                MapEvent.Type.INSERT,
+                                "",
+                                key,
+                                toVersioned(newValue),
+                                null);
+                    }
+                } else {
+                    event = new MapEvent<>(
+                            MapEvent.Type.REMOVE,
+                            "",
+                            key,
+                            null,
+                            toVersioned(previousValue));
+                }
+            } else {
+                event = new MapEvent<>(
+                        MapEvent.Type.REMOVE,
+                        "",
+                        key,
+                        null,
+                        toVersioned(previousValue));
             }
+            eventsToPublish.add(event);
         }
         publish(eventsToPublish);
         return CommitResult.OK;
@@ -557,7 +829,7 @@
      */
     private void discardTombstones() {
         if (activeTransactions.isEmpty()) {
-            Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
+            Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
             while (iterator.hasNext()) {
                 MapEntryValue value = iterator.next().getValue();
                 if (value.type() == MapEntryValue.Type.TOMBSTONE) {
@@ -568,7 +840,7 @@
             long lowWaterMark = activeTransactions.values().stream()
                     .mapToLong(TransactionScope::version)
                     .min().getAsLong();
-            Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
+            Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
             while (iterator.hasNext()) {
                 MapEntryValue value = iterator.next().getValue();
                 if (value.type() == MapEntryValue.Type.TOMBSTONE && value.version < lowWaterMark) {
@@ -579,46 +851,32 @@
     }
 
     /**
-     * Computes the update status that would result if the specified update were to applied to
-     * the state machine.
-     *
-     * @param update update
-     * @return status
-     */
-    private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
-        MapEntryValue existingValue = mapEntries.get(update.key());
-        boolean isEmpty = existingValue == null || existingValue.type() == MapEntryValue.Type.TOMBSTONE;
-        if (isEmpty && update.value() == null) {
-            return MapEntryUpdateResult.Status.NOOP;
-        }
-        if (preparedKeys.contains(update.key())) {
-            return MapEntryUpdateResult.Status.WRITE_LOCK;
-        }
-        byte[] existingRawValue = isEmpty ? null : existingValue.value();
-        Long existingVersion = isEmpty ? null : existingValue.version();
-        return update.valueMatch().matches(existingRawValue)
-                && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
-                : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
-    }
-
-    /**
      * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
      * @param value map entry value
      * @return versioned instance
      */
-    private Versioned<byte[]> toVersioned(MapEntryValue value) {
+    protected Versioned<byte[]> toVersioned(MapEntryValue value) {
         return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
                 ? new Versioned<>(value.value(), value.version()) : null;
     }
 
     /**
+     * Publishes an event to listeners.
+     *
+     * @param event event to publish
+     */
+    private void publish(MapEvent<String, byte[]> event) {
+        publish(Lists.newArrayList(event));
+    }
+
+    /**
      * Publishes events to listeners.
      *
      * @param events list of map event to publish
      */
     private void publish(List<MapEvent<String, byte[]>> events) {
         listeners.values().forEach(session -> {
-            session.publish(CHANGE, SERIALIZER::encode, events);
+            session.publish(CHANGE, serializer()::encode, events);
         });
     }
 
@@ -639,7 +897,7 @@
     /**
      * Interface implemented by map values.
      */
-    private static class MapEntryValue {
+    protected static class MapEntryValue {
         protected final Type type;
         protected final long version;
         protected final byte[] value;
@@ -689,7 +947,7 @@
     /**
      * Map transaction scope.
      */
-    private static final class TransactionScope {
+    protected static final class TransactionScope {
         private final long version;
         private final TransactionLog<MapUpdate<String, byte[]>> transactionLog;
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMap.java
index 8f91b3d..e11333d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMap.java
@@ -16,63 +16,37 @@
 
 package org.onosproject.store.primitives.resources.impl;
 
-import java.util.Collection;
-import java.util.ConcurrentModificationException;
-import java.util.List;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
-import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiFunction;
-import java.util.function.Predicate;
 
-import com.google.common.collect.Maps;
 import io.atomix.protocols.raft.proxy.RaftProxy;
 import org.onlab.util.KryoNamespace;
-import org.onlab.util.Match;
-import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FloorEntry;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HigherEntry;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LowerEntry;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.TransactionLog;
-import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ADD_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CEILING_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CEILING_KEY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CLEAR;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CONTAINS_KEY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CONTAINS_VALUE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CeilingEntry;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CeilingKey;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ContainsKey;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ContainsValue;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ENTRY_SET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FIRST_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FIRST_KEY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FLOOR_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FLOOR_KEY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FloorKey;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GET;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GET_OR_DEFAULT;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.Get;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GetOrDefault;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HIGHER_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HIGHER_KEY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HigherKey;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.IS_EMPTY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.KEY_SET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LAST_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LAST_KEY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LOWER_ENTRY;
@@ -80,338 +54,105 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LowerKey;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.POLL_FIRST_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.POLL_LAST_ENTRY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.REMOVE_LISTENER;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.SIZE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.UPDATE_AND_GET;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.UpdateAndGet;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.VALUES;
 
 /**
  * Implementation of {@link AsyncConsistentTreeMap}.
  */
-public class AtomixConsistentTreeMap extends AbstractRaftPrimitive implements AsyncConsistentTreeMap<byte[]> {
+public class AtomixConsistentTreeMap extends AtomixConsistentMap implements AsyncConsistentTreeMap<byte[]> {
     private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
             .register(KryoNamespaces.BASIC)
+            .register(AtomixConsistentMapOperations.NAMESPACE)
             .register(AtomixConsistentTreeMapOperations.NAMESPACE)
-            .register(AtomixConsistentTreeMapEvents.NAMESPACE)
+            .register(AtomixConsistentMapEvents.NAMESPACE)
+            .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 150)
+            .register(AtomixConsistentMapService.TransactionScope.class)
+            .register(TransactionLog.class)
+            .register(TransactionId.class)
+            .register(AtomixConsistentMapService.MapEntryValue.class)
+            .register(AtomixConsistentMapService.MapEntryValue.Type.class)
+            .register(new HashMap().keySet().getClass())
+            .register(TreeMap.class)
             .build());
 
-    private final Map<MapEventListener<String, byte[]>, Executor>
-            mapEventListeners = Maps.newConcurrentMap();
-
     public AtomixConsistentTreeMap(RaftProxy proxy) {
         super(proxy);
-        proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleEvent);
-    }
-
-    private void handleEvent(List<MapEvent<String, byte[]>> events) {
-        events.forEach(event -> mapEventListeners.
-                forEach((listener, executor) ->
-                        executor.execute(() ->
-                                listener.event(event))));
     }
 
     @Override
-    public CompletableFuture<Boolean> isEmpty() {
-        return proxy.invoke(IS_EMPTY, SERIALIZER::decode);
-    }
-
-    @Override
-    public CompletableFuture<Integer> size() {
-        return proxy.invoke(SIZE, SERIALIZER::decode);
-    }
-
-    @Override
-    public CompletableFuture<Boolean> containsKey(String key) {
-        return proxy.invoke(CONTAINS_KEY, SERIALIZER::encode, new ContainsKey(key), SERIALIZER::decode);
-    }
-
-    @Override
-    public CompletableFuture<Boolean> containsValue(byte[] value) {
-        return proxy.invoke(CONTAINS_VALUE, SERIALIZER::encode, new ContainsValue(value), SERIALIZER::decode);
-    }
-
-    @Override
-    public CompletableFuture<Versioned<byte[]>> get(String key) {
-        return proxy.invoke(GET, SERIALIZER::encode, new Get(key), SERIALIZER::decode);
-    }
-
-    @Override
-    public CompletableFuture<Versioned<byte[]>> getOrDefault(String key, byte[] defaultValue) {
-        return proxy.invoke(
-                GET_OR_DEFAULT,
-                SERIALIZER::encode,
-                new GetOrDefault(key, defaultValue),
-                SERIALIZER::decode);
-    }
-
-    @Override
-    public CompletableFuture<Set<String>> keySet() {
-        return proxy.invoke(KEY_SET, SERIALIZER::decode);
-    }
-
-    @Override
-    public CompletableFuture<Collection<Versioned<byte[]>>> values() {
-        return proxy.invoke(VALUES, SERIALIZER::decode);
-    }
-
-    @Override
-    public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet() {
-        return proxy.invoke(ENTRY_SET, SERIALIZER::decode);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, value, Match.ANY, Match.ANY),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
-                .thenApply(v -> v.oldValue());
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, value, Match.ANY, Match.ANY),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
-                .thenApply(v -> v.newValue());
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, value, Match.NULL, Match.ANY),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
-                .thenApply(v -> v.oldValue());
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public CompletableFuture<Versioned<byte[]>> remove(String key) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, null, Match.ANY, Match.ANY),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
-                .thenApply(v -> v.oldValue());
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public CompletableFuture<Boolean> remove(String key, byte[] value) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
-                .thenApply(v -> v.updated());
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public CompletableFuture<Boolean> remove(String key, long version) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
-                .thenApply(v -> v.updated());
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
-                .thenApply(v -> v.oldValue());
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
-                .thenApply(v -> v.updated());
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
-        return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                UPDATE_AND_GET,
-                SERIALIZER::encode,
-                new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)),
-                SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r.status()))
-                .thenApply(v -> v.updated());
-    }
-
-    @Override
-    public CompletableFuture<Void> clear() {
-        return proxy.<MapEntryUpdateResult.Status>invoke(CLEAR, SERIALIZER::decode)
-                .whenComplete((r, e) -> throwIfLocked(r))
-                .thenApply(v -> null);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public CompletableFuture<Versioned<byte[]>> computeIf(String key,
-            Predicate<? super byte[]> condition,
-            BiFunction<? super String,
-                    ? super byte[],
-                    ? extends byte[]> remappingFunction) {
-        return get(key).thenCompose(r1 -> {
-            byte[] existingValue = r1 == null ? null : r1.value();
-
-            if (!condition.test(existingValue)) {
-                return CompletableFuture.completedFuture(r1);
-            }
-
-            AtomicReference<byte[]> computedValue = new AtomicReference<byte[]>();
-            try {
-                computedValue.set(remappingFunction.apply(key, existingValue));
-            } catch (Exception e) {
-                CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
-                future.completeExceptionally(e);
-                return future;
-            }
-            if (computedValue.get() == null && r1 == null) {
-                return CompletableFuture.completedFuture(null);
-            }
-            Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
-            Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
-            return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
-                    UPDATE_AND_GET,
-                    SERIALIZER::encode,
-                    new UpdateAndGet(key, computedValue.get(), valueMatch, versionMatch),
-                    SERIALIZER::decode)
-                    .whenComplete((r, e) -> throwIfLocked(r.status()))
-                    .thenApply(v -> v.newValue());
-        });
-    }
-
-    @Override
-    public CompletableFuture<Void> addListener(
-            MapEventListener<String, byte[]> listener, Executor executor) {
-        if (mapEventListeners.isEmpty()) {
-            return proxy.invoke(ADD_LISTENER).thenRun(() ->
-                    mapEventListeners.put(listener,
-                            executor));
-        } else {
-            mapEventListeners.put(listener, executor);
-            return CompletableFuture.completedFuture(null);
-        }
-    }
-
-    @Override
-    public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
-        if (mapEventListeners.remove(listener) != null &&
-                mapEventListeners.isEmpty()) {
-            return proxy.invoke(REMOVE_LISTENER)
-                    .thenApply(v -> null);
-        }
-        return CompletableFuture.completedFuture(null);
-    }
-
-
-    private void throwIfLocked(MapEntryUpdateResult.Status status) {
-        if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
-            throw new ConcurrentModificationException("Cannot update TreeMap: another update is in progress.");
-        }
+    protected Serializer serializer() {
+        return SERIALIZER;
     }
 
     @Override
     public CompletableFuture<String> firstKey() {
-        return proxy.invoke(FIRST_KEY, SERIALIZER::decode);
+        return proxy.invoke(FIRST_KEY, serializer()::decode);
     }
 
     @Override
     public CompletableFuture<String> lastKey() {
-        return proxy.invoke(LAST_KEY, SERIALIZER::decode);
+        return proxy.invoke(LAST_KEY, serializer()::decode);
     }
 
     @Override
     public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> ceilingEntry(String key) {
-        return proxy.invoke(CEILING_ENTRY, SERIALIZER::encode, new CeilingEntry(key), SERIALIZER::decode);
+        return proxy.invoke(CEILING_ENTRY, serializer()::encode, new CeilingEntry(key), serializer()::decode);
     }
 
     @Override
     public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> floorEntry(String key) {
-        return proxy.invoke(FLOOR_ENTRY, SERIALIZER::encode, new FloorEntry(key), SERIALIZER::decode);
+        return proxy.invoke(FLOOR_ENTRY, serializer()::encode, new FloorEntry(key), serializer()::decode);
     }
 
     @Override
     public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> higherEntry(
             String key) {
-        return proxy.invoke(HIGHER_ENTRY, SERIALIZER::encode, new HigherEntry(key), SERIALIZER::decode);
+        return proxy.invoke(HIGHER_ENTRY, serializer()::encode, new HigherEntry(key), serializer()::decode);
     }
 
     @Override
     public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> lowerEntry(
             String key) {
-        return proxy.invoke(LOWER_ENTRY, SERIALIZER::encode, new LowerEntry(key), SERIALIZER::decode);
+        return proxy.invoke(LOWER_ENTRY, serializer()::encode, new LowerEntry(key), serializer()::decode);
     }
 
     @Override
     public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> firstEntry() {
-        return proxy.invoke(FIRST_ENTRY, SERIALIZER::decode);
+        return proxy.invoke(FIRST_ENTRY, serializer()::decode);
     }
 
     @Override
     public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> lastEntry() {
-        return proxy.invoke(LAST_ENTRY, SERIALIZER::decode);
+        return proxy.invoke(LAST_ENTRY, serializer()::decode);
     }
 
     @Override
     public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> pollFirstEntry() {
-        return proxy.invoke(POLL_FIRST_ENTRY, SERIALIZER::decode);
+        return proxy.invoke(POLL_FIRST_ENTRY, serializer()::decode);
     }
 
     @Override
     public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> pollLastEntry() {
-        return proxy.invoke(POLL_LAST_ENTRY, SERIALIZER::decode);
+        return proxy.invoke(POLL_LAST_ENTRY, serializer()::decode);
     }
 
     @Override
     public CompletableFuture<String> lowerKey(String key) {
-        return proxy.invoke(LOWER_KEY, SERIALIZER::encode, new LowerKey(key), SERIALIZER::decode);
+        return proxy.invoke(LOWER_KEY, serializer()::encode, new LowerKey(key), serializer()::decode);
     }
 
     @Override
     public CompletableFuture<String> floorKey(String key) {
-        return proxy.invoke(FLOOR_KEY, SERIALIZER::encode, new FloorKey(key), SERIALIZER::decode);
+        return proxy.invoke(FLOOR_KEY, serializer()::encode, new FloorKey(key), serializer()::decode);
     }
 
     @Override
     public CompletableFuture<String> ceilingKey(String key) {
-        return proxy.invoke(CEILING_KEY, SERIALIZER::encode, new CeilingKey(key), SERIALIZER::decode);
+        return proxy.invoke(CEILING_KEY, serializer()::encode, new CeilingKey(key), serializer()::decode);
     }
 
     @Override
     public CompletableFuture<String> higherKey(String key) {
-        return proxy.invoke(HIGHER_KEY, SERIALIZER::encode, new HigherKey(key), SERIALIZER::decode);
+        return proxy.invoke(HIGHER_KEY, serializer()::encode, new HigherKey(key), serializer()::decode);
     }
 
     @Override
@@ -425,29 +166,4 @@
             boolean inclusiveLower) {
         throw new UnsupportedOperationException("This operation is not yet supported.");
     }
-
-    @Override
-    public CompletableFuture<Version> begin(TransactionId transactionId) {
-        throw new UnsupportedOperationException("This operation is not yet supported.");
-    }
-
-    @Override
-    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
-        throw new UnsupportedOperationException("This operation is not yet supported.");
-    }
-
-    @Override
-    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
-        throw new UnsupportedOperationException("This operation is not yet supported.");
-    }
-
-    @Override
-    public CompletableFuture<Void> commit(TransactionId transactionId) {
-        throw new UnsupportedOperationException("This operation is not yet supported.");
-    }
-
-    @Override
-    public CompletableFuture<Void> rollback(TransactionId transactionId) {
-        throw new UnsupportedOperationException("This operation is not yet supported.");
-    }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapEvents.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapEvents.java
deleted file mode 100644
index ab3b972..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapEvents.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.resources.impl;
-
-import io.atomix.protocols.raft.event.EventType;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.MapEvent;
-
-/**
- * Atomix consistent tree map events.
- */
-public enum AtomixConsistentTreeMapEvents implements EventType {
-    CHANGE("change");
-
-    private final String id;
-
-    AtomixConsistentTreeMapEvents(String id) {
-        this.id = id;
-    }
-
-    @Override
-    public String id() {
-        return id;
-    }
-
-    public static final KryoNamespace NAMESPACE = KryoNamespace.newBuilder()
-            .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 50)
-            .register(MapEvent.class)
-            .register(MapEvent.Type.class)
-            .build("AtomixConsistentTreeMapEvents");
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapOperations.java
index c6b8d88..c44a63e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapOperations.java
@@ -34,15 +34,6 @@
  * state machine operations.
  */
 public enum AtomixConsistentTreeMapOperations implements OperationId {
-    CONTAINS_KEY("containsKey", OperationType.QUERY),
-    CONTAINS_VALUE("containsValue", OperationType.QUERY),
-    ENTRY_SET("entrySet", OperationType.QUERY),
-    GET("get", OperationType.QUERY),
-    GET_OR_DEFAULT("getOrDefault", OperationType.QUERY),
-    IS_EMPTY("isEmpty", OperationType.QUERY),
-    KEY_SET("keySet", OperationType.QUERY),
-    SIZE("size", OperationType.QUERY),
-    VALUES("values", OperationType.QUERY),
     SUB_MAP("subMap", OperationType.QUERY),
     FIRST_KEY("firstKey", OperationType.QUERY),
     LAST_KEY("lastKey", OperationType.QUERY),
@@ -57,11 +48,7 @@
     CEILING_ENTRY("ceilingEntry", OperationType.QUERY),
     CEILING_KEY("ceilingKey", OperationType.QUERY),
     HIGHER_ENTRY("higherEntry", OperationType.QUERY),
-    HIGHER_KEY("higherKey", OperationType.QUERY),
-    UPDATE_AND_GET("updateAndGet", OperationType.COMMAND),
-    CLEAR("clear", OperationType.COMMAND),
-    ADD_LISTENER("addListener", OperationType.COMMAND),
-    REMOVE_LISTENER("removeListener", OperationType.COMMAND);
+    HIGHER_KEY("higherKey", OperationType.QUERY);
 
     private final String id;
     private final OperationType type;
@@ -83,11 +70,7 @@
 
     public static final KryoNamespace NAMESPACE = KryoNamespace.newBuilder()
             .register(KryoNamespaces.BASIC)
-            .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
-            .register(ContainsKey.class)
-            .register(ContainsValue.class)
-            .register(Get.class)
-            .register(GetOrDefault.class)
+            .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
             .register(LowerKey.class)
             .register(LowerEntry.class)
             .register(HigherKey.class)
@@ -96,11 +79,7 @@
             .register(FloorEntry.class)
             .register(CeilingKey.class)
             .register(CeilingEntry.class)
-            .register(UpdateAndGet.class)
-            .register(Match.class)
             .register(Versioned.class)
-            .register(MapEntryUpdateResult.class)
-            .register(MapEntryUpdateResult.Status.class)
             .register(AbstractMap.SimpleImmutableEntry.class)
             .register(Maps.immutableEntry("", "").getClass())
             .build("AtomixConsistentTreeMapOperations");
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java
index b9e7135..f089051 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java
@@ -16,61 +16,36 @@
 
 package org.onosproject.store.primitives.resources.impl;
 
-import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Set;
 import java.util.TreeMap;
-import java.util.stream.Collectors;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import io.atomix.protocols.raft.service.AbstractRaftService;
 import io.atomix.protocols.raft.service.Commit;
 import io.atomix.protocols.raft.service.RaftServiceExecutor;
 import io.atomix.protocols.raft.session.RaftSession;
-import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
-import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
 import org.onlab.util.KryoNamespace;
-import org.onlab.util.Match;
+import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Versioned;
 
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ADD_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CEILING_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CEILING_KEY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CLEAR;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CONTAINS_KEY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CONTAINS_VALUE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CeilingEntry;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CeilingKey;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ContainsKey;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ContainsValue;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ENTRY_SET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FIRST_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FIRST_KEY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FLOOR_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FLOOR_KEY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FloorEntry;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FloorKey;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GET;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GET_OR_DEFAULT;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.Get;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GetOrDefault;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HIGHER_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HIGHER_KEY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HigherEntry;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HigherKey;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.IS_EMPTY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.KEY_SET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LAST_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LAST_KEY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LOWER_ENTRY;
@@ -79,307 +54,134 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LowerKey;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.POLL_FIRST_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.POLL_LAST_ENTRY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.REMOVE_LISTENER;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.SIZE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.SUB_MAP;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.SubMap;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.UPDATE_AND_GET;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.UpdateAndGet;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.VALUES;
-import static org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult.Status;
 
 /**
  * State machine corresponding to {@link AtomixConsistentTreeMap} backed by a
  * {@link TreeMap}.
  */
-public class AtomixConsistentTreeMapService extends AbstractRaftService {
+public class AtomixConsistentTreeMapService extends AtomixConsistentMapService {
 
     private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
             .register(KryoNamespaces.BASIC)
+            .register(AtomixConsistentMapOperations.NAMESPACE)
             .register(AtomixConsistentTreeMapOperations.NAMESPACE)
-            .register(AtomixConsistentTreeMapEvents.NAMESPACE)
-            .register(TreeMapEntryValue.class)
-            .register(new HashMap<>().keySet().getClass())
+            .register(AtomixConsistentMapEvents.NAMESPACE)
+            .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 150)
+            .register(TransactionScope.class)
+            .register(TransactionLog.class)
+            .register(TransactionId.class)
+            .register(MapEntryValue.class)
+            .register(MapEntryValue.Type.class)
+            .register(new HashMap().keySet().getClass())
             .register(TreeMap.class)
             .build());
 
-    private final Map<Long, RaftSession> listeners = Maps.newHashMap();
-    private TreeMap<String, TreeMapEntryValue> tree = Maps.newTreeMap();
-    private final Set<String> preparedKeys = Sets.newHashSet();
-
     @Override
-    public void snapshot(SnapshotWriter writer) {
-        writer.writeObject(Sets.newHashSet(listeners.keySet()), SERIALIZER::encode);
-        writer.writeObject(preparedKeys, SERIALIZER::encode);
-        writer.writeObject(tree, SERIALIZER::encode);
+    protected TreeMap<String, MapEntryValue> createMap() {
+        return Maps.newTreeMap();
     }
 
     @Override
-    public void install(SnapshotReader reader) {
-        listeners.clear();
-        for (long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
-            listeners.put(sessionId, getSessions().getSession(sessionId));
-        }
+    protected TreeMap<String, MapEntryValue> entries() {
+        return (TreeMap<String, MapEntryValue>) super.entries();
+    }
 
-        preparedKeys.clear();
-        preparedKeys.addAll(reader.readObject(SERIALIZER::decode));
-
-        tree.clear();
-        tree.putAll(reader.readObject(SERIALIZER::decode));
+    @Override
+    protected Serializer serializer() {
+        return SERIALIZER;
     }
 
     @Override
     public void configure(RaftServiceExecutor executor) {
-        // Listeners
-        executor.register(ADD_LISTENER, this::listen);
-        executor.register(REMOVE_LISTENER, this::unlisten);
-        // Queries
-        executor.register(CONTAINS_KEY, SERIALIZER::decode, this::containsKey, SERIALIZER::encode);
-        executor.register(CONTAINS_VALUE, SERIALIZER::decode, this::containsValue, SERIALIZER::encode);
-        executor.register(ENTRY_SET, this::entrySet, SERIALIZER::encode);
-        executor.register(GET, SERIALIZER::decode, this::get, SERIALIZER::encode);
-        executor.register(GET_OR_DEFAULT, SERIALIZER::decode, this::getOrDefault, SERIALIZER::encode);
-        executor.register(IS_EMPTY, this::isEmpty, SERIALIZER::encode);
-        executor.register(KEY_SET, this::keySet, SERIALIZER::encode);
-        executor.register(SIZE, this::size, SERIALIZER::encode);
-        executor.register(VALUES, this::values, SERIALIZER::encode);
-        executor.register(SUB_MAP, SERIALIZER::decode, this::subMap, SERIALIZER::encode);
-        executor.register(FIRST_KEY, this::firstKey, SERIALIZER::encode);
-        executor.register(LAST_KEY, this::lastKey, SERIALIZER::encode);
-        executor.register(FIRST_ENTRY, this::firstEntry, SERIALIZER::encode);
-        executor.register(LAST_ENTRY, this::lastEntry, SERIALIZER::encode);
-        executor.register(POLL_FIRST_ENTRY, this::pollFirstEntry, SERIALIZER::encode);
-        executor.register(POLL_LAST_ENTRY, this::pollLastEntry, SERIALIZER::encode);
-        executor.register(LOWER_ENTRY, SERIALIZER::decode, this::lowerEntry, SERIALIZER::encode);
-        executor.register(LOWER_KEY, SERIALIZER::decode, this::lowerKey, SERIALIZER::encode);
-        executor.register(FLOOR_ENTRY, SERIALIZER::decode, this::floorEntry, SERIALIZER::encode);
-        executor.register(FLOOR_KEY, SERIALIZER::decode, this::floorKey, SERIALIZER::encode);
-        executor.register(CEILING_ENTRY, SERIALIZER::decode, this::ceilingEntry, SERIALIZER::encode);
-        executor.register(CEILING_KEY, SERIALIZER::decode, this::ceilingKey, SERIALIZER::encode);
-        executor.register(HIGHER_ENTRY, SERIALIZER::decode, this::higherEntry, SERIALIZER::encode);
-        executor.register(HIGHER_KEY, SERIALIZER::decode, this::higherKey, SERIALIZER::encode);
-
-        // Commands
-        executor.register(UPDATE_AND_GET, SERIALIZER::decode, this::updateAndGet, SERIALIZER::encode);
-        executor.register(CLEAR, this::clear, SERIALIZER::encode);
+        super.configure(executor);
+        executor.register(SUB_MAP, serializer()::decode, this::subMap, serializer()::encode);
+        executor.register(FIRST_KEY, (Commit<Void> c) -> firstKey(), serializer()::encode);
+        executor.register(LAST_KEY, (Commit<Void> c) -> lastKey(), serializer()::encode);
+        executor.register(FIRST_ENTRY, (Commit<Void> c) -> firstEntry(), serializer()::encode);
+        executor.register(LAST_ENTRY, (Commit<Void> c) -> lastEntry(), serializer()::encode);
+        executor.register(POLL_FIRST_ENTRY, (Commit<Void> c) -> pollFirstEntry(), serializer()::encode);
+        executor.register(POLL_LAST_ENTRY, (Commit<Void> c) -> pollLastEntry(), serializer()::encode);
+        executor.register(LOWER_ENTRY, serializer()::decode, this::lowerEntry, serializer()::encode);
+        executor.register(LOWER_KEY, serializer()::decode, this::lowerKey, serializer()::encode);
+        executor.register(FLOOR_ENTRY, serializer()::decode, this::floorEntry, serializer()::encode);
+        executor.register(FLOOR_KEY, serializer()::decode, this::floorKey, serializer()::encode);
+        executor.register(CEILING_ENTRY, serializer()::decode, this::ceilingEntry, serializer()::encode);
+        executor.register(CEILING_KEY, serializer()::decode, this::ceilingKey, serializer()::encode);
+        executor.register(HIGHER_ENTRY, serializer()::decode, this::higherEntry, serializer()::encode);
+        executor.register(HIGHER_KEY, serializer()::decode, this::higherKey, serializer()::encode);
     }
 
-    protected boolean containsKey(Commit<? extends ContainsKey> commit) {
-        return toVersioned(tree.get((commit.value().key()))) != null;
-    }
-
-    protected boolean containsValue(Commit<? extends ContainsValue> commit) {
-        Match<byte[]> valueMatch = Match
-                .ifValue(commit.value().value());
-        return tree.values().stream().anyMatch(
-                value -> valueMatch.matches(value.value()));
-    }
-
-    protected Versioned<byte[]> get(Commit<? extends Get> commit) {
-        return toVersioned(tree.get(commit.value().key()));
-    }
-
-    protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
-        Versioned<byte[]> value = toVersioned(tree.get(commit.value().key()));
-        return value != null ? value : new Versioned<>(commit.value().defaultValue(), 0);
-    }
-
-    protected int size(Commit<Void> commit) {
-        return tree.size();
-    }
-
-    protected boolean isEmpty(Commit<Void> commit) {
-        return tree.isEmpty();
-    }
-
-    protected Set<String> keySet(Commit<Void> commit) {
-        return tree.keySet().stream().collect(Collectors.toSet());
-    }
-
-    protected Collection<Versioned<byte[]>> values(Commit<Void> commit) {
-        return tree.values().stream().map(this::toVersioned)
-                .collect(Collectors.toList());
-    }
-
-    protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<Void> commit) {
-        return tree
-                .entrySet()
-                .stream()
-                .map(e -> Maps.immutableEntry(e.getKey(),
-                        toVersioned(e.getValue())))
-                .collect(Collectors.toSet());
-    }
-
-    protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
-        Status updateStatus = validate(commit.value());
-        String key = commit.value().key();
-        TreeMapEntryValue oldCommitValue = tree.get(commit.value().key());
-        Versioned<byte[]> oldTreeValue = toVersioned(oldCommitValue);
-
-        if (updateStatus != Status.OK) {
-            return new MapEntryUpdateResult<>(updateStatus, "", key,
-                    oldTreeValue, oldTreeValue);
-        }
-
-        byte[] newValue = commit.value().value();
-        long newVersion = commit.index();
-        Versioned<byte[]> newTreeValue = newValue == null ? null
-                : new Versioned<byte[]>(newValue, newVersion);
-
-        MapEvent.Type updateType = newValue == null ? MapEvent.Type.REMOVE
-                : oldCommitValue == null ? MapEvent.Type.INSERT :
-                MapEvent.Type.UPDATE;
-        if (updateType == MapEvent.Type.REMOVE ||
-                updateType == MapEvent.Type.UPDATE) {
-            tree.remove(key);
-        }
-        if (updateType == MapEvent.Type.INSERT ||
-                updateType == MapEvent.Type.UPDATE) {
-            tree.put(key, new TreeMapEntryValue(newVersion, commit.value().value()));
-        }
-        publish(Lists.newArrayList(new MapEvent<>("", key, newTreeValue,
-                oldTreeValue)));
-        return new MapEntryUpdateResult<>(updateStatus, "", key, oldTreeValue,
-                newTreeValue);
-    }
-
-    protected Status clear(Commit<Void> commit) {
-        Iterator<Map.Entry<String, TreeMapEntryValue>> iterator = tree
-                .entrySet()
-                .iterator();
-        while (iterator.hasNext()) {
-            Map.Entry<String, TreeMapEntryValue> entry = iterator.next();
-            String key = entry.getKey();
-            TreeMapEntryValue value = entry.getValue();
-            Versioned<byte[]> removedValue =
-                    new Versioned<byte[]>(value.value(),
-                            value.version());
-            publish(Lists.newArrayList(new MapEvent<>("", key, null,
-                    removedValue)));
-            iterator.remove();
-        }
-        return Status.OK;
-    }
-
-    protected void listen(Commit<Void> commit) {
-        listeners.put(commit.session().sessionId().id(), commit.session());
-    }
-
-    protected void unlisten(Commit<Void> commit) {
-        closeListener(commit.session().sessionId().id());
-    }
-
-    private Status validate(UpdateAndGet update) {
-        TreeMapEntryValue existingValue = tree.get(update.key());
-        if (existingValue == null && update.value() == null) {
-            return Status.NOOP;
-        }
-        if (preparedKeys.contains(update.key())) {
-            return Status.WRITE_LOCK;
-        }
-        byte[] existingRawValue = existingValue == null ? null :
-                existingValue.value();
-        Long existingVersion = existingValue == null ? null :
-                existingValue.version();
-        return update.valueMatch().matches(existingRawValue)
-                && update.versionMatch().matches(existingVersion) ?
-                Status.OK
-                : Status.PRECONDITION_FAILED;
-    }
-
-    protected NavigableMap<String, TreeMapEntryValue> subMap(
+    protected NavigableMap<String, MapEntryValue> subMap(
             Commit<? extends SubMap> commit) {
         // Do not support this until lazy communication is possible.  At present
         // it transmits up to the entire map.
-        SubMap<String, TreeMapEntryValue> subMap = commit.value();
-        return tree.subMap(subMap.fromKey(), subMap.isInclusiveFrom(),
+        SubMap<String, MapEntryValue> subMap = commit.value();
+        return entries().subMap(subMap.fromKey(), subMap.isInclusiveFrom(),
                 subMap.toKey(), subMap.isInclusiveTo());
     }
 
-    protected String firstKey(Commit<Void> commit) {
-        if (tree.isEmpty()) {
-            return null;
-        }
-        return tree.firstKey();
+    protected String firstKey() {
+        return isEmpty() ? null : entries().firstKey();
     }
 
-    protected String lastKey(Commit<Void> commit) {
-        return tree.isEmpty() ? null : tree.lastKey();
+    protected String lastKey() {
+        return isEmpty() ? null : entries().lastKey();
     }
 
     protected Map.Entry<String, Versioned<byte[]>> higherEntry(Commit<? extends HigherEntry> commit) {
-        if (tree.isEmpty()) {
-            return null;
-        }
-        return toVersionedEntry(
-                tree.higherEntry(commit.value().key()));
+        return isEmpty() ? null : toVersionedEntry(entries().higherEntry(commit.value().key()));
     }
 
-    protected Map.Entry<String, Versioned<byte[]>> firstEntry(Commit<Void> commit) {
-        if (tree.isEmpty()) {
-            return null;
-        }
-        return toVersionedEntry(tree.firstEntry());
+    protected Map.Entry<String, Versioned<byte[]>> firstEntry() {
+        return isEmpty() ? null : toVersionedEntry(entries().firstEntry());
     }
 
-    protected Map.Entry<String, Versioned<byte[]>> lastEntry(Commit<Void> commit) {
-        if (tree.isEmpty()) {
-            return null;
-        }
-        return toVersionedEntry(tree.lastEntry());
+    protected Map.Entry<String, Versioned<byte[]>> lastEntry() {
+        return isEmpty() ? null : toVersionedEntry(entries().lastEntry());
     }
 
-    protected Map.Entry<String, Versioned<byte[]>> pollFirstEntry(Commit<Void> commit) {
-        return toVersionedEntry(tree.pollFirstEntry());
+    protected Map.Entry<String, Versioned<byte[]>> pollFirstEntry() {
+        return toVersionedEntry(entries().pollFirstEntry());
     }
 
-    protected Map.Entry<String, Versioned<byte[]>> pollLastEntry(Commit<Void> commit) {
-        return toVersionedEntry(tree.pollLastEntry());
+    protected Map.Entry<String, Versioned<byte[]>> pollLastEntry() {
+        return toVersionedEntry(entries().pollLastEntry());
     }
 
     protected Map.Entry<String, Versioned<byte[]>> lowerEntry(Commit<? extends LowerEntry> commit) {
-        return toVersionedEntry(tree.lowerEntry(commit.value().key()));
+        return toVersionedEntry(entries().lowerEntry(commit.value().key()));
     }
 
     protected String lowerKey(Commit<? extends LowerKey> commit) {
-        return tree.lowerKey(commit.value().key());
+        return entries().lowerKey(commit.value().key());
     }
 
     protected Map.Entry<String, Versioned<byte[]>> floorEntry(Commit<? extends FloorEntry> commit) {
-        return toVersionedEntry(tree.floorEntry(commit.value().key()));
+        return toVersionedEntry(entries().floorEntry(commit.value().key()));
     }
 
     protected String floorKey(Commit<? extends FloorKey> commit) {
-        return tree.floorKey(commit.value().key());
+        return entries().floorKey(commit.value().key());
     }
 
     protected Map.Entry<String, Versioned<byte[]>> ceilingEntry(Commit<CeilingEntry> commit) {
-        return toVersionedEntry(
-                tree.ceilingEntry(commit.value().key()));
+        return toVersionedEntry(entries().ceilingEntry(commit.value().key()));
     }
 
     protected String ceilingKey(Commit<CeilingKey> commit) {
-        return tree.ceilingKey(commit.value().key());
+        return entries().ceilingKey(commit.value().key());
     }
 
     protected String higherKey(Commit<HigherKey> commit) {
-        return tree.higherKey(commit.value().key());
-    }
-
-    private Versioned<byte[]> toVersioned(TreeMapEntryValue value) {
-        return value == null ? null :
-                new Versioned<byte[]>(value.value(), value.version());
+        return entries().higherKey(commit.value().key());
     }
 
     private Map.Entry<String, Versioned<byte[]>> toVersionedEntry(
-            Map.Entry<String, TreeMapEntryValue> entry) {
-        //FIXME is this the best type of entry to return?
-        return entry == null ? null : new SimpleImmutableEntry<>(
-                entry.getKey(), toVersioned(entry.getValue()));
-    }
-
-    private void publish(List<MapEvent<String, byte[]>> events) {
-        listeners.values().forEach(session -> session.publish(CHANGE, SERIALIZER::encode, events));
+            Map.Entry<String, MapEntryValue> entry) {
+        return entry == null || valueIsNull(entry.getValue())
+                ? null : Maps.immutableEntry(entry.getKey(), toVersioned(entry.getValue()));
     }
 
     @Override
@@ -395,22 +197,4 @@
     private void closeListener(Long sessionId) {
         listeners.remove(sessionId);
     }
-
-    private static class TreeMapEntryValue {
-        private final long version;
-        private final byte[] value;
-
-        public TreeMapEntryValue(long version, byte[] value) {
-            this.version = version;
-            this.value = value;
-        }
-
-        public byte[] value() {
-            return value;
-        }
-
-        public long version() {
-            return version;
-        }
-    }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java
index 63235d6..7773db7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java
@@ -15,12 +15,8 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
-import java.util.function.Function;
-
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.Versioned;
-
 import com.google.common.base.MoreObjects;
+import org.onosproject.store.service.Versioned;
 
 /**
  * Result of a map entry update operation.
@@ -57,18 +53,16 @@
         PRECONDITION_FAILED
     }
 
-    private final String mapName;
-    private Status status;
+    private final Status status;
+    private final long version;
     private final K key;
-    private final Versioned<V> oldValue;
-    private final Versioned<V> newValue;
+    private final Versioned<V> result;
 
-    public MapEntryUpdateResult(Status status, String mapName, K key, Versioned<V> oldValue, Versioned<V> newValue) {
+    public MapEntryUpdateResult(Status status, long version, K key, Versioned<V> result) {
         this.status = status;
-        this.mapName = mapName;
+        this.version = version;
         this.key = key;
-        this.oldValue = oldValue;
-        this.newValue = newValue;
+        this.result = result;
     }
 
     /**
@@ -80,14 +74,6 @@
     }
 
     /**
-     * Returns the map name.
-     * @return map name
-     */
-    public String mapName() {
-        return mapName;
-    }
-
-    /**
      * Returns the update status.
      * @return update status
      */
@@ -96,65 +82,27 @@
     }
 
     /**
-     * Returns the map key.
-     * @return key
+     * Returns the result version.
+     * @return result version
      */
-    public K key() {
-        return key;
+    public long version() {
+        return version;
     }
 
     /**
-     * Returns the old value.
-     * @return the previous value associated with key if updated was successful, otherwise current value
+     * Returns the value.
+     * @return the value associated with key if updated was successful, otherwise current value
      */
-    public Versioned<V> oldValue() {
-        return oldValue;
-    }
-
-    /**
-     * Returns the new value after update.
-     * @return if updated was unsuccessful, this is same as old value
-     */
-    public Versioned<V> newValue() {
-        return newValue;
-    }
-
-    /**
-     * Maps to another instance with different key and value types.
-     * @param keyTransform transformer to use for transcoding keys
-     * @param valueMapper mapper to use for transcoding values
-     * @return new instance
-     * @param <K1> key type of returned {@code MapEntryUpdateResult}
-     * @param <V1> value type of returned {@code MapEntryUpdateResult}
-     */
-    public <K1, V1> MapEntryUpdateResult<K1, V1> map(Function<K, K1> keyTransform, Function<V, V1> valueMapper) {
-        return new MapEntryUpdateResult<>(status,
-                mapName,
-                keyTransform.apply(key),
-                oldValue == null ? null : oldValue.map(valueMapper),
-                newValue == null ? null : newValue.map(valueMapper));
-    }
-
-    /**
-     * Return the map event that will be generated as a result of this update.
-     * @return map event. if update was unsuccessful, this returns {@code null}
-     */
-    public MapEvent<K, V> toMapEvent() {
-        if (!updated()) {
-            return null;
-        } else {
-            return new MapEvent<>(mapName(), key(), newValue, oldValue);
-        }
+    public Versioned<V> result() {
+        return result;
     }
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(MapEntryUpdateResult.class)
-                .add("mapName", mapName)
                 .add("status", status)
                 .add("key", key)
-                .add("oldValue", oldValue)
-                .add("newValue", newValue)
+                .add("result", result)
                 .toString();
     }
 }
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
index 098c193..ecd8086 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
@@ -26,14 +26,14 @@
 import io.atomix.storage.StorageLevel;
 import io.atomix.time.WallClockTimestamp;
 import org.junit.Test;
-import org.onlab.util.Match;
 import org.onosproject.store.service.Versioned;
 
 import static org.easymock.EasyMock.mock;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UPDATE_AND_GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
 
 /**
  * Consistent map service test.
@@ -49,10 +49,10 @@
         Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
 
         AtomixConsistentMapService service = new AtomixConsistentMapService();
-        service.updateAndGet(new DefaultCommit<>(
+        service.put(new DefaultCommit<>(
                 2,
-                UPDATE_AND_GET,
-                new AtomixConsistentMapOperations.UpdateAndGet("foo", "Hello world!".getBytes(), Match.ANY, Match.ANY),
+                PUT,
+                new Put("foo", "Hello world!".getBytes()),
                 mock(RaftSessionContext.class),
                 System.currentTimeMillis()));
 
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index e858b3f..7b26f8b 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -255,7 +255,7 @@
 
         map.computeIfPresent("bar", (k, v) -> value2).thenAccept(result -> {
             assertNull(result);
-        });
+        }).join();
 
         map.computeIfPresent("foo", (k, v) -> value3).thenAccept(result -> {
             assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value3));
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapServiceTest.java
deleted file mode 100644
index 04698e0..0000000
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapServiceTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.resources.impl;
-
-import io.atomix.protocols.raft.service.ServiceId;
-import io.atomix.protocols.raft.service.impl.DefaultCommit;
-import io.atomix.protocols.raft.session.impl.RaftSessionContext;
-import io.atomix.protocols.raft.storage.RaftStorage;
-import io.atomix.protocols.raft.storage.snapshot.Snapshot;
-import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
-import io.atomix.protocols.raft.storage.snapshot.SnapshotStore;
-import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
-import io.atomix.storage.StorageLevel;
-import io.atomix.time.WallClockTimestamp;
-import org.junit.Test;
-import org.onlab.util.Match;
-import org.onosproject.store.service.Versioned;
-
-import static org.easymock.EasyMock.mock;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GET;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.UPDATE_AND_GET;
-
-/**
- * Consistent tree map service test.
- */
-public class AtomixConsistentTreeMapServiceTest {
-    @Test
-    @SuppressWarnings("unchecked")
-    public void testSnapshot() throws Exception {
-        SnapshotStore store = new SnapshotStore(RaftStorage.newBuilder()
-                .withPrefix("test")
-                .withStorageLevel(StorageLevel.MEMORY)
-                .build());
-        Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
-
-        AtomixConsistentTreeMapService service = new AtomixConsistentTreeMapService();
-        service.updateAndGet(new DefaultCommit<>(
-                2,
-                UPDATE_AND_GET,
-                new AtomixConsistentTreeMapOperations.UpdateAndGet(
-                        "foo", "Hello world!".getBytes(), Match.ANY, Match.ANY),
-                mock(RaftSessionContext.class),
-                System.currentTimeMillis()));
-
-        try (SnapshotWriter writer = snapshot.openWriter()) {
-            service.snapshot(writer);
-        }
-
-        snapshot.complete();
-
-        service = new AtomixConsistentTreeMapService();
-        try (SnapshotReader reader = snapshot.openReader()) {
-            service.install(reader);
-        }
-
-        Versioned<byte[]> value = service.get(new DefaultCommit<>(
-                2,
-                GET,
-                new AtomixConsistentTreeMapOperations.Get("foo"),
-                mock(RaftSessionContext.class),
-                System.currentTimeMillis()));
-        assertNotNull(value);
-        assertArrayEquals("Hello world!".getBytes(), value.value());
-    }
-}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
index f1de625..7e8eec7 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
@@ -180,7 +180,6 @@
             map.putAndGet(key, allValues.get(allKeys.indexOf(key))).thenAccept(secondResult -> {
                 assertArrayEquals(allValues.get(allKeys.indexOf(key)), firstResult.value());
                 assertArrayEquals(allValues.get(allKeys.indexOf(key)), secondResult.value());
-                assertTrue((firstResult.version() + 1) == secondResult.version());
             });
         }).join());
 
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
index e117ba5..dbfee3c 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -176,7 +176,6 @@
             .register(AtomixConsistentMapOperations.class)
             .register(AtomixConsistentSetMultimapOperations.class)
             .register(AtomixConsistentSetMultimapEvents.class)
-            .register(AtomixConsistentTreeMapEvents.class)
             .register(AtomixConsistentTreeMapOperations.class)
             .register(AtomixCounterOperations.class)
             .register(AtomixDocumentTreeEvents.class)