[ONOS-6342] Refactor transaction architecture to support a shared cache for transactional primitives

Change-Id: I2a17965100895f5aa4d2202028047bb980c11d26
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java b/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
index 3b12021..e04f10e 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
@@ -19,6 +19,7 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import java.util.Objects;
 import java.util.function.Function;
 
 import org.onlab.util.ByteArraySizeHashPrinter;
@@ -42,6 +43,7 @@
          * Insert/Update entry without any checks.
          */
         PUT,
+
         /**
          * Insert an entry iff there is no existing entry for that key.
          */
@@ -73,7 +75,6 @@
         REMOVE_IF_VALUE_MATCH,
     }
 
-    private String mapName;
     private Type type;
     private K key;
     private V value;
@@ -81,15 +82,6 @@
     private long currentVersion = -1;
 
     /**
-     * Returns the name of the map.
-     *
-     * @return map name
-     */
-    public String mapName() {
-        return mapName;
-    }
-
-    /**
      * Returns the type of update operation.
      * @return type of update.
      */
@@ -140,7 +132,6 @@
      */
     public <S, T> MapUpdate<S, T> map(Function<K, S> keyMapper, Function<V, T> valueMapper) {
         return MapUpdate.<S, T>newBuilder()
-                .withMapName(mapName)
                 .withType(type)
                 .withKey(keyMapper.apply(key))
                 .withValue(value == null ? null : valueMapper.apply(value))
@@ -150,9 +141,26 @@
     }
 
     @Override
+    public int hashCode() {
+        return Objects.hash(type, key, value, currentValue, currentVersion);
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (object instanceof MapUpdate) {
+            MapUpdate that = (MapUpdate) object;
+            return this.type == that.type
+                    && Objects.equals(this.key, that.key)
+                    && Objects.equals(this.value, that.value)
+                    && Objects.equals(this.currentValue, that.currentValue)
+                    && Objects.equals(this.currentVersion, that.currentVersion);
+        }
+        return false;
+    }
+
+    @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
-            .add("mapName", mapName)
             .add("type", type)
             .add("key", key)
             .add("value", value instanceof byte[] ? new ByteArraySizeHashPrinter((byte[]) value) : value)
@@ -187,11 +195,6 @@
             return update;
         }
 
-        public Builder<K, V> withMapName(String name) {
-            update.mapName = checkNotNull(name, "name cannot be null");
-            return this;
-        }
-
         public Builder<K, V> withType(Type type) {
             update.type = checkNotNull(type, "type cannot be null");
             return this;
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index 8bfd952..0704853 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -17,8 +17,8 @@
 package org.onosproject.store.service;
 
 import java.util.Collection;
-import java.util.Objects;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -26,10 +26,9 @@
 import java.util.function.Function;
 import java.util.function.Predicate;
 
-import org.onosproject.store.primitives.DefaultConsistentMap;
-import org.onosproject.store.primitives.TransactionId;
-
 import com.google.common.util.concurrent.MoreExecutors;
+import org.onosproject.store.primitives.DefaultConsistentMap;
+import org.onosproject.store.primitives.MapUpdate;
 
 /**
  * A distributed, strongly consistent map whose methods are all executed asynchronously.
@@ -55,7 +54,7 @@
  * the returned future will be {@link CompletableFuture#complete completed} when the
  * operation finishes.
  */
-public interface AsyncConsistentMap<K, V> extends DistributedPrimitive {
+public interface AsyncConsistentMap<K, V> extends DistributedPrimitive, Transactional<MapUpdate<K, V>> {
 
     @Override
     default DistributedPrimitive.Type primitiveType() {
@@ -346,36 +345,6 @@
     CompletableFuture<Void> removeListener(MapEventListener<K, V> listener);
 
     /**
-     * Prepares a transaction for commitment.
-     * @param transaction transaction
-     * @return {@code true} if prepare is successful and transaction is ready to be committed;
-     * {@code false} otherwise
-     */
-    CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction);
-
-    /**
-     * Commits a previously prepared transaction.
-     * @param transactionId transaction identifier
-     * @return future that will be completed when the operation finishes
-     */
-    CompletableFuture<Void> commit(TransactionId transactionId);
-
-    /**
-     * Aborts a previously prepared transaction.
-     * @param transactionId transaction identifier
-     * @return future that will be completed when the operation finishes
-     */
-    CompletableFuture<Void> rollback(TransactionId transactionId);
-
-    /**
-     * Prepares a transaction and commits it in one go.
-     * @param transaction transaction
-     * @return {@code true} if operation is successful and updates are committed
-     * {@code false} otherwise
-     */
-    CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction);
-
-    /**
      * Returns a new {@link ConsistentMap} that is backed by this instance.
      *
      * @return new {@code ConsistentMap} instance
diff --git a/core/api/src/main/java/org/onosproject/store/service/MapTransaction.java b/core/api/src/main/java/org/onosproject/store/service/MapTransaction.java
deleted file mode 100644
index acedf17..0000000
--- a/core/api/src/main/java/org/onosproject/store/service/MapTransaction.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service;
-
-import java.util.List;
-import java.util.function.Function;
-
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.primitives.TransactionId;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-/**
- * Collection of map updates to be committed atomically.
- *
- * @param <K> key type
- * @param <V> value type
- */
-public class MapTransaction<K, V> {
-
-    private final TransactionId transactionId;
-    private final List<MapUpdate<K, V>> updates;
-
-    public MapTransaction(TransactionId transactionId, List<MapUpdate<K, V>> updates) {
-        this.transactionId = transactionId;
-        this.updates = ImmutableList.copyOf(updates);
-    }
-
-    /**
-     * Returns the transaction identifier.
-     *
-     * @return transaction id
-     */
-    public TransactionId transactionId() {
-        return transactionId;
-    }
-
-    /**
-     * Returns the list of map updates.
-     *
-     * @return map updates
-     */
-    public List<MapUpdate<K, V>> updates() {
-        return updates;
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(getClass())
-                .add("transactionId", transactionId)
-                .add("updates", updates)
-                .toString();
-    }
-
-    /**
-     * Maps this instance to another {@code MapTransaction} with different key and value types.
-     *
-     * @param keyMapper function for mapping key types
-     * @param valueMapper function for mapping value types
-     * @return newly typed instance
-     *
-     * @param <S> key type of returned instance
-     * @param <T> value type of returned instance
-     */
-    public <S, T> MapTransaction<S, T> map(Function<K, S> keyMapper, Function<V, T> valueMapper) {
-        return new MapTransaction<>(transactionId, Lists.transform(updates, u -> u.map(keyMapper, valueMapper)));
-    }
-}
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionLog.java b/core/api/src/main/java/org/onosproject/store/service/TransactionLog.java
new file mode 100644
index 0000000..c367f5d
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionLog.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.onosproject.store.primitives.TransactionId;
+
+/**
+ * Collection of transaction updates to be applied atomically.
+ *
+ * @param <T> log record type
+ */
+public class TransactionLog<T> {
+    private final TransactionId transactionId;
+    private final List<T> records;
+
+    public TransactionLog(TransactionId transactionId, List<T> records) {
+        this.transactionId = transactionId;
+        this.records = ImmutableList.copyOf(records);
+    }
+
+    /**
+     * Returns the transaction identifier.
+     *
+     * @return transaction id
+     */
+    public TransactionId transactionId() {
+        return transactionId;
+    }
+
+    /**
+     * Returns the list of transaction log records.
+     *
+     * @return a list of transaction log records
+     */
+    public List<T> records() {
+        return records;
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (object instanceof TransactionLog) {
+            TransactionLog that = (TransactionLog) object;
+            return this.transactionId.equals(that.transactionId)
+                    && this.records.equals(that.records);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(transactionId, records);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("transactionId", transactionId)
+                .add("records", records)
+                .toString();
+    }
+
+    /**
+     * Maps this instance to another {@code MapTransaction} with different key and value types.
+     *
+     * @param mapper function for mapping record types
+     * @return newly typed instance
+     *
+     * @param <U> record type of returned instance
+     */
+    public <U> TransactionLog<U> map(Function<T, U> mapper) {
+        return new TransactionLog<>(transactionId, Lists.transform(records, mapper::apply));
+    }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/Transactional.java b/core/api/src/main/java/org/onosproject/store/service/Transactional.java
new file mode 100644
index 0000000..a6e1fbd
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/Transactional.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.onosproject.store.primitives.TransactionId;
+
+/**
+ * Interface for transactional primitives.
+ */
+public interface Transactional<T> {
+
+    /**
+     * Begins the transaction.
+     *
+     * @param transactionId the transaction identifier for the transaction to begin
+     * @return a completable future to be completed with the lock version
+     */
+    CompletableFuture<Version> begin(TransactionId transactionId);
+
+    /**
+     * Prepares a transaction for commitment.
+     *
+     * @param transactionLog transaction log
+     * @return {@code true} if prepare is successful and transaction is ready to be committed
+     * {@code false} otherwise
+     */
+    CompletableFuture<Boolean> prepare(TransactionLog<T> transactionLog);
+
+    /**
+     * Prepares and commits a transaction.
+     *
+     * @param transactionLog transaction log
+     * @return {@code true} if prepare is successful and transaction was committed
+     * {@code false} otherwise
+     */
+    CompletableFuture<Boolean> prepareAndCommit(TransactionLog<T> transactionLog);
+
+    /**
+     * Commits a previously prepared transaction and unlocks the object.
+     *
+     * @param transactionId transaction identifier
+     * @return future that will be completed when the operation finishes
+     */
+    CompletableFuture<Void> commit(TransactionId transactionId);
+
+    /**
+     * Aborts a previously prepared transaction and unlocks the object.
+     *
+     * @param transactionId transaction identifier
+     * @return future that will be completed when the operation finishes
+     */
+    CompletableFuture<Void> rollback(TransactionId transactionId);
+
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/Version.java b/core/api/src/main/java/org/onosproject/store/service/Version.java
new file mode 100644
index 0000000..58a1d3d
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/Version.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ComparisonChain;
+import org.onosproject.store.Timestamp;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Logical timestamp for versions.
+ * <p>
+ * The version is a logical timestamp that represents a point in logical time at which an event occurs.
+ * This is used in both pessimistic and optimistic locking protocols to ensure that the state of a shared resource
+ * has not changed at the end of a transaction.
+ */
+public class Version implements Timestamp {
+    private final long version;
+
+    public Version(long version) {
+        this.version = version;
+    }
+
+    @Override
+    public int compareTo(Timestamp o) {
+        checkArgument(o instanceof Version,
+                "Must be LockVersion", o);
+        Version that = (Version) o;
+
+        return ComparisonChain.start()
+                .compare(this.version, that.version)
+                .result();
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(version);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof Version)) {
+            return false;
+        }
+        Version that = (Version) obj;
+        return Objects.equals(this.version, that.version);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("version", version)
+                .toString();
+    }
+
+    /**
+     * Returns the lock version.
+     *
+     * @return the lock version
+     */
+    public long value() {
+        return this.version;
+    }
+}
\ No newline at end of file
diff --git a/core/api/src/test/java/org/onosproject/store/primitives/MapUpdateTest.java b/core/api/src/test/java/org/onosproject/store/primitives/MapUpdateTest.java
index 6fecd8c..53cc160 100644
--- a/core/api/src/test/java/org/onosproject/store/primitives/MapUpdateTest.java
+++ b/core/api/src/test/java/org/onosproject/store/primitives/MapUpdateTest.java
@@ -33,7 +33,6 @@
             .withValue("2".getBytes())
             .withCurrentVersion(3)
             .withKey("4")
-            .withMapName("5")
             .withType(MapUpdate.Type.PUT)
             .build();
 
@@ -42,7 +41,6 @@
             .withValue("2".getBytes())
             .withCurrentVersion(3)
             .withKey("4")
-            .withMapName("5")
             .withType(MapUpdate.Type.REMOVE)
             .build();
 
@@ -51,7 +49,6 @@
             .withValue("2".getBytes())
             .withCurrentVersion(3)
             .withKey("4")
-            .withMapName("5")
             .withType(MapUpdate.Type.REMOVE_IF_VALUE_MATCH)
             .build();
 
@@ -60,7 +57,6 @@
             .withValue("2".getBytes())
             .withCurrentVersion(3)
             .withKey("4")
-            .withMapName("5")
             .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
             .build();
 
@@ -69,7 +65,6 @@
             .withValue("2".getBytes())
             .withCurrentVersion(3)
             .withKey("4")
-            .withMapName("5")
             .withType(MapUpdate.Type.PUT_IF_VALUE_MATCH)
             .build();
 
@@ -78,7 +73,6 @@
             .withValue("2".getBytes())
             .withCurrentVersion(3)
             .withKey("4")
-            .withMapName("5")
             .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
             .build();
 
@@ -91,7 +85,6 @@
         assertThat(stats1.value(), is("2".getBytes()));
         assertThat(stats1.currentVersion(), is(3L));
         assertThat(stats1.key(), is("4"));
-        assertThat(stats1.mapName(), is("5"));
         assertThat(stats1.type(), is(MapUpdate.Type.PUT));
     }
 
diff --git a/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMapAdapter.java b/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMapAdapter.java
index c9161a3..90b4f51 100644
--- a/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMapAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMapAdapter.java
@@ -24,6 +24,7 @@
 import java.util.function.BiFunction;
 import java.util.function.Predicate;
 
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 
 /**
@@ -143,7 +144,17 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
         return null;
     }
 
@@ -156,10 +167,5 @@
     public CompletableFuture<Void> rollback(TransactionId transactionId) {
         return null;
     }
-
-    @Override
-    public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
-        return null;
-    }
 }
 
diff --git a/core/api/src/test/java/org/onosproject/store/service/VersionTest.java b/core/api/src/test/java/org/onosproject/store/service/VersionTest.java
new file mode 100644
index 0000000..1495ec3
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/VersionTest.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import org.junit.Test;
+
+import static junit.framework.TestCase.assertFalse;
+import static junit.framework.TestCase.assertTrue;
+
+/**
+ * Version test.
+ */
+public class VersionTest {
+
+    @Test
+    public void testVersion() {
+        Version version1 = new Version(1);
+        Version version2 = new Version(1);
+        assertTrue(version1.equals(version2));
+        assertTrue(version1.hashCode() == version2.hashCode());
+        assertTrue(version1.value() == version2.value());
+
+        Version version3 = new Version(2);
+        assertFalse(version1.equals(version3));
+        assertFalse(version1.hashCode() == version3.hashCode());
+        assertFalse(version1.value() == version3.value());
+    }
+
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index 645215c..d676ab1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -50,7 +50,7 @@
 import org.onosproject.store.service.DocumentPath;
 import org.onosproject.store.service.DocumentTreeEvent;
 import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Task;
 import org.onosproject.store.service.Versioned;
 import org.onosproject.store.service.WorkQueueStats;
@@ -97,7 +97,7 @@
         serializer.register(TransactionId.class, factory);
         serializer.register(MapUpdate.class, factory);
         serializer.register(MapUpdate.Type.class, factory);
-        serializer.register(MapTransaction.class, factory);
+        serializer.register(TransactionLog.class, factory);
         serializer.register(Versioned.class, factory);
         serializer.register(MapEvent.class, factory);
         serializer.register(Task.class, factory);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
index fccf48e..2f3fe11 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
@@ -15,11 +15,9 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.CommitStatus;
 import org.onosproject.store.service.Serializer;
@@ -27,7 +25,7 @@
 import org.onosproject.store.service.TransactionalMap;
 import org.onosproject.utils.MeteringAgent;
 
-import com.google.common.collect.Sets;
+import static com.google.common.base.MoreObjects.toStringHelper;
 
 /**
  * Default implementation of transaction context.
@@ -35,17 +33,12 @@
 public class DefaultTransactionContext implements TransactionContext {
 
     private final AtomicBoolean isOpen = new AtomicBoolean(false);
-    private final DistributedPrimitiveCreator creator;
     private final TransactionId transactionId;
     private final TransactionCoordinator transactionCoordinator;
-    private final Set<TransactionParticipant> txParticipants = Sets.newConcurrentHashSet();
     private final MeteringAgent monitor;
 
-    public DefaultTransactionContext(TransactionId transactionId,
-            DistributedPrimitiveCreator creator,
-            TransactionCoordinator transactionCoordinator) {
+    public DefaultTransactionContext(TransactionId transactionId, TransactionCoordinator transactionCoordinator) {
         this.transactionId = transactionId;
-        this.creator = creator;
         this.transactionCoordinator = transactionCoordinator;
         this.monitor = new MeteringAgent("transactionContext", "*", true);
     }
@@ -75,8 +68,7 @@
     @Override
     public CompletableFuture<CommitStatus> commit() {
         final MeteringAgent.Context timer = monitor.startTimer("commit");
-        return transactionCoordinator.commit(transactionId, txParticipants)
-                                     .whenComplete((r, e) -> timer.stop(e));
+        return transactionCoordinator.commit().whenComplete((r, e) -> timer.stop(e));
     }
 
     @Override
@@ -85,14 +77,14 @@
     }
 
     @Override
-    public <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName,
-            Serializer serializer) {
-        // FIXME: Do not create duplicates.
-        DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<K, V>(mapName,
-                DistributedPrimitives.newMeteredMap(creator.<K, V>newAsyncConsistentMap(mapName, serializer)),
-                this,
-                serializer);
-        txParticipants.add(txMap);
-        return txMap;
+    public <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName, Serializer serializer) {
+        return transactionCoordinator.getTransactionalMap(mapName, serializer);
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+                .add("transactionId", transactionId)
+                .toString();
     }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java
index c599306..12f36a0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java
@@ -15,31 +15,26 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.TransactionContext;
 import org.onosproject.store.service.TransactionContextBuilder;
+
 /**
  * Default Transaction Context Builder.
  */
 public class DefaultTransactionContextBuilder extends TransactionContextBuilder {
 
     private final TransactionId transactionId;
-    private final DistributedPrimitiveCreator primitiveCreator;
-    private final TransactionCoordinator transactionCoordinator;
+    private final TransactionManager transactionManager;
 
-    public DefaultTransactionContextBuilder(TransactionId transactionId,
-            DistributedPrimitiveCreator primitiveCreator,
-            TransactionCoordinator transactionCoordinator) {
+    public DefaultTransactionContextBuilder(TransactionId transactionId, TransactionManager transactionManager) {
         this.transactionId = transactionId;
-        this.primitiveCreator = primitiveCreator;
-        this.transactionCoordinator = transactionCoordinator;
+        this.transactionManager = transactionManager;
     }
 
     @Override
     public TransactionContext build() {
         return new DefaultTransactionContext(transactionId,
-                primitiveCreator,
-                transactionCoordinator);
+                new TransactionCoordinator(transactionId, transactionManager));
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
deleted file mode 100644
index d138c9b..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.primitives.impl;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.commons.lang3.tuple.Pair;
-import org.onlab.util.HexString;
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.MapTransaction;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.TransactionContext;
-import org.onosproject.store.service.TransactionalMap;
-import org.onosproject.store.service.Versioned;
-
-import static com.google.common.base.Preconditions.*;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Default Transactional Map implementation that provides a repeatable reads
- * transaction isolation level.
- *
- * @param <K> key type
- * @param <V> value type.
- */
-public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V>, TransactionParticipant {
-
-    private final TransactionContext txContext;
-    private static final String TX_CLOSED_ERROR = "Transaction is closed";
-    private final AsyncConsistentMap<K, V> backingMap;
-    private final ConsistentMap<K, V> backingConsistentMap;
-    private final String name;
-    private final Serializer serializer;
-    private final Map<K, Versioned<V>> readCache = Maps.newConcurrentMap();
-    private final Map<K, V> writeCache = Maps.newConcurrentMap();
-    private final Set<K> deleteSet = Sets.newConcurrentHashSet();
-
-    private static final String ERROR_NULL_VALUE = "Null values are not allowed";
-    private static final String ERROR_NULL_KEY = "Null key is not allowed";
-
-    private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
-            .softValues()
-            .build(new CacheLoader<K, String>() {
-
-                @Override
-                public String load(K key) {
-                    return HexString.toHexString(serializer.encode(key));
-                }
-            });
-
-    protected K dK(String key) {
-        return serializer.decode(HexString.fromHexString(key));
-    }
-
-    public DefaultTransactionalMap(
-            String name,
-            AsyncConsistentMap<K, V> backingMap,
-            TransactionContext txContext,
-            Serializer serializer) {
-        this.name = name;
-        this.backingMap = backingMap;
-        this.backingConsistentMap = backingMap.asConsistentMap();
-        this.txContext = txContext;
-        this.serializer = serializer;
-    }
-
-    @Override
-    public V get(K key) {
-        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
-        checkNotNull(key, ERROR_NULL_KEY);
-        if (deleteSet.contains(key)) {
-            return null;
-        }
-        V latest = writeCache.get(key);
-        if (latest != null) {
-            return latest;
-        } else {
-            Versioned<V> v = readCache.computeIfAbsent(key, k -> backingConsistentMap.get(k));
-            return v != null ? v.value() : null;
-        }
-    }
-
-    @Override
-    public boolean containsKey(K key) {
-        return get(key) != null;
-    }
-
-    @Override
-    public V put(K key, V value) {
-        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
-        checkNotNull(value, ERROR_NULL_VALUE);
-
-        V latest = get(key);
-        writeCache.put(key, value);
-        deleteSet.remove(key);
-        return latest;
-    }
-
-    @Override
-    public V remove(K key) {
-        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
-        V latest = get(key);
-        if (latest != null) {
-            writeCache.remove(key);
-            deleteSet.add(key);
-        }
-        return latest;
-    }
-
-    @Override
-    public boolean remove(K key, V value) {
-        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
-        checkNotNull(value, ERROR_NULL_VALUE);
-        V latest = get(key);
-        if (Objects.equal(value, latest)) {
-            remove(key);
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public boolean replace(K key, V oldValue, V newValue) {
-        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
-        checkNotNull(oldValue, ERROR_NULL_VALUE);
-        checkNotNull(newValue, ERROR_NULL_VALUE);
-        V latest = get(key);
-        if (Objects.equal(oldValue, latest)) {
-            put(key, newValue);
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public V putIfAbsent(K key, V value) {
-        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
-        checkNotNull(value, ERROR_NULL_VALUE);
-        V latest = get(key);
-        if (latest == null) {
-            put(key, value);
-        }
-        return latest;
-    }
-
-    @Override
-    public CompletableFuture<Boolean> prepare() {
-        return backingMap.prepare(new MapTransaction<>(txContext.transactionId(), updates()));
-    }
-
-    @Override
-    public CompletableFuture<Void> commit() {
-        return backingMap.commit(txContext.transactionId());
-    }
-
-    @Override
-    public CompletableFuture<Void> rollback() {
-        return backingMap.rollback(txContext.transactionId());
-    }
-
-    @Override
-    public CompletableFuture<Boolean> prepareAndCommit() {
-        return backingMap.prepareAndCommit(new MapTransaction<>(txContext.transactionId(), updates()));
-    }
-
-    @Override
-    public int totalUpdates() {
-        return updates().size();
-    }
-
-    @Override
-    public boolean hasPendingUpdates() {
-        return updatesStream().findAny().isPresent();
-    }
-
-    protected Stream<MapUpdate<K, V>> updatesStream() {
-        return Stream.concat(
-            // 1st stream: delete ops
-            deleteSet.stream()
-                .map(key -> Pair.of(key, readCache.get(key)))
-                .filter(e -> e.getValue() != null)
-                .map(e -> MapUpdate.<K, V>newBuilder()
-                 .withMapName(name)
-                 .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
-                 .withKey(e.getKey())
-                 .withCurrentVersion(e.getValue().version())
-                 .build()),
-            // 2nd stream: write ops
-            writeCache.entrySet().stream()
-                    .map(e -> {
-                        Versioned<V> original = readCache.get(e.getKey());
-                        if (original == null) {
-                            return MapUpdate.<K, V>newBuilder()
-                                    .withMapName(name)
-                                    .withType(MapUpdate.Type.PUT_IF_ABSENT)
-                                    .withKey(e.getKey())
-                                    .withValue(e.getValue())
-                                    .build();
-                        } else {
-                            return MapUpdate.<K, V>newBuilder()
-                                    .withMapName(name)
-                                    .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
-                                    .withKey(e.getKey())
-                                    .withCurrentVersion(original.version())
-                                    .withValue(e.getValue())
-                                    .build();
-                        }
-                    }));
-    }
-
-    protected List<MapUpdate<K, V>> updates() {
-        return updatesStream().collect(Collectors.toList());
-    }
-
-    protected List<MapUpdate<String, byte[]>> toMapUpdates() {
-        List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
-        deleteSet.forEach(key -> {
-            Versioned<V> original = readCache.get(key);
-            if (original != null) {
-                updates.add(MapUpdate.<String, byte[]>newBuilder()
-                        .withMapName(name)
-                        .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
-                        .withKey(keyCache.getUnchecked(key))
-                        .withCurrentVersion(original.version())
-                        .build());
-            }
-        });
-        writeCache.forEach((key, value) -> {
-            Versioned<V> original = readCache.get(key);
-            if (original == null) {
-                updates.add(MapUpdate.<String, byte[]>newBuilder()
-                        .withMapName(name)
-                        .withType(MapUpdate.Type.PUT_IF_ABSENT)
-                        .withKey(keyCache.getUnchecked(key))
-                        .withValue(serializer.encode(value))
-                        .build());
-            } else {
-                updates.add(MapUpdate.<String, byte[]>newBuilder()
-                        .withMapName(name)
-                        .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
-                        .withKey(keyCache.getUnchecked(key))
-                        .withCurrentVersion(original.version())
-                        .withValue(serializer.encode(value))
-                        .build());
-            }
-        });
-        return updates;
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("backingMap", backingMap)
-                .add("updates", updates())
-                .toString();
-    }
-
-    /**
-     * Discards all changes made to this transactional map.
-     */
-    protected void abort() {
-        readCache.clear();
-        writeCache.clear();
-        deleteSet.clear();
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
new file mode 100644
index 0000000..9acc377
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Map;
+import java.util.stream.Stream;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Versioned;
+
+/**
+ * Repeatable read based map participant.
+ */
+public class DefaultTransactionalMapParticipant<K, V> extends TransactionalMapParticipant<K, V> {
+    private final Map<K, Versioned<V>> readCache = Maps.newConcurrentMap();
+
+    public DefaultTransactionalMapParticipant(
+            ConsistentMap<K, V> backingMap, Transaction<MapUpdate<K, V>> transaction) {
+        super(backingMap, transaction);
+    }
+
+    @Override
+    protected V read(K key) {
+        Versioned<V> value = readCache.computeIfAbsent(key, backingMap::get);
+        return value != null ? value.value() : null;
+    }
+
+    @Override
+    protected Stream<MapUpdate<K, V>> records() {
+        return Stream.concat(deleteStream(), writeStream());
+    }
+
+    /**
+     * Returns a transaction record stream for deleted keys.
+     */
+    private Stream<MapUpdate<K, V>> deleteStream() {
+        return deleteSet.stream()
+                .map(key -> Pair.of(key, readCache.get(key)))
+                .filter(e -> e.getValue() != null)
+                .map(e -> MapUpdate.<K, V>newBuilder()
+                        .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+                        .withKey(e.getKey())
+                        .withCurrentVersion(e.getValue().version())
+                        .build());
+    }
+
+    /**
+     * Returns a transaction record stream for updated keys.
+     */
+    private Stream<MapUpdate<K, V>> writeStream() {
+        return writeCache.entrySet().stream().map(entry -> {
+            Versioned<V> original = readCache.get(entry.getKey());
+            if (original == null) {
+                return MapUpdate.<K, V>newBuilder()
+                        .withType(MapUpdate.Type.PUT_IF_ABSENT)
+                        .withKey(entry.getKey())
+                        .withValue(entry.getValue())
+                        .build();
+            } else {
+                return MapUpdate.<K, V>newBuilder()
+                        .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+                        .withKey(entry.getKey())
+                        .withCurrentVersion(original.version())
+                        .withValue(entry.getValue())
+                        .build();
+            }
+        });
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index 2f4a665..c0b323a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -29,10 +29,12 @@
 import java.util.function.Predicate;
 
 import org.onosproject.core.ApplicationId;
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.base.MoreObjects;
@@ -170,8 +172,18 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
-        return delegateMap.prepare(transaction);
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        return delegateMap.begin(transactionId);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
+        return delegateMap.prepare(transactionLog);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
+        return delegateMap.prepareAndCommit(transactionLog);
     }
 
     @Override
@@ -185,11 +197,6 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
-        return delegateMap.prepareAndCommit(transaction);
-    }
-
-    @Override
     public void addStatusChangeListener(Consumer<Status> listener) {
         delegateMap.addStatusChangeListener(listener);
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
index 48c0b85..f474f7b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
@@ -16,10 +16,12 @@
 
 package org.onosproject.store.primitives.impl;
 
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
 import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
@@ -253,9 +255,18 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepare(
-            MapTransaction<String, V> transaction) {
-        return delegateMap.prepare(transaction);
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        return delegateMap.begin(transactionId);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, V>> transactionLog) {
+        return delegateMap.prepare(transactionLog);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, V>> transactionLog) {
+        return delegateMap.prepareAndCommit(transactionLog);
     }
 
     @Override
@@ -269,12 +280,6 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepareAndCommit(
-            MapTransaction<String, V> transaction) {
-        return delegateMap.prepareAndCommit(transaction);
-    }
-
-    @Override
     public boolean equals(Object other) {
         if (other instanceof DelegatingAsyncConsistentTreeMap) {
             DelegatingAsyncConsistentTreeMap<V> that =
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
index f8844fb..ccffead 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
@@ -26,11 +26,13 @@
 import java.util.function.Function;
 import java.util.function.Predicate;
 
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.base.Throwables;
@@ -63,6 +65,7 @@
     private static final String ENTRY_SET = "entrySet";
     private static final String REPLACE = "replace";
     private static final String COMPUTE_IF_ABSENT = "computeIfAbsent";
+    private static final String BEGIN = "begin";
     private static final String PREPARE = "prepare";
     private static final String COMMIT = "commit";
     private static final String ROLLBACK = "rollback";
@@ -249,10 +252,17 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        final MeteringAgent.Context timer = monitor.startTimer(BEGIN);
+        return super.begin(transactionId)
+                .whenComplete((r, e) -> timer.stop(e));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
         final MeteringAgent.Context timer = monitor.startTimer(PREPARE);
-        return super.prepare(transaction)
-                    .whenComplete((r, e) -> timer.stop(e));
+        return super.prepare(transactionLog)
+                .whenComplete((r, e) -> timer.stop(e));
     }
 
     @Override
@@ -270,10 +280,10 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
         final MeteringAgent.Context timer = monitor.startTimer(PREPARE_AND_COMMIT);
-        return super.prepareAndCommit(transaction)
-                    .whenComplete((r, e) -> timer.stop(e));
+        return super.prepareAndCommit(transactionLog)
+                .whenComplete((r, e) -> timer.stop(e));
     }
 
     private class InternalMeteredMapEventListener implements MapEventListener<K, V> {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index 9adcb33..bf77105 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -18,7 +18,6 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -37,12 +36,12 @@
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -200,54 +199,28 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        throw new UnsupportedOperationException();
+    }
 
-        Map<AsyncConsistentMap<K, V>, List<MapUpdate<K, V>>> updatesGroupedByMap = Maps.newIdentityHashMap();
-        transaction.updates().forEach(update -> {
-            AsyncConsistentMap<K, V> map = getMap(update.key());
-            updatesGroupedByMap.computeIfAbsent(map, k -> Lists.newLinkedList()).add(update);
-        });
-        Map<AsyncConsistentMap<K, V>, MapTransaction<K, V>> transactionsByMap =
-                Maps.transformValues(updatesGroupedByMap,
-                                     list -> new MapTransaction<>(transaction.transactionId(), list));
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
+        throw new UnsupportedOperationException();
+    }
 
-        return Tools.allOf(transactionsByMap.entrySet()
-                         .stream()
-                         .map(e -> e.getKey().prepare(e.getValue()))
-                         .collect(Collectors.toList()))
-                    .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
+        throw new UnsupportedOperationException();
     }
 
     @Override
     public CompletableFuture<Void> commit(TransactionId transactionId) {
-        return CompletableFuture.allOf(getMaps().stream()
-                                                .map(e -> e.commit(transactionId))
-                                                .toArray(CompletableFuture[]::new));
+        throw new UnsupportedOperationException();
     }
 
     @Override
     public CompletableFuture<Void> rollback(TransactionId transactionId) {
-        return CompletableFuture.allOf(getMaps().stream()
-                .map(e -> e.rollback(transactionId))
-                .toArray(CompletableFuture[]::new));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
-        Map<AsyncConsistentMap<K, V>, List<MapUpdate<K, V>>> updatesGroupedByMap = Maps.newIdentityHashMap();
-        transaction.updates().forEach(update -> {
-            AsyncConsistentMap<K, V> map = getMap(update.key());
-            updatesGroupedByMap.computeIfAbsent(map, k -> Lists.newLinkedList()).add(update);
-        });
-        Map<AsyncConsistentMap<K, V>, MapTransaction<K, V>> transactionsByMap =
-                Maps.transformValues(updatesGroupedByMap,
-                                     list -> new MapTransaction<>(transaction.transactionId(), list));
-
-        return Tools.allOf(transactionsByMap.entrySet()
-                                            .stream()
-                                            .map(e -> e.getKey().prepareAndCommit(e.getValue()))
-                                            .collect(Collectors.toList()))
-                    .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
+        throw new UnsupportedOperationException();
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedTransactionalMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedTransactionalMap.java
new file mode 100644
index 0000000..90b3524
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedTransactionalMap.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.base.MoreObjects;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.service.TransactionalMap;
+
+/**
+ * Partitioned transactional map.
+ */
+public class PartitionedTransactionalMap<K, V> implements TransactionalMap<K, V> {
+    protected final Map<PartitionId, TransactionalMapParticipant<K, V>> partitions;
+    protected final Hasher<K> hasher;
+
+    public PartitionedTransactionalMap(
+            Map<PartitionId, TransactionalMapParticipant<K, V>> partitions, Hasher<K> hasher) {
+        this.partitions = partitions;
+        this.hasher = hasher;
+    }
+
+    /**
+     * Returns the collection of map partitions.
+     *
+     * @return a collection of map partitions
+     */
+    @SuppressWarnings("unchecked")
+    Collection<TransactionParticipant> participants() {
+        return (Collection) partitions.values();
+    }
+
+    /**
+     * Returns the partition for the given key.
+     *
+     * @param key the key for which to return the partition
+     * @return the partition for the given key
+     */
+    private TransactionalMap<K, V> partition(K key) {
+        return partitions.get(hasher.hash(key));
+    }
+
+    @Override
+    public V get(K key) {
+        return partition(key).get(key);
+    }
+
+    @Override
+    public boolean containsKey(K key) {
+        return partition(key).containsKey(key);
+    }
+
+    @Override
+    public V put(K key, V value) {
+        return partition(key).put(key, value);
+    }
+
+    @Override
+    public V remove(K key) {
+        return partition(key).remove(key);
+    }
+
+    @Override
+    public V putIfAbsent(K key, V value) {
+        return partition(key).putIfAbsent(key, value);
+    }
+
+    @Override
+    public boolean remove(K key, V value) {
+        return partition(key).remove(key, value);
+    }
+
+    @Override
+    public boolean replace(K key, V oldValue, V newValue) {
+        return partition(key).replace(key, oldValue, newValue);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("partitions", partitions.values())
+                .toString();
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 48d9b7e..1fc821d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -42,7 +42,6 @@
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncAtomicValue;
-import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.AsyncConsistentMultimap;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
@@ -69,7 +68,6 @@
 import org.slf4j.Logger;
 
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Futures;
 
 /**
  * Implementation for {@code StorageService} and {@code StorageAdminService}.
@@ -98,8 +96,7 @@
     private final Supplier<TransactionId> transactionIdGenerator =
             () -> TransactionId.from(UUID.randomUUID().toString());
     private DistributedPrimitiveCreator federatedPrimitiveCreator;
-    private AsyncConsistentMap<TransactionId, Transaction.State> transactions;
-    private TransactionCoordinator transactionCoordinator;
+    private TransactionManager transactionManager;
 
     @Activate
     public void activate() {
@@ -108,13 +105,7 @@
             .filter(id -> !id.equals(PartitionId.from(0)))
             .forEach(id -> partitionMap.put(id, partitionService.getDistributedPrimitiveCreator(id)));
         federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap);
-        transactions = this.<TransactionId, Transaction.State>consistentMapBuilder()
-                    .withName("onos-transactions")
-                    .withSerializer(Serializer.using(KryoNamespaces.API,
-                            Transaction.class,
-                            Transaction.State.class))
-                    .buildAsyncMap();
-        transactionCoordinator = new TransactionCoordinator(transactions);
+        transactionManager = new TransactionManager(this, partitionService);
         log.info("Started");
     }
 
@@ -187,9 +178,7 @@
     @Override
     public TransactionContextBuilder transactionContextBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultTransactionContextBuilder(transactionIdGenerator.get(),
-                federatedPrimitiveCreator,
-                transactionCoordinator);
+        return new DefaultTransactionContextBuilder(transactionIdGenerator.get(), transactionManager);
     }
 
     @Override
@@ -259,7 +248,7 @@
 
     @Override
     public Collection<TransactionId> getPendingTransactions() {
-        return Futures.getUnchecked(transactions.keySet());
+        return transactionManager.getPendingTransactions();
     }
 
     private List<MapInfo> listMapInfo(DistributedPrimitiveCreator creator) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
index be5f05a..82b3919 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-present Open Networking Laboratory
+ * Copyright 2017-present Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,88 +16,263 @@
 package org.onosproject.store.primitives.impl;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Transactional;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.TransactionContext;
+import org.onosproject.store.service.TransactionException;
 
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkState;
 
 /**
- * An immutable transaction object.
+ * Manages a transaction within the context of a single primitive.
+ * <p>
+ * The {@code Transaction} object is used to manage the transaction for a single partition primitive that implements
+ * the {@link Transactional} interface. It's used as a proxy for {@link TransactionContext}s to manage the transaction
+ * as it relates to a single piece of atomic state.
  */
-public class Transaction {
+public class Transaction<T> {
 
+    /**
+     * Transaction state.
+     * <p>
+     * The transaction state is used to indicate the phase within which the transaction is currently running.
+     */
     enum State {
+
         /**
-         * Indicates a new transaction that is about to be prepared. All transactions
-         * start their life in this state.
+         * Active transaction state.
+         * <p>
+         * The {@code ACTIVE} state represents a transaction in progress. Active transactions may or may not affect
+         * concurrently running transactions depending on the transaction's isolation level.
+         */
+        ACTIVE,
+
+        /**
+         * Preparing transaction state.
+         * <p>
+         * Once a transaction commitment begins, it enters the {@code PREPARING} phase of the two-phase commit protocol.
          */
         PREPARING,
 
         /**
-         * Indicates a transaction that is successfully prepared i.e. all participants voted to commit
+         * Prepared transaction state.
+         * <p>
+         * Once the first phase of the two-phase commit protocol is complete, the transaction's state is set to
+         * {@code PREPARED}.
          */
         PREPARED,
 
         /**
-         * Indicates a transaction that is about to be committed.
+         * Committing transaction state.
+         * <p>
+         * The {@code COMMITTING} state represents a transaction within the second phase of the two-phase commit
+         * protocol.
          */
         COMMITTING,
 
         /**
-         * Indicates a transaction that has successfully committed.
+         * Committed transaction state.
+         * <p>
+         * Once the second phase of the two-phase commit protocol is complete, the transaction's state is set to
+         * {@code COMMITTED}.
          */
         COMMITTED,
 
         /**
-         * Indicates a transaction that is about to be rolled back.
+         * Rolling back transaction state.
+         * <p>
+         * In the event of a two-phase lock failure, when the transaction is rolled back it will enter the
+         * {@code ROLLING_BACK} state while the rollback is in progress.
          */
-        ROLLINGBACK,
+        ROLLING_BACK,
 
         /**
-         * Indicates a transaction that has been rolled back and all locks are released.
+         * Rolled back transaction state.
+         * <p>
+         * Once a transaction has been rolled back, it will enter the {@code ROLLED_BACK} state.
          */
-        ROLLEDBACK
+        ROLLED_BACK,
     }
 
-    private final TransactionId transactionId;
-    private final List<MapUpdate<String, byte[]>> updates;
-    private final State state;
+    private static final String TX_OPEN_ERROR = "transaction already open";
+    private static final String TX_CLOSED_ERROR = "transaction not open";
+    private static final String TX_INACTIVE_ERROR = "transaction is not active";
+    private static final String TX_UNPREPARED_ERROR = "transaction has not been prepared";
 
-    public Transaction(TransactionId transactionId, List<MapUpdate<String, byte[]>> updates) {
-        this(transactionId, updates, State.PREPARING);
-    }
+    protected final TransactionId transactionId;
+    protected final Transactional<T> transactionalObject;
+    private final AtomicBoolean open = new AtomicBoolean();
+    private volatile State state = State.ACTIVE;
 
-    private Transaction(TransactionId transactionId,
-            List<MapUpdate<String, byte[]>> updates,
-            State state) {
+    public Transaction(TransactionId transactionId, Transactional<T> transactionalObject) {
         this.transactionId = transactionId;
-        this.updates = ImmutableList.copyOf(updates);
-        this.state = state;
+        this.transactionalObject = transactionalObject;
     }
 
-    public TransactionId id() {
+    /**
+     * Returns the transaction identifier.
+     *
+     * @return the transaction identifier
+     */
+    public TransactionId transactionId() {
         return transactionId;
     }
 
-    public List<MapUpdate<String, byte[]>> updates() {
-        return updates;
-    }
-
+    /**
+     * Returns the current transaction state.
+     *
+     * @return the current transaction state
+     */
     public State state() {
         return state;
     }
 
-    public Transaction transition(State newState) {
-        return new Transaction(transactionId, updates, newState);
+    /**
+     * Returns a boolean indicating whether the transaction is open.
+     *
+     * @return indicates whether the transaction is open
+     */
+    public boolean isOpen() {
+        return open.get();
+    }
+
+    /**
+     * Opens the transaction, throwing an {@link IllegalStateException} if it's already open.
+     */
+    protected void open() {
+        if (!open.compareAndSet(false, true)) {
+            throw new IllegalStateException(TX_OPEN_ERROR);
+        }
+    }
+
+    /**
+     * Checks that the transaction is open and throws an {@link IllegalStateException} if not.
+     */
+    protected void checkOpen() {
+        checkState(isOpen(), TX_CLOSED_ERROR);
+    }
+
+    /**
+     * Checks that the transaction state is {@code ACTIVE} and throws an {@link IllegalStateException} if not.
+     */
+    protected void checkActive() {
+        checkState(state == State.ACTIVE, TX_INACTIVE_ERROR);
+    }
+
+    /**
+     * Checks that the transaction state is {@code PREPARED} and throws an {@link IllegalStateException} if not.
+     */
+    protected void checkPrepared() {
+        checkState(state == State.PREPARED, TX_UNPREPARED_ERROR);
+    }
+
+    /**
+     * Updates the transaction state.
+     *
+     * @param state the updated transaction state
+     */
+    protected void setState(State state) {
+        this.state = state;
+    }
+
+    /**
+     * Begins the transaction.
+     * <p>
+     * Locks are acquired when the transaction is begun to prevent concurrent transactions from operating on the shared
+     * resource to which this transaction relates.
+     *
+     * @return a completable future to be completed once the transaction has been started
+     */
+    public CompletableFuture<Version> begin() {
+        open();
+        return transactionalObject.begin(transactionId);
+    }
+
+    /**
+     * Prepares the transaction.
+     * <p>
+     * When preparing the transaction, the given list of updates for the shared resource will be prepared, and
+     * concurrent modification checks will be performed. The returned future may be completed with a
+     * {@link TransactionException} if a concurrent modification is detected for an isolation level that does
+     * not allow such modifications.
+     *
+     * @param updates the transaction updates
+     * @return a completable future to be completed once the transaction has been prepared
+     */
+    public CompletableFuture<Boolean> prepare(List<T> updates) {
+        checkOpen();
+        checkActive();
+        setState(State.PREPARING);
+        return transactionalObject.prepare(new TransactionLog<T>(transactionId, updates))
+                .thenApply(succeeded -> {
+                    setState(State.PREPARED);
+                    return succeeded;
+                });
+    }
+
+    /**
+     * Prepares and commits the transaction in a single atomic operation.
+     * <p>
+     * Both the prepare and commit phases of the protocol must be executed within a single atomic operation. This method
+     * is used to optimize committing transactions that operate only on a single partition within a single primitive.
+     *
+     * @param updates the transaction updates
+     * @return a completable future to be completed once the transaction has been prepared
+     */
+    public CompletableFuture<Boolean> prepareAndCommit(List<T> updates) {
+        checkOpen();
+        checkActive();
+        setState(State.PREPARING);
+        return transactionalObject.prepareAndCommit(new TransactionLog<T>(transactionId, updates))
+                .thenApply(succeeded -> {
+                    setState(State.COMMITTED);
+                    return succeeded;
+                });
+    }
+
+    /**
+     * Commits the transaction.
+     * <p>
+     * Performs the second phase of the two-phase commit protocol, committing the previously
+     * {@link #prepare(List) prepared} updates.
+     *
+     * @return a completable future to be completed once the transaction has been committed
+     */
+    public CompletableFuture<Void> commit() {
+        checkOpen();
+        checkPrepared();
+        setState(State.COMMITTING);
+        return transactionalObject.commit(transactionId).thenRun(() -> {
+            setState(State.COMMITTED);
+        });
+    }
+
+    /**
+     * Rolls back the transaction.
+     * <p>
+     * Rolls back the first phase of the two-phase commit protocol, cancelling prepared updates.
+     *
+     * @return a completable future to be completed once the transaction has been rolled back
+     */
+    public CompletableFuture<Void> rollback() {
+        checkOpen();
+        checkPrepared();
+        setState(State.ROLLING_BACK);
+        return transactionalObject.rollback(transactionId).thenRun(() -> {
+            setState(State.ROLLED_BACK);
+        });
     }
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(getClass())
+        return toStringHelper(this)
                 .add("transactionId", transactionId)
-                .add("updates", updates)
                 .add("state", state)
                 .toString();
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
index 160ecf9..9826266 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
@@ -19,78 +19,123 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Sets;
 import org.onlab.util.Tools;
 import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.CommitStatus;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionalMap;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
 
 /**
- * Coordinator for a two-phase commit protocol.
+ * Transaction coordinator.
  */
 public class TransactionCoordinator {
+    protected final TransactionId transactionId;
+    protected final TransactionManager transactionManager;
+    protected final Set<TransactionParticipant> transactionParticipants = Sets.newConcurrentHashSet();
 
-    private final AsyncConsistentMap<TransactionId, Transaction.State> transactions;
-
-    public TransactionCoordinator(AsyncConsistentMap<TransactionId, Transaction.State> transactions) {
-        this.transactions = transactions;
+    public TransactionCoordinator(TransactionId transactionId, TransactionManager transactionManager) {
+        this.transactionId = transactionId;
+        this.transactionManager = transactionManager;
     }
 
     /**
-     * Commits a transaction.
+     * Returns a transactional map for this transaction.
      *
-     * @param transactionId transaction identifier
-     * @param transactionParticipants set of transaction participants
-     * @return future for commit result
+     * @param name the transactional map name
+     * @param serializer the serializer
+     * @param <K> key type
+     * @param <V> value type
+     * @return a transactional map for this transaction
      */
-    CompletableFuture<CommitStatus> commit(TransactionId transactionId,
-                                           Set<TransactionParticipant> transactionParticipants) {
-        int totalUpdates = transactionParticipants.stream()
-                                                  .map(TransactionParticipant::totalUpdates)
-                                                  .reduce(Math::addExact)
-                                                  .orElse(0);
+    public <K, V> TransactionalMap<K, V> getTransactionalMap(String name, Serializer serializer) {
+        PartitionedTransactionalMap<K, V> map = transactionManager.getTransactionalMap(name, serializer, this);
+        transactionParticipants.addAll(map.participants());
+        return map;
+    }
 
-        if (totalUpdates == 0) {
+    /**
+     * Commits the transaction.
+     *
+     * @return the transaction commit status
+     */
+    public CompletableFuture<CommitStatus> commit() {
+        long totalParticipants = transactionParticipants.stream()
+                .filter(TransactionParticipant::hasPendingUpdates)
+                .count();
+
+        if (totalParticipants == 0) {
             return CompletableFuture.completedFuture(CommitStatus.SUCCESS);
-        } else if (totalUpdates == 1) {
+        } else if (totalParticipants == 1) {
             return transactionParticipants.stream()
-                                          .filter(p -> p.totalUpdates() == 1)
-                                          .findFirst()
-                                          .get()
-                                          .prepareAndCommit()
-                                          .thenApply(v -> v ? CommitStatus.SUCCESS : CommitStatus.FAILURE);
+                    .filter(TransactionParticipant::hasPendingUpdates)
+                    .findFirst()
+                    .get()
+                    .prepareAndCommit()
+                    .thenApply(v -> v ? CommitStatus.SUCCESS : CommitStatus.FAILURE);
         } else {
-            CompletableFuture<CommitStatus> status =  transactions.put(transactionId, Transaction.State.PREPARING)
-                    .thenCompose(v -> this.doPrepare(transactionParticipants))
+            Set<TransactionParticipant> transactionParticipants = this.transactionParticipants.stream()
+                    .filter(TransactionParticipant::hasPendingUpdates)
+                    .collect(Collectors.toSet());
+
+            CompletableFuture<CommitStatus> status = transactionManager.updateState(
+                    transactionId, Transaction.State.PREPARING)
+                    .thenCompose(v -> prepare(transactionParticipants))
                     .thenCompose(result -> result
-                            ? transactions.put(transactionId, Transaction.State.COMMITTING)
-                                          .thenCompose(v -> doCommit(transactionParticipants))
-                                          .thenApply(v -> CommitStatus.SUCCESS)
-                            : transactions.put(transactionId, Transaction.State.ROLLINGBACK)
-                                          .thenCompose(v -> doRollback(transactionParticipants))
-                                          .thenApply(v -> CommitStatus.FAILURE));
-            return status.thenCompose(v -> transactions.remove(transactionId).thenApply(u -> v));
+                            ? transactionManager.updateState(transactionId, Transaction.State.COMMITTING)
+                            .thenCompose(v -> commit(transactionParticipants))
+                            .thenApply(v -> CommitStatus.SUCCESS)
+                            : transactionManager.updateState(transactionId, Transaction.State.ROLLING_BACK)
+                            .thenCompose(v -> rollback(transactionParticipants))
+                            .thenApply(v -> CommitStatus.FAILURE));
+            return status.thenCompose(v -> transactionManager.remove(transactionId).thenApply(u -> v));
         }
     }
 
-    private CompletableFuture<Boolean> doPrepare(Set<TransactionParticipant> transactionParticipants) {
+    /**
+     * Performs the prepare phase of the two-phase commit protocol for the given transaction participants.
+     *
+     * @param transactionParticipants the transaction participants for which to prepare the transaction
+     * @return a completable future indicating whether <em>all</em> prepares succeeded
+     */
+    protected CompletableFuture<Boolean> prepare(Set<TransactionParticipant> transactionParticipants) {
         return Tools.allOf(transactionParticipants.stream()
-                                                  .filter(TransactionParticipant::hasPendingUpdates)
-                                                  .map(TransactionParticipant::prepare)
-                                                  .collect(Collectors.toList()))
-                    .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
+                .map(TransactionParticipant::prepare)
+                .collect(Collectors.toList()))
+                .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
     }
 
-    private CompletableFuture<Void> doCommit(Set<TransactionParticipant> transactionParticipants) {
+    /**
+     * Performs the commit phase of the two-phase commit protocol for the given transaction participants.
+     *
+     * @param transactionParticipants the transaction participants for which to commit the transaction
+     * @return a completable future to be completed once the commits are complete
+     */
+    protected CompletableFuture<Void> commit(Set<TransactionParticipant> transactionParticipants) {
         return CompletableFuture.allOf(transactionParticipants.stream()
-                                                              .filter(TransactionParticipant::hasPendingUpdates)
-                                                              .map(TransactionParticipant::commit)
-                                                              .toArray(CompletableFuture[]::new));
+                .map(TransactionParticipant::commit)
+                .toArray(CompletableFuture[]::new));
     }
 
-    private CompletableFuture<Void> doRollback(Set<TransactionParticipant> transactionParticipants) {
+    /**
+     * Rolls back transactions for the given participants.
+     *
+     * @param transactionParticipants the transaction participants for which to roll back the transaction
+     * @return a completable future to be completed once the rollbacks are complete
+     */
+    protected CompletableFuture<Void> rollback(Set<TransactionParticipant> transactionParticipants) {
         return CompletableFuture.allOf(transactionParticipants.stream()
-                                                              .filter(TransactionParticipant::hasPendingUpdates)
-                                                              .map(TransactionParticipant::rollback)
-                                                              .toArray(CompletableFuture[]::new));
+                .map(TransactionParticipant::rollback)
+                .toArray(CompletableFuture[]::new));
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+                .add("transactionId", transactionId)
+                .add("participants", transactionParticipants)
+                .toString();
     }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java
new file mode 100644
index 0000000..5c246de
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.Hashing;
+import com.google.common.util.concurrent.Futures;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.PartitionService;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.TransactionException;
+
+/**
+ * Transaction manager for managing state shared across multiple transactions.
+ */
+public class TransactionManager {
+    private static final int DEFAULT_CACHE_SIZE = 100;
+
+    private final PartitionService partitionService;
+    private final List<PartitionId> sortedPartitions;
+    private final AsyncConsistentMap<TransactionId, Transaction.State> transactions;
+    private final int cacheSize;
+    private final Map<PartitionId, Cache<String, AsyncConsistentMap>> partitionCache = Maps.newConcurrentMap();
+
+    public TransactionManager(StorageService storageService, PartitionService partitionService) {
+        this(storageService, partitionService, DEFAULT_CACHE_SIZE);
+    }
+
+    public TransactionManager(StorageService storageService, PartitionService partitionService, int cacheSize) {
+        this.partitionService = partitionService;
+        this.cacheSize = cacheSize;
+        this.transactions = storageService.<TransactionId, Transaction.State>consistentMapBuilder()
+                .withName("onos-transactions")
+                .withSerializer(Serializer.using(KryoNamespaces.API,
+                        Transaction.class,
+                        Transaction.State.class))
+                .buildAsyncMap();
+        this.sortedPartitions = Lists.newArrayList(partitionService.getAllPartitionIds());
+        Collections.sort(sortedPartitions);
+    }
+
+    /**
+     * Returns the collection of currently pending transactions.
+     *
+     * @return a collection of currently pending transactions
+     */
+    public Collection<TransactionId> getPendingTransactions() {
+        return Futures.getUnchecked(transactions.keySet());
+    }
+
+    /**
+     * Returns a partitioned transactional map for use within a transaction context.
+     * <p>
+     * The transaction coordinator will return a map that takes advantage of caching that's shared across transaction
+     * contexts.
+     *
+     * @param name the map name
+     * @param serializer the map serializer
+     * @param transactionCoordinator the transaction coordinator for which the map is being created
+     * @param <K> key type
+     * @param <V> value type
+     * @return a partitioned transactional map
+     */
+    <K, V> PartitionedTransactionalMap<K, V> getTransactionalMap(
+            String name,
+            Serializer serializer,
+            TransactionCoordinator transactionCoordinator) {
+        Map<PartitionId, TransactionalMapParticipant<K, V>> partitions = new HashMap<>();
+        for (PartitionId partitionId : partitionService.getAllPartitionIds()) {
+            partitions.put(partitionId, getTransactionalMapPartition(
+                    name, partitionId, serializer, transactionCoordinator));
+        }
+
+        Hasher<K> hasher = key -> {
+            int hashCode = Hashing.sha256().hashBytes(serializer.encode(key)).asInt();
+            return sortedPartitions.get(Math.abs(hashCode) % sortedPartitions.size());
+        };
+        return new PartitionedTransactionalMap<>(partitions, hasher);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <K, V> TransactionalMapParticipant<K, V> getTransactionalMapPartition(
+            String mapName,
+            PartitionId partitionId,
+            Serializer serializer,
+            TransactionCoordinator transactionCoordinator) {
+        Cache<String, AsyncConsistentMap> mapCache = partitionCache.computeIfAbsent(partitionId, p ->
+                CacheBuilder.newBuilder().maximumSize(cacheSize / partitionService.getNumberOfPartitions()).build());
+        try {
+            AsyncConsistentMap<K, V> baseMap = partitionService.getDistributedPrimitiveCreator(partitionId)
+                            .newAsyncConsistentMap(mapName, serializer);
+            AsyncConsistentMap<K, V> asyncMap = mapCache.get(mapName, () ->
+                    DistributedPrimitives.newCachingMap(baseMap));
+
+            Transaction<MapUpdate<K, V>> transaction = new Transaction<>(
+                    transactionCoordinator.transactionId,
+                    baseMap);
+            return new DefaultTransactionalMapParticipant<>(asyncMap.asConsistentMap(), transaction);
+        } catch (ExecutionException e) {
+            throw new TransactionException(e);
+        }
+    }
+
+    /**
+     * Updates the state of a transaction in the transaction registry.
+     *
+     * @param transactionId the transaction identifier
+     * @param state the state of the transaction
+     * @return a completable future to be completed once the transaction state has been updated in the registry
+     */
+    CompletableFuture<Void> updateState(TransactionId transactionId, Transaction.State state) {
+        return transactions.put(transactionId, state).thenApply(v -> null);
+    }
+
+    /**
+     * Removes the given transaction from the transaction registry.
+     *
+     * @param transactionId the transaction identifier
+     * @return a completable future to be completed once the transaction state has been removed from the registry
+     */
+    CompletableFuture<Void> remove(TransactionId transactionId) {
+        return transactions.remove(transactionId).thenApply(v -> null);
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
index 0780bf6..f97d448 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
@@ -23,40 +23,38 @@
 public interface TransactionParticipant {
 
     /**
-     * Returns if this participant has updates that need to be committed.
-     * @return {@code true} if yes; {@code false} otherwise
+     * Returns a boolean indicating whether the participant has pending updates.
+     *
+     * @return indicates whether the participant has pending updates
      */
-    default boolean hasPendingUpdates() {
-        return totalUpdates() > 0;
-    }
-
-    /**
-     * Returns the number of updates that need to committed for this participant.
-     * @return update count.
-     */
-    int totalUpdates();
-
-    /**
-     * Executes the prepare and commit steps in a single go.
-     * @return {@code true} is successful i.e updates are committed; {@code false} otherwise
-     */
-    CompletableFuture<Boolean> prepareAndCommit();
+    boolean hasPendingUpdates();
 
     /**
      * Executes the prepare phase.
+     *
      * @return {@code true} is successful; {@code false} otherwise
      */
     CompletableFuture<Boolean> prepare();
 
     /**
      * Attempts to execute the commit phase for previously prepared transaction.
+     *
      * @return future that is completed when the operation completes
      */
     CompletableFuture<Void> commit();
 
     /**
+     * Executes the prepare and commit phases atomically.
+     *
+     * @return {@code true} is successful; {@code false} otherwise
+     */
+    CompletableFuture<Boolean> prepareAndCommit();
+
+    /**
      * Attempts to execute the rollback phase for previously prepared transaction.
+     *
      * @return future that is completed when the operation completes
      */
     CompletableFuture<Void> rollback();
+
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
new file mode 100644
index 0000000..4429a1b
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedPrimitive;
+import org.onosproject.store.service.TransactionException;
+import org.onosproject.store.service.TransactionalMap;
+import org.onosproject.store.service.Version;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Base class for participants within a single {@link TransactionalMap}.
+ * <p>
+ * This class provides the basic functionality required by transactional map participants and provides methods
+ * for defining operations specific to individual isolation levels.
+ *
+ * @param <K> key type
+ * @param <V> value type.
+ */
+public abstract class TransactionalMapParticipant<K, V> implements TransactionalMap<K, V>, TransactionParticipant {
+    private static final String TX_CLOSED_ERROR = "Transaction is closed";
+    private static final String ERROR_NULL_VALUE = "Null values are not allowed";
+    private static final String ERROR_NULL_KEY = "Null key is not allowed";
+
+    protected final ConsistentMap<K, V> backingMap;
+    protected final Transaction<MapUpdate<K, V>> transaction;
+    protected final Map<K, V> writeCache = Maps.newConcurrentMap();
+    protected final Set<K> deleteSet = Sets.newConcurrentHashSet();
+    protected final List<MapUpdate<K, V>> log = new ArrayList<>();
+    protected volatile Version lock;
+
+    protected TransactionalMapParticipant(
+            ConsistentMap<K, V> backingMap,
+            Transaction<MapUpdate<K, V>> transaction) {
+        this.backingMap = backingMap;
+        this.transaction = transaction;
+    }
+
+    /**
+     * Starts the transaction for this partition when a read occurs.
+     * <p>
+     * Acquiring a pessimistic lock at the start of the transaction ensures that underlying cached maps have been
+     * synchronized prior to a read.
+     */
+    private void beginTransaction() {
+        if (!transaction.isOpen()) {
+            try {
+                lock = transaction.begin()
+                        .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new TransactionException.Interrupted();
+            } catch (TimeoutException e) {
+                throw new TransactionException.Timeout();
+            } catch (ExecutionException e) {
+                Throwables.propagateIfPossible(e.getCause());
+                throw new TransactionException(e.getCause());
+            }
+        }
+    }
+
+    @Override
+    public V get(K key) {
+        // Start the transaction for this primitive/partition if necessary.
+        beginTransaction();
+
+        checkState(transaction.isOpen(), TX_CLOSED_ERROR);
+        checkNotNull(key, ERROR_NULL_KEY);
+
+        if (deleteSet.contains(key)) {
+            return null;
+        }
+
+        V latest = writeCache.get(key);
+        if (latest != null) {
+            return latest;
+        } else {
+            return read(key);
+        }
+    }
+
+    /**
+     * Executes a get operation based on the transaction isolation level.
+     *
+     * @param key the key to look up
+     * @return the value
+     */
+    protected abstract V read(K key);
+
+    @Override
+    public boolean containsKey(K key) {
+        return get(key) != null;
+    }
+
+    @Override
+    public V put(K key, V value) {
+        checkNotNull(value, ERROR_NULL_VALUE);
+
+        V latest = get(key);
+        writeCache.put(key, value);
+        deleteSet.remove(key);
+        return latest;
+    }
+
+    @Override
+    public V remove(K key) {
+        V latest = get(key);
+        if (latest != null) {
+            writeCache.remove(key);
+            deleteSet.add(key);
+        }
+        return latest;
+    }
+
+    @Override
+    public boolean remove(K key, V value) {
+        checkNotNull(value, ERROR_NULL_VALUE);
+
+        V latest = get(key);
+        if (Objects.equal(value, latest)) {
+            remove(key);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean replace(K key, V oldValue, V newValue) {
+        checkNotNull(oldValue, ERROR_NULL_VALUE);
+        checkNotNull(newValue, ERROR_NULL_VALUE);
+
+        V latest = get(key);
+        if (Objects.equal(oldValue, latest)) {
+            put(key, newValue);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public V putIfAbsent(K key, V value) {
+        checkNotNull(value, ERROR_NULL_VALUE);
+
+        V latest = get(key);
+        if (latest == null) {
+            put(key, value);
+        }
+        return latest;
+    }
+
+    @Override
+    public boolean hasPendingUpdates() {
+        return records().findAny().isPresent();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare() {
+        return transaction.prepare(log());
+    }
+
+    @Override
+    public CompletableFuture<Void> commit() {
+        return transaction.commit();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit() {
+        return transaction.prepareAndCommit(log());
+    }
+
+    @Override
+    public CompletableFuture<Void> rollback() {
+        return transaction.rollback();
+    }
+
+    /**
+     * Returns a list of updates performed within this map partition.
+     *
+     * @return a list of map updates
+     */
+    protected List<MapUpdate<K, V>> log() {
+        return records().collect(Collectors.toList());
+    }
+
+    /**
+     * Returns a stream of updates performed within this map partition.
+     *
+     * @return a stream of map updates
+     */
+    protected abstract Stream<MapUpdate<K, V>> records();
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("backingMap", backingMap)
+                .add("updates", log())
+                .toString();
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index 2afb5df..c2f167a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -29,11 +29,13 @@
 import java.util.stream.Collectors;
 
 import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.collect.Maps;
@@ -266,9 +268,27 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepare(MapTransaction<K1, V1> transaction) {
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
         try {
-            return backingMap.prepare(transaction.map(keyEncoder, valueEncoder));
+            return backingMap.begin(transactionId);
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K1, V1>> transactionLog) {
+        try {
+            return backingMap.prepare(transactionLog.map(record -> record.map(keyEncoder, valueEncoder)));
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K1, V1>> transactionLog) {
+        try {
+            return backingMap.prepareAndCommit(transactionLog.map(record -> record.map(keyEncoder, valueEncoder)));
         } catch (Exception e) {
             return Tools.exceptionalFuture(e);
         }
@@ -276,18 +296,17 @@
 
     @Override
     public CompletableFuture<Void> commit(TransactionId transactionId) {
-        return backingMap.commit(transactionId);
+        try {
+            return backingMap.commit(transactionId);
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
     }
 
     @Override
     public CompletableFuture<Void> rollback(TransactionId transactionId) {
-        return backingMap.rollback(transactionId);
-    }
-
-    @Override
-    public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K1, V1> transaction) {
         try {
-            return backingMap.prepareAndCommit(transaction.map(keyEncoder, valueEncoder));
+            return backingMap.rollback(transactionId);
         } catch (Exception e) {
             return Tools.exceptionalFuture(e);
         }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
index 747008f..2f0683e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
@@ -18,11 +18,13 @@
 
 import com.google.common.collect.Maps;
 import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
@@ -361,27 +363,30 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepare(
-            MapTransaction<String, V1> transaction) {
-        throw new UnsupportedOperationException("This operation is not yet " +
-                                                        "supported.");
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        throw new UnsupportedOperationException("This operation is not yet supported.");
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, V1>> transactionLog) {
+        throw new UnsupportedOperationException("This operation is not yet supported.");
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, V1>> transactionLog) {
+        throw new UnsupportedOperationException("This operation is not yet supported.");
     }
 
     @Override
     public CompletableFuture<Void> commit(TransactionId transactionId) {
-        throw new UnsupportedOperationException("This operation is not yet " +
-                                                        "supported.");    }
+        throw new UnsupportedOperationException("This operation is not yet supported.");
+    }
 
     @Override
     public CompletableFuture<Void> rollback(TransactionId transactionId) {
-        throw new UnsupportedOperationException("This operation is not yet " +
-                                                        "supported.");    }
+        throw new UnsupportedOperationException("This operation is not yet supported.");
+    }
 
-    @Override
-    public CompletableFuture<Boolean> prepareAndCommit(
-            MapTransaction<String, V1> transaction) {
-        throw new UnsupportedOperationException("This operation is not yet " +
-                                                        "supported.");    }
     private class InternalBackingMapEventListener
             implements MapEventListener<String, V2> {
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index afa67f5..8a13c01 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -36,6 +36,7 @@
 
 import org.onlab.util.Match;
 import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
@@ -47,6 +48,7 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionBegin;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
@@ -58,7 +60,8 @@
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.collect.ImmutableSet;
@@ -294,8 +297,22 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
-        return client.submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        return client.submit(new TransactionBegin()).thenApply(Version::new);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(
+            TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
+        return client.submit(new TransactionPrepare(transactionLog))
+                .thenApply(v -> v == PrepareResult.OK);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(
+            TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
+        return client.submit(new TransactionPrepareAndCommit(transactionLog))
+                .thenApply(v -> v == PrepareResult.OK);
     }
 
     @Override
@@ -305,13 +322,7 @@
 
     @Override
     public CompletableFuture<Void> rollback(TransactionId transactionId) {
-        return client.submit(new TransactionRollback(transactionId))
-                .thenApply(v -> null);
-    }
-
-    @Override
-    public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
-        return client.submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
+        return client.submit(new TransactionRollback(transactionId)).thenApply(v -> null);
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index 20b13f1..0c5dd76 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -30,8 +30,9 @@
 import java.util.Set;
 
 import org.onlab.util.Match;
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.base.MoreObjects;
@@ -202,39 +203,49 @@
     }
 
     /**
+     * Transaction begin query.
+     */
+    public static class TransactionBegin extends MapQuery<Long> {
+        @Override
+        public ConsistencyLevel consistency() {
+            return ConsistencyLevel.LINEARIZABLE;
+        }
+    }
+
+    /**
      * Map prepare command.
      */
     @SuppressWarnings("serial")
     public static class TransactionPrepare extends MapCommand<PrepareResult> {
-        private MapTransaction<String, byte[]> mapTransaction;
+        private TransactionLog<MapUpdate<String, byte[]>> transactionLog;
 
         public TransactionPrepare() {
         }
 
-        public TransactionPrepare(MapTransaction<String, byte[]> mapTransaction) {
-            this.mapTransaction = mapTransaction;
+        public TransactionPrepare(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
+            this.transactionLog = transactionLog;
         }
 
-        public MapTransaction<String, byte[]> transaction() {
-            return mapTransaction;
+        public TransactionLog<MapUpdate<String, byte[]>> transactionLog() {
+            return transactionLog;
         }
 
         @Override
         public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
             super.writeObject(buffer, serializer);
-            serializer.writeObject(mapTransaction, buffer);
+            serializer.writeObject(transactionLog, buffer);
         }
 
         @Override
         public void readObject(BufferInput<?> buffer, Serializer serializer) {
             super.readObject(buffer, serializer);
-            mapTransaction = serializer.readObject(buffer);
+            transactionLog = serializer.readObject(buffer);
         }
 
         @Override
         public String toString() {
             return MoreObjects.toStringHelper(getClass())
-                    .add("mapTransaction", mapTransaction)
+                    .add("transactionLog", transactionLog)
                     .toString();
         }
     }
@@ -247,8 +258,8 @@
         public TransactionPrepareAndCommit() {
         }
 
-        public TransactionPrepareAndCommit(MapTransaction<String, byte[]> mapTransaction) {
-            super(mapTransaction);
+        public TransactionPrepareAndCommit(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
+            super(transactionLog);
         }
 
         @Override
@@ -592,6 +603,7 @@
             registry.register(Size.class, -769);
             registry.register(Listen.class, -770);
             registry.register(Unlisten.class, -771);
+            registry.register(TransactionBegin.class, -777);
             registry.register(TransactionPrepare.class, -772);
             registry.register(TransactionCommit.class, -773);
             registry.register(TransactionRollback.class, -774);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 3a99b02..e844b8f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -52,6 +52,7 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionBegin;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
@@ -60,7 +61,7 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
 import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
@@ -110,6 +111,7 @@
         // Commands
         executor.register(UpdateAndGet.class, this::updateAndGet);
         executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
+        executor.register(TransactionBegin.class, this::begin);
         executor.register(TransactionPrepare.class, this::prepare);
         executor.register(TransactionCommit.class, this::commit);
         executor.register(TransactionRollback.class, this::rollback);
@@ -374,6 +376,20 @@
     }
 
     /**
+     * Handles a begin commit.
+     *
+     * @param commit transaction begin commit
+     * @return transaction state version
+     */
+    protected long begin(Commit<? extends TransactionBegin> commit) {
+        try {
+            return commit.index();
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
      * Handles an prepare and commit commit.
      *
      * @param commit transaction prepare and commit commit
@@ -382,7 +398,7 @@
     protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
         PrepareResult prepareResult = prepare(commit);
         if (prepareResult == PrepareResult.OK) {
-            commitInternal(commit.operation().transaction().transactionId(), commit.index());
+            commitInternal(commit.operation().transactionLog().transactionId(), commit.index());
         }
         return prepareResult;
     }
@@ -396,8 +412,8 @@
     protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
         boolean ok = false;
         try {
-            MapTransaction<String, byte[]> transaction = commit.operation().transaction();
-            for (MapUpdate<String, byte[]> update : transaction.updates()) {
+            TransactionLog<MapUpdate<String, byte[]>> transaction = commit.operation().transactionLog();
+            for (MapUpdate<String, byte[]> update : transaction.records()) {
                 String key = update.key();
                 if (preparedKeys.contains(key)) {
                     return PrepareResult.CONCURRENT_TRANSACTION;
@@ -416,7 +432,7 @@
             // No violations detected. Add to pendingTransactions and mark
             // modified keys as locked for updates.
             pendingTransactions.put(transaction.transactionId(), commit);
-            transaction.updates().forEach(u -> preparedKeys.add(u.key()));
+            transaction.records().forEach(u -> preparedKeys.add(u.key()));
             ok = true;
             return PrepareResult.OK;
         } catch (Exception e) {
@@ -453,16 +469,16 @@
         if (prepareCommit == null) {
             return CommitResult.UNKNOWN_TRANSACTION_ID;
         }
-        MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
+        TransactionLog<MapUpdate<String, byte[]>> transaction = prepareCommit.operation().transactionLog();
         long totalReferencesToCommit = transaction
-                .updates()
+                .records()
                 .stream()
                 .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
                 .count();
         CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
                 new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
         List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
-        for (MapUpdate<String, byte[]> update : transaction.updates()) {
+        for (MapUpdate<String, byte[]> update : transaction.records()) {
             String key = update.key();
             checkState(preparedKeys.remove(key), "key is not prepared");
             MapEntryValue previousValue = mapEntries.remove(key);
@@ -496,8 +512,8 @@
                 return RollbackResult.UNKNOWN_TRANSACTION_ID;
             } else {
                 prepareCommit.operation()
-                             .transaction()
-                             .updates()
+                             .transactionLog()
+                             .records()
                              .forEach(u -> preparedKeys.remove(u.key()));
                 prepareCommit.close();
                 return RollbackResult.OK;
@@ -648,7 +664,7 @@
 
         @Override
         public byte[] value() {
-            MapTransaction<String, byte[]> transaction = completer.object().operation().transaction();
+            TransactionLog<MapUpdate<String, byte[]>> transaction = completer.object().operation().transactionLog();
             return valueForKey(key, transaction);
         }
 
@@ -662,8 +678,8 @@
             completer.countDown();
         }
 
-        private byte[] valueForKey(String key, MapTransaction<String, byte[]> transaction) {
-            MapUpdate<String, byte[]>  update = transaction.updates()
+        private byte[] valueForKey(String key, TransactionLog<MapUpdate<String, byte[]>> transaction) {
+            MapUpdate<String, byte[]>  update = transaction.records()
                                                            .stream()
                                                            .filter(u -> u.key().equals(key))
                                                            .findFirst()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMap.java
index 3dd34fd..bf462c9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMap.java
@@ -21,6 +21,7 @@
 import io.atomix.resource.AbstractResource;
 import io.atomix.resource.ResourceTypeInfo;
 import org.onlab.util.Match;
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FirstKey;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FloorEntry;
@@ -30,7 +31,8 @@
 import org.onosproject.store.service.AsyncConsistentTreeMap;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
@@ -368,40 +370,38 @@
 
     @Override
     public CompletableFuture<NavigableSet<String>> navigableKeySet() {
-        throw new UnsupportedOperationException("This operation is not yet " +
-                                                        "supported.");
+        throw new UnsupportedOperationException("This operation is not yet supported.");
     }
 
     @Override
     public CompletableFuture<NavigableMap<String, byte[]>> subMap(
             String upperKey, String lowerKey, boolean inclusiveUpper,
             boolean inclusiveLower) {
-        throw new UnsupportedOperationException("This operation is not yet " +
-                                                        "supported.");    }
-
-    @Override
-    public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String,
-            byte[]> transaction) {
-        throw new UnsupportedOperationException("This operation is not yet " +
-                                                        "supported.");
+        throw new UnsupportedOperationException("This operation is not yet supported.");
     }
 
     @Override
-    public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]>
-                                                          transaction) {
-        throw new UnsupportedOperationException("This operation is not yet " +
-                                                        "supported.");
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        throw new UnsupportedOperationException("This operation is not yet supported.");
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
+        throw new UnsupportedOperationException("This operation is not yet supported.");
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
+        throw new UnsupportedOperationException("This operation is not yet supported.");
     }
 
     @Override
     public CompletableFuture<Void> commit(TransactionId transactionId) {
-        throw new UnsupportedOperationException("This operation is not yet " +
-                                                        "supported.");
+        throw new UnsupportedOperationException("This operation is not yet supported.");
     }
 
     @Override
     public CompletableFuture<Void> rollback(TransactionId transactionId) {
-        throw new UnsupportedOperationException("This operation is not yet " +
-                                                        "supported.");
+        throw new UnsupportedOperationException("This operation is not yet supported.");
     }
 }
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java
new file mode 100644
index 0000000..9245a03
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.HashMap;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.primitives.PartitionService;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Transaction manager test.
+ */
+public class TransactionManagerTest {
+
+    @Test
+    public void testTransactionMapCaching() throws Exception {
+        AsyncConsistentMap asyncMap = mock(AsyncConsistentMap.class);
+        expect(asyncMap.name()).andReturn("foo");
+        expect(asyncMap.addListener(anyObject(MapEventListener.class), anyObject(Executor.class)))
+                .andReturn(CompletableFuture.completedFuture(null)).anyTimes();
+        asyncMap.addStatusChangeListener(anyObject(Consumer.class));
+        expectLastCall().anyTimes();
+        expect(asyncMap.entrySet()).andReturn(CompletableFuture.completedFuture(new HashMap<>().entrySet())).anyTimes();
+
+        ConsistentMapBuilder mapBuilder = mock(ConsistentMapBuilder.class);
+        expect(mapBuilder.withName(anyString())).andReturn(mapBuilder).anyTimes();
+        expect(mapBuilder.withSerializer(anyObject(Serializer.class))).andReturn(mapBuilder).anyTimes();
+        expect(mapBuilder.buildAsyncMap()).andReturn(asyncMap).anyTimes();
+
+        DistributedPrimitiveCreator primitiveCreator = mock(DistributedPrimitiveCreator.class);
+        expect(primitiveCreator.newAsyncConsistentMap(anyString(), anyObject(Serializer.class)))
+                .andReturn(asyncMap).anyTimes();
+
+        StorageService storageService = mock(StorageService.class);
+        expect(storageService.consistentMapBuilder()).andReturn(mapBuilder);
+
+        PartitionService partitionService = mock(PartitionService.class);
+        Set<PartitionId> partitionIds = Sets.newHashSet(PartitionId.from(1), PartitionId.from(2), PartitionId.from(3));
+        expect(partitionService.getAllPartitionIds())
+                .andReturn(partitionIds).anyTimes();
+        expect(partitionService.getNumberOfPartitions())
+                .andReturn(partitionIds.size()).anyTimes();
+        expect(partitionService.getDistributedPrimitiveCreator(anyObject(PartitionId.class)))
+                .andReturn(primitiveCreator).anyTimes();
+
+        replay(storageService, partitionService, asyncMap, primitiveCreator, mapBuilder);
+
+        TransactionManager transactionManager = new TransactionManager(storageService, partitionService);
+        TransactionId transactionId = TransactionId.from(UUID.randomUUID().toString());
+        TransactionCoordinator transactionCoordinator = new TransactionCoordinator(transactionId, transactionManager);
+        Serializer serializer = Serializer.using(KryoNamespaces.API);
+
+        PartitionedTransactionalMap<String, String> transactionalMap1 = (PartitionedTransactionalMap)
+                transactionManager.getTransactionalMap("foo", serializer, transactionCoordinator);
+        PartitionedTransactionalMap<String, String> transactionalMap2 = (PartitionedTransactionalMap)
+                transactionManager.getTransactionalMap("foo", serializer, transactionCoordinator);
+
+        assertSame(transactionalMap1.partitions.get(PartitionId.from(1)).transaction.transactionalObject,
+                transactionalMap2.partitions.get(PartitionId.from(1)).transaction.transactionalObject);
+        assertSame(transactionalMap1.partitions.get(PartitionId.from(2)).transaction.transactionalObject,
+                transactionalMap2.partitions.get(PartitionId.from(2)).transaction.transactionalObject);
+        assertSame(transactionalMap1.partitions.get(PartitionId.from(3)).transaction.transactionalObject,
+                transactionalMap2.partitions.get(PartitionId.from(3)).transaction.transactionalObject);
+    }
+
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
new file mode 100644
index 0000000..533ea97
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
@@ -0,0 +1,397 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import com.google.common.hash.Hashing;
+import org.junit.Test;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.CommitStatus;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.Versioned;
+
+import static junit.framework.TestCase.assertNull;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.strictMock;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Transaction test.
+ */
+public class TransactionTest {
+
+    @Test
+    public void testTransaction() throws Exception {
+        AsyncConsistentMap<String, String> asyncMap = strictMock(AsyncConsistentMap.class);
+        TransactionId transactionId = TransactionId.from("foo");
+        List<MapUpdate<String, String>> updates = Collections.singletonList(new MapUpdate<>());
+        Transaction<MapUpdate<String, String>> transaction = new Transaction<>(transactionId, asyncMap);
+        assertEquals(transactionId, transaction.transactionId());
+
+        expect(asyncMap.begin(transactionId))
+                .andReturn(CompletableFuture.completedFuture(new Version(1)));
+        expect(asyncMap.prepare(new TransactionLog<>(transactionId, updates)))
+                .andReturn(CompletableFuture.completedFuture(true));
+        expect(asyncMap.commit(transactionId))
+                .andReturn(CompletableFuture.completedFuture(null));
+        replay(asyncMap);
+
+        assertEquals(Transaction.State.ACTIVE, transaction.state());
+        assertEquals(1, transaction.begin().join().value());
+        assertEquals(Transaction.State.ACTIVE, transaction.state());
+        assertTrue(transaction.prepare(updates).join());
+        assertEquals(Transaction.State.PREPARED, transaction.state());
+        transaction.commit();
+        assertEquals(Transaction.State.COMMITTED, transaction.state());
+        verify(asyncMap);
+    }
+
+    @Test
+    public void testTransactionFailOnOutOfOrderCalls() throws Exception {
+        AsyncConsistentMap<String, String> asyncMap = strictMock(AsyncConsistentMap.class);
+        TransactionId transactionId = TransactionId.from("foo");
+        List<MapUpdate<String, String>> updates = Collections.singletonList(new MapUpdate<>());
+        Transaction<MapUpdate<String, String>> transaction = new Transaction<>(transactionId, asyncMap);
+
+        try {
+            transaction.prepare(updates);
+            fail();
+        } catch (IllegalStateException e) {
+        }
+
+        try {
+            transaction.commit();
+            fail();
+        } catch (IllegalStateException e) {
+        }
+
+        try {
+            transaction.rollback();
+            fail();
+        } catch (IllegalStateException e) {
+        }
+
+        expect(asyncMap.begin(transactionId))
+                .andReturn(CompletableFuture.completedFuture(new Version(1)));
+        expect(asyncMap.prepare(new TransactionLog<>(transactionId, updates)))
+                .andReturn(CompletableFuture.completedFuture(true));
+        replay(asyncMap);
+
+        assertFalse(transaction.isOpen());
+        assertEquals(Transaction.State.ACTIVE, transaction.state());
+        assertEquals(1, transaction.begin().join().value());
+        assertTrue(transaction.isOpen());
+        assertEquals(Transaction.State.ACTIVE, transaction.state());
+        assertTrue(transaction.prepare(updates).join());
+        assertEquals(Transaction.State.PREPARED, transaction.state());
+
+        try {
+            transaction.begin();
+            fail();
+        } catch (IllegalStateException e) {
+        }
+        verify(asyncMap);
+    }
+
+    @Test
+    public void testCoordinatedMapTransaction() throws Exception {
+        List<Object> mocks = new ArrayList<>();
+
+        Map<PartitionId, DefaultTransactionalMapParticipant<String, String>> participants = new HashMap<>();
+        List<PartitionId> sortedParticipants = new ArrayList<>();
+        TransactionId transactionId = TransactionId.from(UUID.randomUUID().toString());
+        for (int i = 1; i <= 3; i++) {
+            AsyncConsistentMap<String, String> asyncMap = mock(AsyncConsistentMap.class);
+            mocks.add(asyncMap);
+
+            ConsistentMap<String, String> consistentMap = new TestConsistentMap<>();
+            Transaction<MapUpdate<String, String>> transaction = new Transaction<>(transactionId, asyncMap);
+            PartitionId partitionId = PartitionId.from(i);
+            participants.put(partitionId, new DefaultTransactionalMapParticipant<>(consistentMap, transaction));
+            sortedParticipants.add(partitionId);
+        }
+
+        expect(participants.get(PartitionId.from(1)).transaction.transactionalObject
+                .begin(anyObject(TransactionId.class)))
+                .andReturn(CompletableFuture.completedFuture(new Version(1)));
+
+        expect(participants.get(PartitionId.from(1)).transaction.transactionalObject.prepare(
+                new TransactionLog<>(transactionId, Arrays.asList(
+                        MapUpdate.<String, String>newBuilder()
+                                .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+                                .withKey("foo")
+                                .withCurrentVersion(1)
+                                .build(),
+                        MapUpdate.<String, String>newBuilder()
+                                .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+                                .withKey("baz")
+                                .withCurrentVersion(2)
+                                .build()
+                )))).andReturn(CompletableFuture.completedFuture(true));
+
+        expect(participants.get(PartitionId.from(1)).transaction.transactionalObject.commit(transactionId))
+                .andReturn(CompletableFuture.completedFuture(null));
+
+        expect(participants.get(PartitionId.from(3)).transaction.transactionalObject
+                .begin(anyObject(TransactionId.class)))
+                .andReturn(CompletableFuture.completedFuture(new Version(1)));
+
+        expect(participants.get(PartitionId.from(3)).transaction.transactionalObject.prepare(
+                new TransactionLog<>(transactionId, Arrays.asList(
+                        MapUpdate.<String, String>newBuilder()
+                                .withType(MapUpdate.Type.PUT_IF_ABSENT)
+                                .withKey("bar")
+                                .withValue("baz")
+                                .build()
+                )))).andReturn(CompletableFuture.completedFuture(true));
+
+        expect(participants.get(PartitionId.from(3)).transaction.transactionalObject.commit(transactionId))
+                .andReturn(CompletableFuture.completedFuture(null));
+
+        TransactionManager transactionManager = mock(TransactionManager.class);
+        expect(transactionManager.updateState(anyObject(TransactionId.class), anyObject(Transaction.State.class)))
+                .andReturn(CompletableFuture.completedFuture(null))
+                .anyTimes();
+        expect(transactionManager.remove(anyObject(TransactionId.class)))
+                .andReturn(CompletableFuture.completedFuture(null))
+                .anyTimes();
+        mocks.add(transactionManager);
+
+        TransactionCoordinator transactionCoordinator = new TransactionCoordinator(transactionId, transactionManager);
+
+        Hasher<String> hasher = key -> {
+            int hashCode = Hashing.sha256().hashBytes(key.getBytes()).asInt();
+            return sortedParticipants.get(Math.abs(hashCode) % sortedParticipants.size());
+        };
+
+        expect(transactionManager.<String, String>getTransactionalMap(anyString(), anyObject(), anyObject()))
+                .andReturn(new PartitionedTransactionalMap(participants, hasher));
+
+        replay(mocks.toArray());
+
+        PartitionedTransactionalMap<String, String> transactionalMap = (PartitionedTransactionalMap)
+                transactionCoordinator.getTransactionalMap("foo", Serializer.using(KryoNamespaces.API));
+
+        // Sneak a couple entries in the first partition.
+        transactionalMap.partitions.get(PartitionId.from(1)).backingMap.put("foo", "bar");
+        transactionalMap.partitions.get(PartitionId.from(1)).backingMap.put("baz", "foo");
+
+        assertTrue(transactionalMap.containsKey("foo"));
+        assertEquals("bar", transactionalMap.remove("foo"));
+        assertFalse(transactionalMap.containsKey("bar"));
+        assertNull(transactionalMap.put("bar", "baz"));
+        assertTrue(transactionalMap.containsKey("bar"));
+        assertTrue(transactionalMap.containsKey("baz"));
+        assertFalse(transactionalMap.remove("baz", "baz"));
+        assertTrue(transactionalMap.remove("baz", "foo"));
+        assertFalse(transactionalMap.containsKey("baz"));
+
+        assertEquals(CommitStatus.SUCCESS, transactionCoordinator.commit().join());
+        verify(mocks.toArray());
+    }
+
+    private static class TestConsistentMap<K, V> implements ConsistentMap<K, V> {
+        private final Map<K, Versioned<V>> map = new HashMap<>();
+        private final AtomicLong version = new AtomicLong();
+
+        @Override
+        public String name() {
+            return null;
+        }
+
+        @Override
+        public Type primitiveType() {
+            return Type.CONSISTENT_MAP;
+        }
+
+        private long nextVersion() {
+            return version.incrementAndGet();
+        }
+
+        @Override
+        public int size() {
+            return map.size();
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return map.isEmpty();
+        }
+
+        @Override
+        public boolean containsKey(K key) {
+            return map.containsKey(key);
+        }
+
+        @Override
+        public boolean containsValue(V value) {
+            return map.containsValue(value);
+        }
+
+        @Override
+        public Versioned<V> get(K key) {
+            return map.get(key);
+        }
+
+        @Override
+        public Versioned<V> getOrDefault(K key, V defaultValue) {
+            return map.getOrDefault(key, new Versioned<>(defaultValue, 0));
+        }
+
+        @Override
+        public Versioned<V> computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Versioned<V> compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Versioned<V> computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Versioned<V> computeIf(K key,
+                Predicate<? super V> condition, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Versioned<V> put(K key, V value) {
+            return map.put(key, new Versioned<>(value, nextVersion()));
+        }
+
+        @Override
+        public Versioned<V> putAndGet(K key, V value) {
+            return put(key, value);
+        }
+
+        @Override
+        public Versioned<V> remove(K key) {
+            return map.remove(key);
+        }
+
+        @Override
+        public void clear() {
+            map.clear();
+        }
+
+        @Override
+        public Set<K> keySet() {
+            return map.keySet();
+        }
+
+        @Override
+        public Collection<Versioned<V>> values() {
+            return map.values();
+        }
+
+        @Override
+        public Set<Map.Entry<K, Versioned<V>>> entrySet() {
+            return map.entrySet();
+        }
+
+        @Override
+        public Versioned<V> putIfAbsent(K key, V value) {
+            return map.putIfAbsent(key, new Versioned<>(value, nextVersion()));
+        }
+
+        @Override
+        public boolean remove(K key, V value) {
+            return map.remove(key, value);
+        }
+
+        @Override
+        public boolean remove(K key, long version) {
+            Versioned<V> value = map.get(key);
+            if (value != null && value.version() == version) {
+                map.remove(key);
+                return true;
+            }
+            return false;
+        }
+
+        @Override
+        public Versioned<V> replace(K key, V value) {
+            return map.replace(key, new Versioned<>(value, nextVersion()));
+        }
+
+        @Override
+        public boolean replace(K key, V oldValue, V newValue) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean replace(K key, long oldVersion, V newValue) {
+            Versioned<V> value = map.get(key);
+            if (value != null && value.version() == oldVersion) {
+                map.put(key, new Versioned<>(newValue, nextVersion()));
+                return true;
+            }
+            return false;
+        }
+
+        @Override
+        public void addListener(MapEventListener<K, V> listener, Executor executor) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void removeListener(MapEventListener<K, V> listener) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Map<K, V> asJavaMap() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index 23d528b..ac4c60d 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -26,7 +26,7 @@
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Versioned;
 
 import java.util.Arrays;
@@ -366,7 +366,8 @@
                 .withValue(value1)
                 .build();
 
-        MapTransaction<String, byte[]> tx = new MapTransaction<>(TransactionId.from("tx1"), Arrays.asList(update1));
+        TransactionLog<MapUpdate<String, byte[]>> tx =
+                new TransactionLog<>(TransactionId.from("tx1"), Arrays.asList(update1));
 
         map.prepare(tx).thenAccept(result -> {
             assertEquals(true, result);
@@ -416,7 +417,7 @@
                 .withCurrentVersion(currFooVersion)
                 .build();
 
-        tx = new MapTransaction<>(TransactionId.from("tx2"), Arrays.asList(remove1));
+        tx = new TransactionLog<>(TransactionId.from("tx2"), Arrays.asList(remove1));
 
         map.prepare(tx).thenAccept(result -> {
             assertTrue("prepare should succeed", result);
@@ -459,7 +460,8 @@
                 .withKey("foo")
                 .withValue(value1)
                 .build();
-        MapTransaction<String, byte[]> tx = new MapTransaction<>(TransactionId.from("tx1"), Arrays.asList(update1));
+        TransactionLog<MapUpdate<String, byte[]>> tx =
+                new TransactionLog<>(TransactionId.from("tx1"), Arrays.asList(update1));
         map.prepare(tx).thenAccept(result -> {
             assertEquals(true, result);
         }).join();
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index dd3ef19..d638f5f 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -220,7 +220,7 @@
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.SetEvent;
 import org.onosproject.store.service.Task;
 import org.onosproject.store.service.Versioned;
@@ -528,7 +528,7 @@
             .register(ExtensionSelectorType.class)
             .register(ExtensionTreatmentType.class)
             .register(TransactionId.class)
-            .register(MapTransaction.class)
+            .register(TransactionLog.class)
             .register(MapUpdate.class)
             .register(MapUpdate.Type.class)
             .register(Versioned.class)