diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index 645215c..d676ab1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -50,7 +50,7 @@
 import org.onosproject.store.service.DocumentPath;
 import org.onosproject.store.service.DocumentTreeEvent;
 import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Task;
 import org.onosproject.store.service.Versioned;
 import org.onosproject.store.service.WorkQueueStats;
@@ -97,7 +97,7 @@
         serializer.register(TransactionId.class, factory);
         serializer.register(MapUpdate.class, factory);
         serializer.register(MapUpdate.Type.class, factory);
-        serializer.register(MapTransaction.class, factory);
+        serializer.register(TransactionLog.class, factory);
         serializer.register(Versioned.class, factory);
         serializer.register(MapEvent.class, factory);
         serializer.register(Task.class, factory);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
index fccf48e..2f3fe11 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
@@ -15,11 +15,9 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.CommitStatus;
 import org.onosproject.store.service.Serializer;
@@ -27,7 +25,7 @@
 import org.onosproject.store.service.TransactionalMap;
 import org.onosproject.utils.MeteringAgent;
 
-import com.google.common.collect.Sets;
+import static com.google.common.base.MoreObjects.toStringHelper;
 
 /**
  * Default implementation of transaction context.
@@ -35,17 +33,12 @@
 public class DefaultTransactionContext implements TransactionContext {
 
     private final AtomicBoolean isOpen = new AtomicBoolean(false);
-    private final DistributedPrimitiveCreator creator;
     private final TransactionId transactionId;
     private final TransactionCoordinator transactionCoordinator;
-    private final Set<TransactionParticipant> txParticipants = Sets.newConcurrentHashSet();
     private final MeteringAgent monitor;
 
-    public DefaultTransactionContext(TransactionId transactionId,
-            DistributedPrimitiveCreator creator,
-            TransactionCoordinator transactionCoordinator) {
+    public DefaultTransactionContext(TransactionId transactionId, TransactionCoordinator transactionCoordinator) {
         this.transactionId = transactionId;
-        this.creator = creator;
         this.transactionCoordinator = transactionCoordinator;
         this.monitor = new MeteringAgent("transactionContext", "*", true);
     }
@@ -75,8 +68,7 @@
     @Override
     public CompletableFuture<CommitStatus> commit() {
         final MeteringAgent.Context timer = monitor.startTimer("commit");
-        return transactionCoordinator.commit(transactionId, txParticipants)
-                                     .whenComplete((r, e) -> timer.stop(e));
+        return transactionCoordinator.commit().whenComplete((r, e) -> timer.stop(e));
     }
 
     @Override
@@ -85,14 +77,14 @@
     }
 
     @Override
-    public <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName,
-            Serializer serializer) {
-        // FIXME: Do not create duplicates.
-        DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<K, V>(mapName,
-                DistributedPrimitives.newMeteredMap(creator.<K, V>newAsyncConsistentMap(mapName, serializer)),
-                this,
-                serializer);
-        txParticipants.add(txMap);
-        return txMap;
+    public <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName, Serializer serializer) {
+        return transactionCoordinator.getTransactionalMap(mapName, serializer);
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+                .add("transactionId", transactionId)
+                .toString();
     }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java
index c599306..12f36a0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java
@@ -15,31 +15,26 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.TransactionContext;
 import org.onosproject.store.service.TransactionContextBuilder;
+
 /**
  * Default Transaction Context Builder.
  */
 public class DefaultTransactionContextBuilder extends TransactionContextBuilder {
 
     private final TransactionId transactionId;
-    private final DistributedPrimitiveCreator primitiveCreator;
-    private final TransactionCoordinator transactionCoordinator;
+    private final TransactionManager transactionManager;
 
-    public DefaultTransactionContextBuilder(TransactionId transactionId,
-            DistributedPrimitiveCreator primitiveCreator,
-            TransactionCoordinator transactionCoordinator) {
+    public DefaultTransactionContextBuilder(TransactionId transactionId, TransactionManager transactionManager) {
         this.transactionId = transactionId;
-        this.primitiveCreator = primitiveCreator;
-        this.transactionCoordinator = transactionCoordinator;
+        this.transactionManager = transactionManager;
     }
 
     @Override
     public TransactionContext build() {
         return new DefaultTransactionContext(transactionId,
-                primitiveCreator,
-                transactionCoordinator);
+                new TransactionCoordinator(transactionId, transactionManager));
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
deleted file mode 100644
index d138c9b..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.primitives.impl;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.commons.lang3.tuple.Pair;
-import org.onlab.util.HexString;
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.MapTransaction;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.TransactionContext;
-import org.onosproject.store.service.TransactionalMap;
-import org.onosproject.store.service.Versioned;
-
-import static com.google.common.base.Preconditions.*;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Default Transactional Map implementation that provides a repeatable reads
- * transaction isolation level.
- *
- * @param <K> key type
- * @param <V> value type.
- */
-public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V>, TransactionParticipant {
-
-    private final TransactionContext txContext;
-    private static final String TX_CLOSED_ERROR = "Transaction is closed";
-    private final AsyncConsistentMap<K, V> backingMap;
-    private final ConsistentMap<K, V> backingConsistentMap;
-    private final String name;
-    private final Serializer serializer;
-    private final Map<K, Versioned<V>> readCache = Maps.newConcurrentMap();
-    private final Map<K, V> writeCache = Maps.newConcurrentMap();
-    private final Set<K> deleteSet = Sets.newConcurrentHashSet();
-
-    private static final String ERROR_NULL_VALUE = "Null values are not allowed";
-    private static final String ERROR_NULL_KEY = "Null key is not allowed";
-
-    private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
-            .softValues()
-            .build(new CacheLoader<K, String>() {
-
-                @Override
-                public String load(K key) {
-                    return HexString.toHexString(serializer.encode(key));
-                }
-            });
-
-    protected K dK(String key) {
-        return serializer.decode(HexString.fromHexString(key));
-    }
-
-    public DefaultTransactionalMap(
-            String name,
-            AsyncConsistentMap<K, V> backingMap,
-            TransactionContext txContext,
-            Serializer serializer) {
-        this.name = name;
-        this.backingMap = backingMap;
-        this.backingConsistentMap = backingMap.asConsistentMap();
-        this.txContext = txContext;
-        this.serializer = serializer;
-    }
-
-    @Override
-    public V get(K key) {
-        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
-        checkNotNull(key, ERROR_NULL_KEY);
-        if (deleteSet.contains(key)) {
-            return null;
-        }
-        V latest = writeCache.get(key);
-        if (latest != null) {
-            return latest;
-        } else {
-            Versioned<V> v = readCache.computeIfAbsent(key, k -> backingConsistentMap.get(k));
-            return v != null ? v.value() : null;
-        }
-    }
-
-    @Override
-    public boolean containsKey(K key) {
-        return get(key) != null;
-    }
-
-    @Override
-    public V put(K key, V value) {
-        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
-        checkNotNull(value, ERROR_NULL_VALUE);
-
-        V latest = get(key);
-        writeCache.put(key, value);
-        deleteSet.remove(key);
-        return latest;
-    }
-
-    @Override
-    public V remove(K key) {
-        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
-        V latest = get(key);
-        if (latest != null) {
-            writeCache.remove(key);
-            deleteSet.add(key);
-        }
-        return latest;
-    }
-
-    @Override
-    public boolean remove(K key, V value) {
-        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
-        checkNotNull(value, ERROR_NULL_VALUE);
-        V latest = get(key);
-        if (Objects.equal(value, latest)) {
-            remove(key);
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public boolean replace(K key, V oldValue, V newValue) {
-        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
-        checkNotNull(oldValue, ERROR_NULL_VALUE);
-        checkNotNull(newValue, ERROR_NULL_VALUE);
-        V latest = get(key);
-        if (Objects.equal(oldValue, latest)) {
-            put(key, newValue);
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public V putIfAbsent(K key, V value) {
-        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
-        checkNotNull(value, ERROR_NULL_VALUE);
-        V latest = get(key);
-        if (latest == null) {
-            put(key, value);
-        }
-        return latest;
-    }
-
-    @Override
-    public CompletableFuture<Boolean> prepare() {
-        return backingMap.prepare(new MapTransaction<>(txContext.transactionId(), updates()));
-    }
-
-    @Override
-    public CompletableFuture<Void> commit() {
-        return backingMap.commit(txContext.transactionId());
-    }
-
-    @Override
-    public CompletableFuture<Void> rollback() {
-        return backingMap.rollback(txContext.transactionId());
-    }
-
-    @Override
-    public CompletableFuture<Boolean> prepareAndCommit() {
-        return backingMap.prepareAndCommit(new MapTransaction<>(txContext.transactionId(), updates()));
-    }
-
-    @Override
-    public int totalUpdates() {
-        return updates().size();
-    }
-
-    @Override
-    public boolean hasPendingUpdates() {
-        return updatesStream().findAny().isPresent();
-    }
-
-    protected Stream<MapUpdate<K, V>> updatesStream() {
-        return Stream.concat(
-            // 1st stream: delete ops
-            deleteSet.stream()
-                .map(key -> Pair.of(key, readCache.get(key)))
-                .filter(e -> e.getValue() != null)
-                .map(e -> MapUpdate.<K, V>newBuilder()
-                 .withMapName(name)
-                 .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
-                 .withKey(e.getKey())
-                 .withCurrentVersion(e.getValue().version())
-                 .build()),
-            // 2nd stream: write ops
-            writeCache.entrySet().stream()
-                    .map(e -> {
-                        Versioned<V> original = readCache.get(e.getKey());
-                        if (original == null) {
-                            return MapUpdate.<K, V>newBuilder()
-                                    .withMapName(name)
-                                    .withType(MapUpdate.Type.PUT_IF_ABSENT)
-                                    .withKey(e.getKey())
-                                    .withValue(e.getValue())
-                                    .build();
-                        } else {
-                            return MapUpdate.<K, V>newBuilder()
-                                    .withMapName(name)
-                                    .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
-                                    .withKey(e.getKey())
-                                    .withCurrentVersion(original.version())
-                                    .withValue(e.getValue())
-                                    .build();
-                        }
-                    }));
-    }
-
-    protected List<MapUpdate<K, V>> updates() {
-        return updatesStream().collect(Collectors.toList());
-    }
-
-    protected List<MapUpdate<String, byte[]>> toMapUpdates() {
-        List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
-        deleteSet.forEach(key -> {
-            Versioned<V> original = readCache.get(key);
-            if (original != null) {
-                updates.add(MapUpdate.<String, byte[]>newBuilder()
-                        .withMapName(name)
-                        .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
-                        .withKey(keyCache.getUnchecked(key))
-                        .withCurrentVersion(original.version())
-                        .build());
-            }
-        });
-        writeCache.forEach((key, value) -> {
-            Versioned<V> original = readCache.get(key);
-            if (original == null) {
-                updates.add(MapUpdate.<String, byte[]>newBuilder()
-                        .withMapName(name)
-                        .withType(MapUpdate.Type.PUT_IF_ABSENT)
-                        .withKey(keyCache.getUnchecked(key))
-                        .withValue(serializer.encode(value))
-                        .build());
-            } else {
-                updates.add(MapUpdate.<String, byte[]>newBuilder()
-                        .withMapName(name)
-                        .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
-                        .withKey(keyCache.getUnchecked(key))
-                        .withCurrentVersion(original.version())
-                        .withValue(serializer.encode(value))
-                        .build());
-            }
-        });
-        return updates;
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("backingMap", backingMap)
-                .add("updates", updates())
-                .toString();
-    }
-
-    /**
-     * Discards all changes made to this transactional map.
-     */
-    protected void abort() {
-        readCache.clear();
-        writeCache.clear();
-        deleteSet.clear();
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
new file mode 100644
index 0000000..9acc377
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Map;
+import java.util.stream.Stream;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Versioned;
+
+/**
+ * Repeatable read based map participant: reads are served from a per-key cache of the first
+ * {@link Versioned} value fetched from the backing map, and pending changes become conditional updates.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class DefaultTransactionalMapParticipant<K, V> extends TransactionalMapParticipant<K, V> {
+    private final Map<K, Versioned<V>> readCache = Maps.newConcurrentMap();
+
+    public DefaultTransactionalMapParticipant(
+            ConsistentMap<K, V> backingMap, Transaction<MapUpdate<K, V>> transaction) {
+        super(backingMap, transaction);
+    }
+
+    @Override
+    protected V read(K key) {
+        // Only a key without a cached entry reaches the backing map; a null result is not cached.
+        Versioned<V> cached = readCache.computeIfAbsent(key, backingMap::get);
+        if (cached == null) {
+            return null;
+        }
+        return cached.value();
+    }
+
+    @Override
+    protected Stream<MapUpdate<K, V>> records() {
+        return Stream.concat(deleteStream(), writeStream());
+    }
+
+    /**
+     * Builds a REMOVE_IF_VERSION_MATCH record for every deleted key that has a cached read.
+     */
+    private Stream<MapUpdate<K, V>> deleteStream() {
+        return deleteSet.stream()
+                .map(key -> Pair.of(key, readCache.get(key)))
+                .filter(hit -> hit.getValue() != null)
+                .map(hit -> MapUpdate.<K, V>newBuilder()
+                        .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+                        .withKey(hit.getKey())
+                        .withCurrentVersion(hit.getValue().version())
+                        .build());
+    }
+
+    /**
+     * Builds one record per pending write, guarded by the cached version when there is one.
+     */
+    private Stream<MapUpdate<K, V>> writeStream() {
+        return writeCache.entrySet().stream().map(pending -> {
+            Versioned<V> original = readCache.get(pending.getKey());
+            return original != null
+                    ? MapUpdate.<K, V>newBuilder().withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+                            .withKey(pending.getKey()).withCurrentVersion(original.version())
+                            .withValue(pending.getValue()).build()
+                    : MapUpdate.<K, V>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
+                            .withKey(pending.getKey()).withValue(pending.getValue()).build();
+        });
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index 2f4a665..c0b323a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -29,10 +29,12 @@
 import java.util.function.Predicate;
 
 import org.onosproject.core.ApplicationId;
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.base.MoreObjects;
@@ -170,8 +172,18 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
-        return delegateMap.prepare(transaction);
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        return delegateMap.begin(transactionId);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
+        return delegateMap.prepare(transactionLog);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
+        return delegateMap.prepareAndCommit(transactionLog);
     }
 
     @Override
@@ -185,11 +197,6 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
-        return delegateMap.prepareAndCommit(transaction);
-    }
-
-    @Override
     public void addStatusChangeListener(Consumer<Status> listener) {
         delegateMap.addStatusChangeListener(listener);
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
index 48c0b85..f474f7b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
@@ -16,10 +16,12 @@
 
 package org.onosproject.store.primitives.impl;
 
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
 import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
@@ -253,9 +255,18 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepare(
-            MapTransaction<String, V> transaction) {
-        return delegateMap.prepare(transaction);
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        return delegateMap.begin(transactionId);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, V>> transactionLog) {
+        return delegateMap.prepare(transactionLog);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, V>> transactionLog) {
+        return delegateMap.prepareAndCommit(transactionLog);
     }
 
     @Override
@@ -269,12 +280,6 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepareAndCommit(
-            MapTransaction<String, V> transaction) {
-        return delegateMap.prepareAndCommit(transaction);
-    }
-
-    @Override
     public boolean equals(Object other) {
         if (other instanceof DelegatingAsyncConsistentTreeMap) {
             DelegatingAsyncConsistentTreeMap<V> that =
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
index f8844fb..ccffead 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
@@ -26,11 +26,13 @@
 import java.util.function.Function;
 import java.util.function.Predicate;
 
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.base.Throwables;
@@ -63,6 +65,7 @@
     private static final String ENTRY_SET = "entrySet";
     private static final String REPLACE = "replace";
     private static final String COMPUTE_IF_ABSENT = "computeIfAbsent";
+    private static final String BEGIN = "begin";
     private static final String PREPARE = "prepare";
     private static final String COMMIT = "commit";
     private static final String ROLLBACK = "rollback";
@@ -249,10 +252,17 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        final MeteringAgent.Context timer = monitor.startTimer(BEGIN);
+        return super.begin(transactionId)
+                .whenComplete((r, e) -> timer.stop(e));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
         final MeteringAgent.Context timer = monitor.startTimer(PREPARE);
-        return super.prepare(transaction)
-                    .whenComplete((r, e) -> timer.stop(e));
+        return super.prepare(transactionLog)
+                .whenComplete((r, e) -> timer.stop(e));
     }
 
     @Override
@@ -270,10 +280,10 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
         final MeteringAgent.Context timer = monitor.startTimer(PREPARE_AND_COMMIT);
-        return super.prepareAndCommit(transaction)
-                    .whenComplete((r, e) -> timer.stop(e));
+        return super.prepareAndCommit(transactionLog)
+                .whenComplete((r, e) -> timer.stop(e));
     }
 
     private class InternalMeteredMapEventListener implements MapEventListener<K, V> {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index 9adcb33..bf77105 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -18,7 +18,6 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -37,12 +36,12 @@
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -200,54 +199,28 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        throw new UnsupportedOperationException();
+    }
 
-        Map<AsyncConsistentMap<K, V>, List<MapUpdate<K, V>>> updatesGroupedByMap = Maps.newIdentityHashMap();
-        transaction.updates().forEach(update -> {
-            AsyncConsistentMap<K, V> map = getMap(update.key());
-            updatesGroupedByMap.computeIfAbsent(map, k -> Lists.newLinkedList()).add(update);
-        });
-        Map<AsyncConsistentMap<K, V>, MapTransaction<K, V>> transactionsByMap =
-                Maps.transformValues(updatesGroupedByMap,
-                                     list -> new MapTransaction<>(transaction.transactionId(), list));
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
+        throw new UnsupportedOperationException();
+    }
 
-        return Tools.allOf(transactionsByMap.entrySet()
-                         .stream()
-                         .map(e -> e.getKey().prepare(e.getValue()))
-                         .collect(Collectors.toList()))
-                    .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
+        throw new UnsupportedOperationException();
     }
 
     @Override
     public CompletableFuture<Void> commit(TransactionId transactionId) {
-        return CompletableFuture.allOf(getMaps().stream()
-                                                .map(e -> e.commit(transactionId))
-                                                .toArray(CompletableFuture[]::new));
+        throw new UnsupportedOperationException();
     }
 
     @Override
     public CompletableFuture<Void> rollback(TransactionId transactionId) {
-        return CompletableFuture.allOf(getMaps().stream()
-                .map(e -> e.rollback(transactionId))
-                .toArray(CompletableFuture[]::new));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
-        Map<AsyncConsistentMap<K, V>, List<MapUpdate<K, V>>> updatesGroupedByMap = Maps.newIdentityHashMap();
-        transaction.updates().forEach(update -> {
-            AsyncConsistentMap<K, V> map = getMap(update.key());
-            updatesGroupedByMap.computeIfAbsent(map, k -> Lists.newLinkedList()).add(update);
-        });
-        Map<AsyncConsistentMap<K, V>, MapTransaction<K, V>> transactionsByMap =
-                Maps.transformValues(updatesGroupedByMap,
-                                     list -> new MapTransaction<>(transaction.transactionId(), list));
-
-        return Tools.allOf(transactionsByMap.entrySet()
-                                            .stream()
-                                            .map(e -> e.getKey().prepareAndCommit(e.getValue()))
-                                            .collect(Collectors.toList()))
-                    .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
+        throw new UnsupportedOperationException();
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedTransactionalMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedTransactionalMap.java
new file mode 100644
index 0000000..90b3524
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedTransactionalMap.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.base.MoreObjects;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.service.TransactionalMap;
+
+/**
+ * Transactional map that routes each key-based operation to the per-partition participant chosen by a hasher.
+ */
+public class PartitionedTransactionalMap<K, V> implements TransactionalMap<K, V> {
+    protected final Map<PartitionId, TransactionalMapParticipant<K, V>> partitions;
+    protected final Hasher<K> hasher;
+
+    public PartitionedTransactionalMap(
+            Map<PartitionId, TransactionalMapParticipant<K, V>> partitions, Hasher<K> hasher) {
+        this.partitions = partitions;
+        this.hasher = hasher;
+    }
+
+    /**
+     * Returns the per-partition participants of this map, typed as transaction participants.
+     *
+     * @return the values of the partition map, one participant per partition
+     */
+    @SuppressWarnings("unchecked")
+    Collection<TransactionParticipant> participants() {
+        return (Collection) partitions.values();
+    }
+
+    /**
+     * Returns the partition for the given key.
+     *
+     * @param key the key for which to return the partition
+     * @return the partition for the given key
+     */
+    private TransactionalMap<K, V> partition(K key) {
+        return partitions.get(hasher.hash(key));
+    }
+
+    @Override
+    public V get(K key) {
+        return partition(key).get(key);
+    }
+
+    @Override
+    public boolean containsKey(K key) {
+        return partition(key).containsKey(key);
+    }
+
+    @Override
+    public V put(K key, V value) {
+        return partition(key).put(key, value);
+    }
+
+    @Override
+    public V remove(K key) {
+        return partition(key).remove(key);
+    }
+
+    @Override
+    public V putIfAbsent(K key, V value) {
+        return partition(key).putIfAbsent(key, value);
+    }
+
+    @Override
+    public boolean remove(K key, V value) {
+        return partition(key).remove(key, value);
+    }
+
+    @Override
+    public boolean replace(K key, V oldValue, V newValue) {
+        return partition(key).replace(key, oldValue, newValue);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("partitions", partitions.values())
+                .toString();
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 48d9b7e..1fc821d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -42,7 +42,6 @@
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncAtomicValue;
-import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.AsyncConsistentMultimap;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
@@ -69,7 +68,6 @@
 import org.slf4j.Logger;
 
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Futures;
 
 /**
  * Implementation for {@code StorageService} and {@code StorageAdminService}.
@@ -98,8 +96,7 @@
     private final Supplier<TransactionId> transactionIdGenerator =
             () -> TransactionId.from(UUID.randomUUID().toString());
     private DistributedPrimitiveCreator federatedPrimitiveCreator;
-    private AsyncConsistentMap<TransactionId, Transaction.State> transactions;
-    private TransactionCoordinator transactionCoordinator;
+    private TransactionManager transactionManager;
 
     @Activate
     public void activate() {
@@ -108,13 +105,7 @@
             .filter(id -> !id.equals(PartitionId.from(0)))
             .forEach(id -> partitionMap.put(id, partitionService.getDistributedPrimitiveCreator(id)));
         federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap);
-        transactions = this.<TransactionId, Transaction.State>consistentMapBuilder()
-                    .withName("onos-transactions")
-                    .withSerializer(Serializer.using(KryoNamespaces.API,
-                            Transaction.class,
-                            Transaction.State.class))
-                    .buildAsyncMap();
-        transactionCoordinator = new TransactionCoordinator(transactions);
+        transactionManager = new TransactionManager(this, partitionService);
         log.info("Started");
     }
 
@@ -187,9 +178,7 @@
     @Override
     public TransactionContextBuilder transactionContextBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultTransactionContextBuilder(transactionIdGenerator.get(),
-                federatedPrimitiveCreator,
-                transactionCoordinator);
+        return new DefaultTransactionContextBuilder(transactionIdGenerator.get(), transactionManager);
     }
 
     @Override
@@ -259,7 +248,7 @@
 
     @Override
     public Collection<TransactionId> getPendingTransactions() {
-        return Futures.getUnchecked(transactions.keySet());
+        return transactionManager.getPendingTransactions();
     }
 
     private List<MapInfo> listMapInfo(DistributedPrimitiveCreator creator) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
index be5f05a..82b3919 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-present Open Networking Laboratory
+ * Copyright 2017-present Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,88 +16,263 @@
 package org.onosproject.store.primitives.impl;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Transactional;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.TransactionContext;
+import org.onosproject.store.service.TransactionException;
 
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkState;
 
 /**
- * An immutable transaction object.
+ * Manages a transaction within the context of a single primitive.
+ * <p>
+ * The {@code Transaction} object is used to manage the transaction for a single partition primitive that implements
+ * the {@link Transactional} interface. It's used as a proxy for {@link TransactionContext}s to manage the transaction
+ * as it relates to a single piece of atomic state.
  */
-public class Transaction {
+public class Transaction<T> {
 
+    /**
+     * Transaction state.
+     * <p>
+     * The transaction state is used to indicate the phase within which the transaction is currently running.
+     */
     enum State {
+
         /**
-         * Indicates a new transaction that is about to be prepared. All transactions
-         * start their life in this state.
+         * Active transaction state.
+         * <p>
+         * The {@code ACTIVE} state represents a transaction in progress. Active transactions may or may not affect
+         * concurrently running transactions depending on the transaction's isolation level.
+         */
+        ACTIVE,
+
+        /**
+         * Preparing transaction state.
+         * <p>
+         * Once a transaction commitment begins, it enters the {@code PREPARING} phase of the two-phase commit protocol.
          */
         PREPARING,
 
         /**
-         * Indicates a transaction that is successfully prepared i.e. all participants voted to commit
+         * Prepared transaction state.
+         * <p>
+         * Once the first phase of the two-phase commit protocol is complete, the transaction's state is set to
+         * {@code PREPARED}.
          */
         PREPARED,
 
         /**
-         * Indicates a transaction that is about to be committed.
+         * Committing transaction state.
+         * <p>
+         * The {@code COMMITTING} state represents a transaction within the second phase of the two-phase commit
+         * protocol.
          */
         COMMITTING,
 
         /**
-         * Indicates a transaction that has successfully committed.
+         * Committed transaction state.
+         * <p>
+         * Once the second phase of the two-phase commit protocol is complete, the transaction's state is set to
+         * {@code COMMITTED}.
          */
         COMMITTED,
 
         /**
-         * Indicates a transaction that is about to be rolled back.
+         * Rolling back transaction state.
+         * <p>
+         * In the event of a two-phase lock failure, when the transaction is rolled back it will enter the
+         * {@code ROLLING_BACK} state while the rollback is in progress.
          */
-        ROLLINGBACK,
+        ROLLING_BACK,
 
         /**
-         * Indicates a transaction that has been rolled back and all locks are released.
+         * Rolled back transaction state.
+         * <p>
+         * Once a transaction has been rolled back, it will enter the {@code ROLLED_BACK} state.
          */
-        ROLLEDBACK
+        ROLLED_BACK,
     }
 
-    private final TransactionId transactionId;
-    private final List<MapUpdate<String, byte[]>> updates;
-    private final State state;
+    private static final String TX_OPEN_ERROR = "transaction already open";
+    private static final String TX_CLOSED_ERROR = "transaction not open";
+    private static final String TX_INACTIVE_ERROR = "transaction is not active";
+    private static final String TX_UNPREPARED_ERROR = "transaction has not been prepared";
 
-    public Transaction(TransactionId transactionId, List<MapUpdate<String, byte[]>> updates) {
-        this(transactionId, updates, State.PREPARING);
-    }
+    protected final TransactionId transactionId;
+    protected final Transactional<T> transactionalObject;
+    private final AtomicBoolean open = new AtomicBoolean();
+    private volatile State state = State.ACTIVE;
 
-    private Transaction(TransactionId transactionId,
-            List<MapUpdate<String, byte[]>> updates,
-            State state) {
+    public Transaction(TransactionId transactionId, Transactional<T> transactionalObject) {
         this.transactionId = transactionId;
-        this.updates = ImmutableList.copyOf(updates);
-        this.state = state;
+        this.transactionalObject = transactionalObject;
     }
 
-    public TransactionId id() {
+    /**
+     * Returns the transaction identifier.
+     *
+     * @return the transaction identifier
+     */
+    public TransactionId transactionId() {
         return transactionId;
     }
 
-    public List<MapUpdate<String, byte[]>> updates() {
-        return updates;
-    }
-
+    /**
+     * Returns the current transaction state.
+     *
+     * @return the current transaction state
+     */
     public State state() {
         return state;
     }
 
-    public Transaction transition(State newState) {
-        return new Transaction(transactionId, updates, newState);
+    /**
+     * Returns a boolean indicating whether the transaction is open.
+     *
+     * @return indicates whether the transaction is open
+     */
+    public boolean isOpen() {
+        return open.get();
+    }
+
+    /**
+     * Opens the transaction, throwing an {@link IllegalStateException} if it's already open.
+     */
+    protected void open() {
+        if (!open.compareAndSet(false, true)) {
+            throw new IllegalStateException(TX_OPEN_ERROR);
+        }
+    }
+
+    /**
+     * Checks that the transaction is open and throws an {@link IllegalStateException} if not.
+     */
+    protected void checkOpen() {
+        checkState(isOpen(), TX_CLOSED_ERROR);
+    }
+
+    /**
+     * Checks that the transaction state is {@code ACTIVE} and throws an {@link IllegalStateException} if not.
+     */
+    protected void checkActive() {
+        checkState(state == State.ACTIVE, TX_INACTIVE_ERROR);
+    }
+
+    /**
+     * Checks that the transaction state is {@code PREPARED} and throws an {@link IllegalStateException} if not.
+     */
+    protected void checkPrepared() {
+        checkState(state == State.PREPARED, TX_UNPREPARED_ERROR);
+    }
+
+    /**
+     * Updates the transaction state.
+     *
+     * @param state the updated transaction state
+     */
+    protected void setState(State state) {
+        this.state = state;
+    }
+
+    /**
+     * Begins the transaction.
+     * <p>
+     * Locks are acquired when the transaction is begun to prevent concurrent transactions from operating on the shared
+     * resource to which this transaction relates.
+     *
+     * @return a completable future to be completed once the transaction has been started
+     */
+    public CompletableFuture<Version> begin() {
+        open();
+        return transactionalObject.begin(transactionId);
+    }
+
+    /**
+     * Prepares the transaction.
+     * <p>
+     * Submits the given updates to the shared resource as the first phase of the two-phase commit protocol. The
+     * state becomes {@code PREPARED} as soon as the resource answers, whatever the answer is. The returned future
+     * may be completed with a {@link TransactionException} if a concurrent modification is detected for an
+     * isolation level that does not allow such modifications.
+     *
+     * @param updates the transaction updates
+     * @return a completable future to be completed once the transaction has been prepared
+     */
+    public CompletableFuture<Boolean> prepare(List<T> updates) {
+        checkOpen();
+        checkActive();
+        setState(State.PREPARING);
+        TransactionLog<T> log = new TransactionLog<>(transactionId, updates);
+        return transactionalObject.prepare(log).thenApply(prepared -> {
+            setState(State.PREPARED);
+            return prepared;
+        });
+    }
+
+    /**
+     * Prepares and commits the transaction in a single atomic operation.
+     * <p>
+     * Both the prepare and commit phases of the protocol must be executed within a single atomic operation. This method
+     * is used to optimize committing transactions that operate only on a single partition within a single primitive.
+     *
+     * @param updates the transaction updates
+     * @return a future completed with {@code true} if the updates were committed; {@code false} otherwise
+     */
+    public CompletableFuture<Boolean> prepareAndCommit(List<T> updates) {
+        checkOpen();
+        checkActive();
+        setState(State.PREPARING);
+        return transactionalObject.prepareAndCommit(new TransactionLog<T>(transactionId, updates))
+                .thenApply(succeeded -> {
+                    setState(succeeded ? State.COMMITTED : State.ROLLED_BACK);
+                    return succeeded;
+                });
+    }
+
+    /**
+     * Commits the transaction.
+     * <p>
+     * Runs the second phase of the two-phase commit protocol, applying the updates that were previously
+     * {@link #prepare(List) prepared}. The transaction must be open and in the {@code PREPARED} state.
+     *
+     * @return a completable future to be completed once the transaction has been committed
+     * @throws IllegalStateException if the transaction is not open or has not been prepared
+     */
+    public CompletableFuture<Void> commit() {
+        checkOpen();
+        checkPrepared();
+        setState(State.COMMITTING);
+        return transactionalObject.commit(transactionId)
+                .thenRun(() -> setState(State.COMMITTED));
+    }
+
+    /**
+     * Rolls back the transaction.
+     * <p>
+     * Cancels the updates prepared in the first phase of the two-phase commit protocol.
+     *
+     * @return a completable future to be completed once the transaction has been rolled back
+     * @throws IllegalStateException if the transaction is not open or has not been prepared
+     */
+    public CompletableFuture<Void> rollback() {
+        checkOpen();
+        checkPrepared();
+        setState(State.ROLLING_BACK);
+        return transactionalObject.rollback(transactionId)
+                .thenRun(() -> setState(State.ROLLED_BACK));
     }
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(getClass())
+        return toStringHelper(this)
                 .add("transactionId", transactionId)
-                .add("updates", updates)
                 .add("state", state)
                 .toString();
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
index 160ecf9..9826266 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
@@ -19,78 +19,123 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Sets;
 import org.onlab.util.Tools;
 import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.CommitStatus;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionalMap;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
 
 /**
- * Coordinator for a two-phase commit protocol.
+ * Transaction coordinator.
  */
 public class TransactionCoordinator {
+    protected final TransactionId transactionId;
+    protected final TransactionManager transactionManager;
+    protected final Set<TransactionParticipant> transactionParticipants = Sets.newConcurrentHashSet();
 
-    private final AsyncConsistentMap<TransactionId, Transaction.State> transactions;
-
-    public TransactionCoordinator(AsyncConsistentMap<TransactionId, Transaction.State> transactions) {
-        this.transactions = transactions;
+    public TransactionCoordinator(TransactionId transactionId, TransactionManager transactionManager) {
+        this.transactionId = transactionId;
+        this.transactionManager = transactionManager;
     }
 
     /**
-     * Commits a transaction.
+     * Returns a transactional map for this transaction.
      *
-     * @param transactionId transaction identifier
-     * @param transactionParticipants set of transaction participants
-     * @return future for commit result
+     * @param name the transactional map name
+     * @param serializer the serializer
+     * @param <K> key type
+     * @param <V> value type
+     * @return a transactional map for this transaction
      */
-    CompletableFuture<CommitStatus> commit(TransactionId transactionId,
-                                           Set<TransactionParticipant> transactionParticipants) {
-        int totalUpdates = transactionParticipants.stream()
-                                                  .map(TransactionParticipant::totalUpdates)
-                                                  .reduce(Math::addExact)
-                                                  .orElse(0);
+    public <K, V> TransactionalMap<K, V> getTransactionalMap(String name, Serializer serializer) {
+        PartitionedTransactionalMap<K, V> map = transactionManager.getTransactionalMap(name, serializer, this);
+        transactionParticipants.addAll(map.participants());
+        return map;
+    }
 
-        if (totalUpdates == 0) {
+    /**
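+     * Commits the transaction, using two-phase commit only when more than one participant has pending updates.
+     *
+     * @return a future completed with the transaction commit status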
+     */
+    public CompletableFuture<CommitStatus> commit() {
+        long totalParticipants = transactionParticipants.stream()
+                .filter(TransactionParticipant::hasPendingUpdates)
+                .count();
+
+        if (totalParticipants == 0) {
             return CompletableFuture.completedFuture(CommitStatus.SUCCESS);
-        } else if (totalUpdates == 1) {
+        } else if (totalParticipants == 1) {
             return transactionParticipants.stream()
-                                          .filter(p -> p.totalUpdates() == 1)
-                                          .findFirst()
-                                          .get()
-                                          .prepareAndCommit()
-                                          .thenApply(v -> v ? CommitStatus.SUCCESS : CommitStatus.FAILURE);
+                    .filter(TransactionParticipant::hasPendingUpdates)
+                    .findFirst()
+                    .get()
+                    .prepareAndCommit()
+                    .thenApply(v -> v ? CommitStatus.SUCCESS : CommitStatus.FAILURE);
         } else {
-            CompletableFuture<CommitStatus> status =  transactions.put(transactionId, Transaction.State.PREPARING)
-                    .thenCompose(v -> this.doPrepare(transactionParticipants))
+            Set<TransactionParticipant> transactionParticipants = this.transactionParticipants.stream()
+                    .filter(TransactionParticipant::hasPendingUpdates)
+                    .collect(Collectors.toSet());
+
+            CompletableFuture<CommitStatus> status = transactionManager.updateState(
+                    transactionId, Transaction.State.PREPARING)
+                    .thenCompose(v -> prepare(transactionParticipants))
                     .thenCompose(result -> result
-                            ? transactions.put(transactionId, Transaction.State.COMMITTING)
-                                          .thenCompose(v -> doCommit(transactionParticipants))
-                                          .thenApply(v -> CommitStatus.SUCCESS)
-                            : transactions.put(transactionId, Transaction.State.ROLLINGBACK)
-                                          .thenCompose(v -> doRollback(transactionParticipants))
-                                          .thenApply(v -> CommitStatus.FAILURE));
-            return status.thenCompose(v -> transactions.remove(transactionId).thenApply(u -> v));
+                            ? transactionManager.updateState(transactionId, Transaction.State.COMMITTING)
+                            .thenCompose(v -> commit(transactionParticipants))
+                            .thenApply(v -> CommitStatus.SUCCESS)
+                            : transactionManager.updateState(transactionId, Transaction.State.ROLLING_BACK)
+                            .thenCompose(v -> rollback(transactionParticipants))
+                            .thenApply(v -> CommitStatus.FAILURE));
+            return status.thenCompose(v -> transactionManager.remove(transactionId).thenApply(u -> v));
         }
     }
 
-    private CompletableFuture<Boolean> doPrepare(Set<TransactionParticipant> transactionParticipants) {
+    /**
+     * Performs the prepare phase of the two-phase commit protocol for the given transaction participants.
+     *
+     * @param transactionParticipants the transaction participants for which to prepare the transaction
+     * @return a completable future indicating whether <em>all</em> prepares succeeded
+     */
+    protected CompletableFuture<Boolean> prepare(Set<TransactionParticipant> transactionParticipants) {
         return Tools.allOf(transactionParticipants.stream()
-                                                  .filter(TransactionParticipant::hasPendingUpdates)
-                                                  .map(TransactionParticipant::prepare)
-                                                  .collect(Collectors.toList()))
-                    .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
+                .map(TransactionParticipant::prepare)
+                .collect(Collectors.toList()))
+                .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
     }
 
-    private CompletableFuture<Void> doCommit(Set<TransactionParticipant> transactionParticipants) {
+    /**
+     * Performs the commit phase of the two-phase commit protocol for the given transaction participants.
+     *
+     * @param transactionParticipants the transaction participants for which to commit the transaction
+     * @return a completable future to be completed once the commits are complete
+     */
+    protected CompletableFuture<Void> commit(Set<TransactionParticipant> transactionParticipants) {
         return CompletableFuture.allOf(transactionParticipants.stream()
-                                                              .filter(TransactionParticipant::hasPendingUpdates)
-                                                              .map(TransactionParticipant::commit)
-                                                              .toArray(CompletableFuture[]::new));
+                .map(TransactionParticipant::commit)
+                .toArray(CompletableFuture[]::new));
     }
 
-    private CompletableFuture<Void> doRollback(Set<TransactionParticipant> transactionParticipants) {
+    /**
+     * Rolls back transactions for the given participants.
+     *
+     * @param transactionParticipants the transaction participants for which to roll back the transaction
+     * @return a completable future to be completed once the rollbacks are complete
+     */
+    protected CompletableFuture<Void> rollback(Set<TransactionParticipant> transactionParticipants) {
         return CompletableFuture.allOf(transactionParticipants.stream()
-                                                              .filter(TransactionParticipant::hasPendingUpdates)
-                                                              .map(TransactionParticipant::rollback)
-                                                              .toArray(CompletableFuture[]::new));
+                .map(TransactionParticipant::rollback)
+                .toArray(CompletableFuture[]::new));
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+                .add("transactionId", transactionId)
+                .add("participants", transactionParticipants)
+                .toString();
     }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java
new file mode 100644
index 0000000..5c246de
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.Hashing;
+import com.google.common.util.concurrent.Futures;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.PartitionService;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.TransactionException;
+
+/**
+ * Transaction manager for managing state shared across multiple transactions.
+ */
+public class TransactionManager {
+    private static final int DEFAULT_CACHE_SIZE = 100;
+
+    private final PartitionService partitionService;
+    private final List<PartitionId> sortedPartitions;
+    private final AsyncConsistentMap<TransactionId, Transaction.State> transactions;
+    private final int cacheSize;
+    private final Map<PartitionId, Cache<String, AsyncConsistentMap>> partitionCache = Maps.newConcurrentMap();
+
+    public TransactionManager(StorageService storageService, PartitionService partitionService) {
+        this(storageService, partitionService, DEFAULT_CACHE_SIZE);
+    }
+
+    public TransactionManager(StorageService storageService, PartitionService partitionService, int cacheSize) {
+        this.partitionService = partitionService;
+        this.cacheSize = cacheSize;
+        this.transactions = storageService.<TransactionId, Transaction.State>consistentMapBuilder()
+                .withName("onos-transactions")
+                .withSerializer(Serializer.using(KryoNamespaces.API,
+                        Transaction.class,
+                        Transaction.State.class))
+                .buildAsyncMap();
+        this.sortedPartitions = Lists.newArrayList(partitionService.getAllPartitionIds());
+        Collections.sort(sortedPartitions);
+    }
+
+    /**
+     * Returns the collection of currently pending transactions.
+     *
+     * @return a collection of currently pending transactions
+     */
+    public Collection<TransactionId> getPendingTransactions() {
+        return Futures.getUnchecked(transactions.keySet());
+    }
+
+    /**
+     * Returns a partitioned transactional map for use within a transaction context.
+     * <p>
+     * One participant is created per partition and keys are routed to a partition by hashing their serialized
+     * form. The returned map takes advantage of caching that's shared across transaction contexts.
+     *
+     * @param name the map name
+     * @param serializer the map serializer
+     * @param transactionCoordinator the transaction coordinator for which the map is being created
+     * @param <K> key type
+     * @param <V> value type
+     * @return a partitioned transactional map
+     */
+    <K, V> PartitionedTransactionalMap<K, V> getTransactionalMap(
+            String name, Serializer serializer, TransactionCoordinator transactionCoordinator) {
+        Map<PartitionId, TransactionalMapParticipant<K, V>> partitions = new HashMap<>();
+        for (PartitionId partitionId : partitionService.getAllPartitionIds()) {
+            partitions.put(partitionId, getTransactionalMapPartition(
+                    name, partitionId, serializer, transactionCoordinator));
+        }
+
+        Hasher<K> hasher = key -> {
+            int hashCode = Hashing.sha256().hashBytes(serializer.encode(key)).asInt();
+            // Take the remainder before the absolute value: Math.abs(Integer.MIN_VALUE) is still negative and
+            // would produce a negative index. The selected partition is unchanged for every other hash code.
+            return sortedPartitions.get(Math.abs(hashCode % sortedPartitions.size()));
+        };
+        return new PartitionedTransactionalMap<>(partitions, hasher);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <K, V> TransactionalMapParticipant<K, V> getTransactionalMapPartition(
+            String mapName,
+            PartitionId partitionId,
+            Serializer serializer,
+            TransactionCoordinator transactionCoordinator) {
+        Cache<String, AsyncConsistentMap> mapCache = partitionCache.computeIfAbsent(partitionId, p ->
+                CacheBuilder.newBuilder().maximumSize(cacheSize / partitionService.getNumberOfPartitions()).build());
+        try {
+            AsyncConsistentMap<K, V> baseMap = partitionService.getDistributedPrimitiveCreator(partitionId)
+                            .newAsyncConsistentMap(mapName, serializer);
+            AsyncConsistentMap<K, V> asyncMap = mapCache.get(mapName, () ->
+                    DistributedPrimitives.newCachingMap(baseMap));
+
+            Transaction<MapUpdate<K, V>> transaction = new Transaction<>(
+                    transactionCoordinator.transactionId,
+                    baseMap);
+            return new DefaultTransactionalMapParticipant<>(asyncMap.asConsistentMap(), transaction);
+        } catch (ExecutionException e) {
+            throw new TransactionException(e);
+        }
+    }
+
+    /**
+     * Updates the state of a transaction in the transaction registry.
+     *
+     * @param transactionId the transaction identifier
+     * @param state the state of the transaction
+     * @return a completable future to be completed once the transaction state has been updated in the registry
+     */
+    CompletableFuture<Void> updateState(TransactionId transactionId, Transaction.State state) {
+        return transactions.put(transactionId, state).thenApply(v -> null);
+    }
+
+    /**
+     * Removes the given transaction from the transaction registry.
+     *
+     * @param transactionId the transaction identifier
+     * @return a completable future to be completed once the transaction state has been removed from the registry
+     */
+    CompletableFuture<Void> remove(TransactionId transactionId) {
+        return transactions.remove(transactionId).thenApply(v -> null);
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
index 0780bf6..f97d448 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
@@ -23,40 +23,38 @@
 public interface TransactionParticipant {
 
     /**
-     * Returns if this participant has updates that need to be committed.
-     * @return {@code true} if yes; {@code false} otherwise
+     * Returns a boolean indicating whether the participant has pending updates.
+     *
+     * @return {@code true} if the participant has pending updates; {@code false} otherwise
      */
-    default boolean hasPendingUpdates() {
-        return totalUpdates() > 0;
-    }
-
-    /**
-     * Returns the number of updates that need to committed for this participant.
-     * @return update count.
-     */
-    int totalUpdates();
-
-    /**
-     * Executes the prepare and commit steps in a single go.
-     * @return {@code true} is successful i.e updates are committed; {@code false} otherwise
-     */
-    CompletableFuture<Boolean> prepareAndCommit();
+    boolean hasPendingUpdates();
 
     /**
      * Executes the prepare phase.
+     *
      * @return {@code true} is successful; {@code false} otherwise
      */
     CompletableFuture<Boolean> prepare();
 
     /**
      * Attempts to execute the commit phase for previously prepared transaction.
+     *
      * @return future that is completed when the operation completes
      */
     CompletableFuture<Void> commit();
 
     /**
+     * Executes the prepare and commit phases atomically.
+     *
+     * @return {@code true} if successful; {@code false} otherwise
+     */
+    CompletableFuture<Boolean> prepareAndCommit();
+
+    /**
      * Attempts to execute the rollback phase for previously prepared transaction.
+     *
      * @return future that is completed when the operation completes
      */
     CompletableFuture<Void> rollback();
+
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
new file mode 100644
index 0000000..4429a1b
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedPrimitive;
+import org.onosproject.store.service.TransactionException;
+import org.onosproject.store.service.TransactionalMap;
+import org.onosproject.store.service.Version;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Base class for participants within a single {@link TransactionalMap}.
+ * <p>
+ * This class provides the basic functionality required by transactional map participants and provides methods
+ * for defining operations specific to individual isolation levels.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public abstract class TransactionalMapParticipant<K, V> implements TransactionalMap<K, V>, TransactionParticipant {
+    private static final String TX_CLOSED_ERROR = "Transaction is closed";
+    private static final String ERROR_NULL_VALUE = "Null values are not allowed";
+    private static final String ERROR_NULL_KEY = "Null key is not allowed";
+
+    protected final ConsistentMap<K, V> backingMap;
+    protected final Transaction<MapUpdate<K, V>> transaction;
+    protected final Map<K, V> writeCache = Maps.newConcurrentMap();
+    protected final Set<K> deleteSet = Sets.newConcurrentHashSet();
+    protected final List<MapUpdate<K, V>> log = new ArrayList<>();
+    protected volatile Version lock;
+
+    protected TransactionalMapParticipant(
+            ConsistentMap<K, V> backingMap,
+            Transaction<MapUpdate<K, V>> transaction) {
+        this.backingMap = backingMap;
+        this.transaction = transaction;
+    }
+
+    /**
+     * Starts the transaction for this partition, if it is not already open, when a read occurs.
+     * <p>
+     * Acquiring a pessimistic lock at the start of the transaction ensures that underlying cached maps have been
+     * synchronized prior to a read.
+     */
+    private void beginTransaction() {
+        if (!transaction.isOpen()) {
+            try {
+                lock = transaction.begin()
+                        .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new TransactionException.Interrupted();
+            } catch (TimeoutException e) {
+                throw new TransactionException.Timeout();
+            } catch (ExecutionException e) {
+                Throwables.propagateIfPossible(e.getCause());
+                throw new TransactionException(e.getCause());
+            }
+        }
+    }
+
+    @Override
+    public V get(K key) {
+        // Start the transaction for this primitive/partition if necessary.
+        beginTransaction();
+
+        checkState(transaction.isOpen(), TX_CLOSED_ERROR);
+        checkNotNull(key, ERROR_NULL_KEY);
+
+        if (deleteSet.contains(key)) {
+            return null;
+        }
+
+        V latest = writeCache.get(key);
+        if (latest != null) {
+            return latest;
+        } else {
+            return read(key);
+        }
+    }
+
+    /**
+     * Executes a get operation based on the transaction isolation level.
+     *
+     * @param key the key to look up
+     * @return the value
+     */
+    protected abstract V read(K key);
+
+    @Override
+    public boolean containsKey(K key) {
+        return get(key) != null;
+    }
+
+    @Override
+    public V put(K key, V value) {
+        checkNotNull(value, ERROR_NULL_VALUE);
+
+        V latest = get(key);
+        writeCache.put(key, value);
+        deleteSet.remove(key);
+        return latest;
+    }
+
+    @Override
+    public V remove(K key) {
+        V latest = get(key);
+        if (latest != null) {
+            writeCache.remove(key);
+            deleteSet.add(key);
+        }
+        return latest;
+    }
+
+    @Override
+    public boolean remove(K key, V value) {
+        checkNotNull(value, ERROR_NULL_VALUE);
+
+        V latest = get(key);
+        if (Objects.equal(value, latest)) {
+            remove(key);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean replace(K key, V oldValue, V newValue) {
+        checkNotNull(oldValue, ERROR_NULL_VALUE);
+        checkNotNull(newValue, ERROR_NULL_VALUE);
+
+        V latest = get(key);
+        if (Objects.equal(oldValue, latest)) {
+            put(key, newValue);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public V putIfAbsent(K key, V value) {
+        checkNotNull(value, ERROR_NULL_VALUE);
+
+        V latest = get(key);
+        if (latest == null) {
+            put(key, value);
+        }
+        return latest;
+    }
+
+    @Override
+    public boolean hasPendingUpdates() {
+        return records().findAny().isPresent();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare() {
+        return transaction.prepare(log());
+    }
+
+    @Override
+    public CompletableFuture<Void> commit() {
+        return transaction.commit();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit() {
+        return transaction.prepareAndCommit(log());
+    }
+
+    @Override
+    public CompletableFuture<Void> rollback() {
+        return transaction.rollback();
+    }
+
+    /**
+     * Returns a list of updates performed within this map partition.
+     *
+     * @return a list of map updates
+     */
+    protected List<MapUpdate<K, V>> log() {
+        return records().collect(Collectors.toList());
+    }
+
+    /**
+     * Returns a stream of updates performed within this map partition.
+     *
+     * @return a stream of map updates
+     */
+    protected abstract Stream<MapUpdate<K, V>> records();
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("backingMap", backingMap)
+                .add("updates", log())
+                .toString();
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index 2afb5df..c2f167a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -29,11 +29,13 @@
 import java.util.stream.Collectors;
 
 import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.collect.Maps;
@@ -266,9 +268,27 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepare(MapTransaction<K1, V1> transaction) {
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
         try {
-            return backingMap.prepare(transaction.map(keyEncoder, valueEncoder));
+            return backingMap.begin(transactionId);
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K1, V1>> transactionLog) {
+        try {
+            return backingMap.prepare(transactionLog.map(record -> record.map(keyEncoder, valueEncoder)));
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K1, V1>> transactionLog) {
+        try {
+            return backingMap.prepareAndCommit(transactionLog.map(record -> record.map(keyEncoder, valueEncoder)));
         } catch (Exception e) {
             return Tools.exceptionalFuture(e);
         }
@@ -276,18 +296,17 @@
 
     @Override
     public CompletableFuture<Void> commit(TransactionId transactionId) {
-        return backingMap.commit(transactionId);
+        try {
+            return backingMap.commit(transactionId);
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
     }
 
     @Override
     public CompletableFuture<Void> rollback(TransactionId transactionId) {
-        return backingMap.rollback(transactionId);
-    }
-
-    @Override
-    public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K1, V1> transaction) {
         try {
-            return backingMap.prepareAndCommit(transaction.map(keyEncoder, valueEncoder));
+            return backingMap.rollback(transactionId);
         } catch (Exception e) {
             return Tools.exceptionalFuture(e);
         }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
index 747008f..2f0683e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
@@ -18,11 +18,13 @@
 
 import com.google.common.collect.Maps;
 import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
@@ -361,27 +363,30 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepare(
-            MapTransaction<String, V1> transaction) {
-        throw new UnsupportedOperationException("This operation is not yet " +
-                                                        "supported.");
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        throw new UnsupportedOperationException("This operation is not yet supported.");
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, V1>> transactionLog) {
+        throw new UnsupportedOperationException("This operation is not yet supported.");
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, V1>> transactionLog) {
+        throw new UnsupportedOperationException("This operation is not yet supported.");
     }
 
     @Override
     public CompletableFuture<Void> commit(TransactionId transactionId) {
-        throw new UnsupportedOperationException("This operation is not yet " +
-                                                        "supported.");    }
+        throw new UnsupportedOperationException("This operation is not yet supported.");
+    }
 
     @Override
     public CompletableFuture<Void> rollback(TransactionId transactionId) {
-        throw new UnsupportedOperationException("This operation is not yet " +
-                                                        "supported.");    }
+        throw new UnsupportedOperationException("This operation is not yet supported.");
+    }
 
-    @Override
-    public CompletableFuture<Boolean> prepareAndCommit(
-            MapTransaction<String, V1> transaction) {
-        throw new UnsupportedOperationException("This operation is not yet " +
-                                                        "supported.");    }
     private class InternalBackingMapEventListener
             implements MapEventListener<String, V2> {
 
