Added distributed transaction support through a two phase commit protocol

Change-Id: I85d64234a24823fee8b3c2ea830abbb6867dad38
diff --git a/cli/src/main/java/org/onosproject/cli/net/TransactionsCommand.java b/cli/src/main/java/org/onosproject/cli/net/TransactionsCommand.java
new file mode 100644
index 0000000..6013a38
--- /dev/null
+++ b/cli/src/main/java/org/onosproject/cli/net/TransactionsCommand.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cli.net;
+
+import java.util.Collection;
+
+import org.apache.karaf.shell.commands.Command;
+import org.apache.karaf.shell.commands.Option;
+import org.onlab.util.Tools;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.store.service.StorageAdminService;
+import org.onosproject.store.service.Transaction;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * CLI to work with database transactions in the system.
+ */
+@Command(scope = "onos", name = "transactions",
+        description = "Utility for viewing and redriving database transactions")
+public class TransactionsCommand extends AbstractShellCommand {
+
+    @Option(name = "-r", aliases = "--redrive",
+            description = "Redrive stuck transactions while removing those that are done",
+            required = false, multiValued = false)
+    private boolean redrive = false;
+
+    private static final String FMT = "%-20s %-15s %-10s";
+
+    /**
+     * Displays transactions as text.
+     *
+     * @param transactions transactions
+     */
+    private void displayTransactions(Collection<Transaction> transactions) {
+        print("---------------------------------------------");
+        print(FMT, "Id", "State", "Updated");
+        print("---------------------------------------------");
+        transactions.forEach(txn -> print(FMT, txn.id(), txn.state(),  Tools.timeAgo(txn.lastUpdated())));
+        if (transactions.size() > 0) {
+            print("---------------------------------------------");
+        }
+    }
+
+    /**
+     * Converts collection of transactions into a JSON object.
+     *
+     * @param transactions transactions
+     */
+    private JsonNode json(Collection<Transaction> transactions) {
+        ObjectMapper mapper = new ObjectMapper();
+        ArrayNode txns = mapper.createArrayNode();
+
+        // Create a JSON node for each transaction
+        transactions.stream().forEach(txn -> {
+                    ObjectNode txnNode = mapper.createObjectNode();
+                    txnNode.put("id", txn.id())
+                    .put("state", txn.state().toString())
+                    .put("lastUpdated", txn.lastUpdated());
+                    txns.add(txnNode);
+                });
+
+        return txns;
+    }
+
+    @Override
+    protected void execute() {
+        StorageAdminService storageAdminService = get(StorageAdminService.class);
+
+        if (redrive) {
+            storageAdminService.redriveTransactions();
+            return;
+        }
+
+        Collection<Transaction> transactions = storageAdminService.getTransactions();
+        if (outputJson()) {
+            print("%s", json(transactions));
+        } else {
+            displayTransactions(transactions);
+        }
+    }
+}
diff --git a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 4c0497b..98328b6 100644
--- a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -233,6 +233,9 @@
             <action class="org.onosproject.cli.net.MapsListCommand"/>
         </command>
         <command>
+            <action class="org.onosproject.cli.net.TransactionsCommand"/>
+        </command>
+        <command>
             <action class="org.onosproject.cli.net.ClusterDevicesCommand"/>
             <completers>
                 <ref component-id="clusterIdCompleter"/>
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
index 21e3bb4..7d6ad4d 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
@@ -35,6 +35,12 @@
     }
 
     /**
+     * ConsistentMap update conflicts with an in flight transaction.
+     */
+    public static class ConcurrentModification extends ConsistentMapException {
+    }
+
+    /**
      * ConsistentMap operation interrupted.
      */
     public static class Interrupted extends ConsistentMapException {
diff --git a/core/api/src/main/java/org/onosproject/store/service/DatabaseUpdate.java b/core/api/src/main/java/org/onosproject/store/service/DatabaseUpdate.java
new file mode 100644
index 0000000..fd4373e
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/DatabaseUpdate.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.service;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Database update operation.
+ *
+ */
+public final class DatabaseUpdate {
+
+    /**
+     * Type of database update operation.
+     */
+    public static enum Type {
+        /**
+         * Insert/Update entry without any checks.
+         */
+        PUT,
+        /**
+         * Insert an entry iff there is no existing entry for that key.
+         */
+        PUT_IF_ABSENT,
+
+        /**
+         * Update entry if the current version matches specified version.
+         */
+        PUT_IF_VERSION_MATCH,
+
+        /**
+         * Update entry if the current value matches specified value.
+         */
+        PUT_IF_VALUE_MATCH,
+
+        /**
+         * Remove entry without any checks.
+         */
+        REMOVE,
+
+        /**
+         * Remove entry if the current version matches specified version.
+         */
+        REMOVE_IF_VERSION_MATCH,
+
+        /**
+         * Remove entry if the current value matches specified value.
+         */
+        REMOVE_IF_VALUE_MATCH,
+    }
+
+    private Type type;
+    private String tableName;
+    private String key;
+    private byte[] value;
+    private byte[] currentValue;
+    private long currentVersion = -1;
+
+    /**
+     * Returns the type of update operation.
+     * @return type of update.
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Returns the tableName being updated.
+     * @return table name.
+     */
+    public String tableName() {
+        return tableName;
+    }
+
+    /**
+     * Returns the item key being updated.
+     * @return item key
+     */
+    public String key() {
+        return key;
+    }
+
+    /**
+     * Returns the new value.
+     * @return item's target value.
+     */
+    public byte[] value() {
+        return value;
+    }
+
+    /**
+     * Returns the expected current value in the database value for the key.
+     * @return current value in database.
+     */
+    public byte[] currentValue() {
+        return currentValue;
+    }
+
+    /**
+     * Returns the expected current version in the database for the key.
+     * @return expected version.
+     */
+    public long currentVersion() {
+        return currentVersion;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("type", type)
+            .add("tableName", tableName)
+            .add("key", key)
+            .add("value", value)
+            .add("currentValue", currentValue)
+            .add("currentVersion", currentVersion)
+            .toString();
+    }
+
+    /**
+     * Creates a new builder instance.
+     *
+     * @return builder.
+     */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * DatabaseUpdate builder.
+     *
+     */
+    public static final class Builder {
+
+        private DatabaseUpdate update = new DatabaseUpdate();
+
+        public DatabaseUpdate build() {
+            validateInputs();
+            return update;
+        }
+
+        public Builder withType(Type type) {
+            update.type = checkNotNull(type, "type cannot be null");
+            return this;
+        }
+
+        public Builder withTableName(String tableName) {
+            update.tableName = checkNotNull(tableName, "tableName cannot be null");
+            return this;
+        }
+
+        public Builder withKey(String key) {
+            update.key = checkNotNull(key, "key cannot be null");
+            return this;
+        }
+
+        public Builder withCurrentValue(byte[] value) {
+            update.currentValue = checkNotNull(value, "currentValue cannot be null");
+            return this;
+        }
+
+        public Builder withValue(byte[] value) {
+            update.value = checkNotNull(value, "value cannot be null");
+            return this;
+        }
+
+        public Builder withCurrentVersion(long version) {
+            checkArgument(version >= 0, "version cannot be negative");
+            update.currentVersion = version;
+            return this;
+        }
+
+        private void validateInputs() {
+            checkNotNull(update.type, "type must be specified");
+            checkNotNull(update.tableName, "table name must be specified");
+            checkNotNull(update.key, "key must be specified");
+            switch (update.type) {
+            case PUT:
+            case PUT_IF_ABSENT:
+                checkNotNull(update.value, "value must be specified.");
+                break;
+            case PUT_IF_VERSION_MATCH:
+                checkNotNull(update.value, "value must be specified.");
+                checkState(update.currentVersion >= 0, "current version must be specified");
+                break;
+            case PUT_IF_VALUE_MATCH:
+                checkNotNull(update.value, "value must be specified.");
+                checkNotNull(update.currentValue, "currentValue must be specified.");
+                break;
+            case REMOVE:
+                break;
+            case REMOVE_IF_VERSION_MATCH:
+                checkState(update.currentVersion >= 0, "current version must be specified");
+                break;
+            case REMOVE_IF_VALUE_MATCH:
+                checkNotNull(update.currentValue, "currentValue must be specified.");
+                break;
+            default:
+                throw new IllegalStateException("Unknown operation type");
+            }
+        }
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
index 572867c..eb129fe 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.service;
 
+import java.util.Collection;
 import java.util.List;
 
 /**
@@ -35,4 +36,16 @@
      * @return list of map information
      */
     List<MapInfo> getMapInfo();
+
+    /**
+     * Returns all the transactions in the system.
+     *
+     * @return collection of transactions
+     */
+    Collection<Transaction> getTransactions();
+
+    /**
+     * Redrives stuck transactions while removing those that are done.
+     */
+    void redriveTransactions();
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index e165a95..5ea0420 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -29,13 +29,6 @@
 public interface StorageService {
 
     /**
-     * Creates a new transaction context.
-     *
-     * @return transaction context
-     */
-    TransactionContext createTransactionContext();
-
-    /**
      * Creates a new EventuallyConsistentMapBuilder.
      *
      * @param <K> key type
@@ -45,11 +38,11 @@
     <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder();
 
     /**
-     * Creates a new EventuallyConsistentMapBuilder.
+     * Creates a new ConsistentMapBuilder.
      *
      * @param <K> key type
      * @param <V> value type
-     * @return builder for an eventually consistent map
+     * @return builder for a consistent map
      */
     <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder();
 
@@ -60,4 +53,11 @@
      * @return builder for an distributed set
      */
     <E> SetBuilder<E> setBuilder();
-}
\ No newline at end of file
+
+    /**
+     * Creates a new transaction context.
+     *
+     * @return transaction context
+     */
+    TransactionContext createTransactionContext();
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/Transaction.java b/core/api/src/main/java/org/onosproject/store/service/Transaction.java
new file mode 100644
index 0000000..514e1ab
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/Transaction.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.List;
+
+/**
+ * An immutable transaction object.
+ */
+public interface Transaction {
+
+    public enum State {
+        /**
+         * Indicates a new transaction that is about to be prepared. All transactions
+         * start their life in this state.
+         */
+        PREPARING,
+
+        /**
+         * Indicates a transaction that is successfully prepared i.e. all participants voted to commit
+         */
+        PREPARED,
+
+        /**
+         * Indicates a transaction that is about to be committed.
+         */
+        COMMITTING,
+
+        /**
+         * Indicates a transaction that has successfully committed.
+         */
+        COMMITTED,
+
+        /**
+         * Indicates a transaction that is about to be rolled back.
+         */
+        ROLLINGBACK,
+
+        /**
+         * Indicates a transaction that has been rolled back and all locks are released.
+         */
+        ROLLEDBACK
+    }
+
+    /**
+     * Returns the transaction Id.
+     *
+     * @return transaction id
+     */
+    long id();
+
+    /**
+     * Returns the list of updates that are part of this transaction.
+     *
+     * @return list of database updates
+     */
+    List<DatabaseUpdate> updates();
+
+    /**
+     * Returns the current state of this transaction.
+     *
+     * @return transaction state
+     */
+    State state();
+
+    /**
+     * Returns true if this transaction has completed execution.
+     *
+     * @return true is yes, false otherwise
+     */
+    public default boolean isDone() {
+        return state() == State.COMMITTED || state() == State.ROLLEDBACK;
+    }
+
+    /**
+     * Returns a new transaction that is created by transitioning this one to the specified state.
+     *
+     * @param newState destination state
+     * @return a new transaction instance similar to the current one but its state set to specified state
+     */
+    Transaction transition(State newState);
+
+    /**
+     * Returns the system time when the transaction was last updated.
+     *
+     * @return last update time
+     */
+    public long lastUpdated();
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java b/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
index b404bae..94942e2 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
@@ -19,21 +19,31 @@
 /**
  * Provides a context for transactional operations.
  * <p>
- * A transaction context provides a boundary within which transactions
- * are run. It also is a place where all modifications made within a transaction
- * are cached until the point when the transaction commits or aborts. It thus ensures
- * isolation of work happening with in the transaction boundary.
- * <p>
  * A transaction context is a vehicle for grouping operations into a unit with the
  * properties of atomicity, isolation, and durability. Transactions also provide the
  * ability to maintain an application's invariants or integrity constraints,
  * supporting the property of consistency. Together these properties are known as ACID.
+ * <p>
+ * A transaction context provides a boundary within which transactions
+ * are run. It also is a place where all modifications made within a transaction
+ * are cached until the point when the transaction commits or aborts. It thus ensures
+ * isolation of work happening with in the transaction boundary. Within a transaction
+ * context isolation level is REPEATABLE_READS i.e. only data that is committed can be read.
+ * The only uncommitted data that can be read is the data modified by the current transaction.
  */
 public interface TransactionContext {
 
     /**
+     * Returns the unique transactionId.
+     *
+     * @return transaction id
+     */
+    long transactionId();
+
+    /**
      * Returns if this transaction context is open.
-     * @return true if open, false otherwise.
+     *
+     * @return true if open, false otherwise
      */
     boolean isOpen();
 
@@ -45,22 +55,24 @@
     /**
      * Commits a transaction that was previously started thereby making its changes permanent
      * and externally visible.
-     * @throws TransactionException if transaction fails to commit.
+     *
+     * @throws TransactionException if transaction fails to commit
      */
     void commit();
 
     /**
-     * Rolls back the current transaction, discarding all its changes.
+     * Aborts any changes made in this transaction context and discarding all locally cached updates.
      */
-    void rollback();
+    void abort();
 
     /**
-     * Creates a new transactional map.
+     * Returns a transactional map data structure with the specified name.
+     *
      * @param <K> key type
      * @param <V> value type
-     * @param mapName name of the transactional map.
-     * @param serializer serializer to use for encoding/decoding keys and vaulues.
-     * @return new Transactional Map.
+     * @param mapName name of the transactional map
+     * @param serializer serializer to use for encoding/decoding keys and values of the map
+     * @return Transactional Map
      */
-    <K, V> TransactionalMap<K, V> createTransactionalMap(String mapName, Serializer serializer);
+    <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName, Serializer serializer);
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionException.java b/core/api/src/main/java/org/onosproject/store/service/TransactionException.java
index 8261fbd..6135394 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionException.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionException.java
@@ -41,8 +41,14 @@
     }
 
     /**
-     * Transaction failure due to optimistic concurrency failure.
+     * Transaction failure due to optimistic concurrency violation.
      */
     public static class OptimisticConcurrencyFailure extends TransactionException {
     }
+
+    /**
+     * Transaction failure due to a conflicting transaction in progress.
+     */
+    public static class ConcurrentModification extends TransactionException {
+    }
 }
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionalMap.java b/core/api/src/main/java/org/onosproject/store/service/TransactionalMap.java
index c59600c..ebc6611 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionalMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionalMap.java
@@ -16,15 +16,12 @@
 
 package org.onosproject.store.service;
 
-import java.util.Collection;
-import java.util.Set;
-import java.util.Map.Entry;
 
 /**
  * Transactional Map data structure.
  * <p>
- * A TransactionalMap is created by invoking {@link TransactionContext#createTransactionalMap createTransactionalMap}
- * method. All operations performed on this map with in a transaction boundary are invisible externally
+ * A TransactionalMap is created by invoking {@link TransactionContext#getTransactionalMap getTransactionalMap}
+ * method. All operations performed on this map within a transaction boundary are invisible externally
  * until the point when the transaction commits. A commit usually succeeds in the absence of conflicts.
  *
  * @param <K> type of key.
@@ -33,36 +30,6 @@
 public interface TransactionalMap<K, V> {
 
     /**
-     * Returns the number of entries in the map.
-     *
-     * @return map size.
-     */
-    int size();
-
-    /**
-     * Returns true if the map is empty.
-     *
-     * @return true if map has no entries, false otherwise.
-     */
-    boolean isEmpty();
-
-    /**
-     * Returns true if this map contains a mapping for the specified key.
-     *
-     * @param key key
-     * @return true if map contains key, false otherwise.
-     */
-    boolean containsKey(K key);
-
-    /**
-     * Returns true if this map contains the specified value.
-     *
-     * @param value value
-     * @return true if map contains value, false otherwise.
-     */
-    boolean containsValue(V value);
-
-    /**
      * Returns the value to which the specified key is mapped, or null if this
      * map contains no mapping for the key.
      *
@@ -94,45 +61,6 @@
     V remove(K key);
 
     /**
-     * Removes all of the mappings from this map (optional operation).
-     * The map will be empty after this call returns.
-     */
-    void clear();
-
-    /**
-     * Returns a Set view of the keys contained in this map.
-     * This method differs from the behavior of java.util.Map.keySet() in that
-     * what is returned is a unmodifiable snapshot view of the keys in the ConsistentMap.
-     * Attempts to modify the returned set, whether direct or via its iterator,
-     * result in an UnsupportedOperationException.
-     *
-     * @return a set of the keys contained in this map
-     */
-    Set<K> keySet();
-
-    /**
-     * Returns the collection of values contained in this map.
-     * This method differs from the behavior of java.util.Map.values() in that
-     * what is returned is a unmodifiable snapshot view of the values in the ConsistentMap.
-     * Attempts to modify the returned collection, whether direct or via its iterator,
-     * result in an UnsupportedOperationException.
-     *
-     * @return a collection of the values contained in this map
-     */
-    Collection<V> values();
-
-    /**
-     * Returns the set of entries contained in this map.
-     * This method differs from the behavior of java.util.Map.entrySet() in that
-     * what is returned is a unmodifiable snapshot view of the entries in the ConsistentMap.
-     * Attempts to modify the returned set, whether direct or via its iterator,
-     * result in an UnsupportedOperationException.
-     *
-     * @return set of entries contained in this map.
-     */
-    Set<Entry<K, V>> entrySet();
-
-    /**
      * If the specified key is not already associated with a value
      * associates it with the given value and returns null, else returns the current value.
      *
diff --git a/core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java b/core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java
deleted file mode 100644
index b4b05fc..0000000
--- a/core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.service;
-
-import static com.google.common.base.Preconditions.*;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * Database update operation.
- *
- * @param <K> key type.
- * @param <V> value type.
- */
-public class UpdateOperation<K, V> {
-
-    /**
-     * Type of database update operation.
-     */
-    public static enum Type {
-        PUT,
-        PUT_IF_ABSENT,
-        PUT_IF_VERSION_MATCH,
-        PUT_IF_VALUE_MATCH,
-        REMOVE,
-        REMOVE_IF_VERSION_MATCH,
-        REMOVE_IF_VALUE_MATCH,
-    }
-
-    private Type type;
-    private String tableName;
-    private K key;
-    private V value;
-    private V currentValue;
-    private long currentVersion = -1;
-
-    /**
-     * Returns the type of update operation.
-     * @return type of update.
-     */
-    public Type type() {
-        return type;
-    }
-
-    /**
-     * Returns the tableName being updated.
-     * @return table name.
-     */
-    public String tableName() {
-        return tableName;
-    }
-
-    /**
-     * Returns the item key being updated.
-     * @return item key
-     */
-    public K key() {
-        return key;
-    }
-
-    /**
-     * Returns the new value.
-     * @return item's target value.
-     */
-    public V value() {
-        return value;
-    }
-
-    /**
-     * Returns the expected current value in the database value for the key.
-     * @return current value in database.
-     */
-    public V currentValue() {
-        return currentValue;
-    }
-
-    /**
-     * Returns the expected current version in the database for the key.
-     * @return expected version.
-     */
-    public long currentVersion() {
-        return currentVersion;
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-            .add("type", type)
-            .add("tableName", tableName)
-            .add("key", key)
-            .add("value", value)
-            .add("currentValue", currentValue)
-            .add("currentVersion", currentVersion)
-            .toString();
-    }
-
-    /**
-     * Creates a new builder instance.
-     * @param <K> key type.
-     * @param <V> value type.
-     *
-     * @return builder.
-     */
-    public static <K, V> Builder<K, V> newBuilder() {
-        return new Builder<>();
-    }
-
-    /**
-     * UpdatOperation builder.
-     *
-     * @param <K> key type.
-     * @param <V> value type.
-     */
-    public static final class Builder<K, V> {
-
-        private UpdateOperation<K, V> operation = new UpdateOperation<>();
-
-        public UpdateOperation<K, V> build() {
-            validateInputs();
-            return operation;
-        }
-
-        public Builder<K, V> withType(Type type) {
-            operation.type = checkNotNull(type, "type cannot be null");
-            return this;
-        }
-
-        public Builder<K, V> withTableName(String tableName) {
-            operation.tableName = checkNotNull(tableName, "tableName cannot be null");
-            return this;
-        }
-
-        public Builder<K, V> withKey(K key) {
-            operation.key = checkNotNull(key, "key cannot be null");
-            return this;
-        }
-
-        public Builder<K, V> withCurrentValue(V value) {
-            operation.currentValue = checkNotNull(value, "currentValue cannot be null");
-            return this;
-        }
-
-        public Builder<K, V> withValue(V value) {
-            operation.value = checkNotNull(value, "value cannot be null");
-            return this;
-        }
-
-        public Builder<K, V> withCurrentVersion(long version) {
-            checkArgument(version >= 0, "version cannot be negative");
-            operation.currentVersion = version;
-            return this;
-        }
-
-        private void validateInputs() {
-            checkNotNull(operation.type, "type must be specified");
-            checkNotNull(operation.tableName, "table name must be specified");
-            checkNotNull(operation.key, "key must be specified");
-            switch (operation.type) {
-            case PUT:
-            case PUT_IF_ABSENT:
-                checkNotNull(operation.value, "value must be specified.");
-                break;
-            case PUT_IF_VERSION_MATCH:
-                checkNotNull(operation.value, "value must be specified.");
-                checkState(operation.currentVersion >= 0, "current version must be specified");
-                break;
-            case PUT_IF_VALUE_MATCH:
-                checkNotNull(operation.value, "value must be specified.");
-                checkNotNull(operation.currentValue, "currentValue must be specified.");
-                break;
-            case REMOVE:
-                break;
-            case REMOVE_IF_VERSION_MATCH:
-                checkState(operation.currentVersion >= 0, "current version must be specified");
-                break;
-            case REMOVE_IF_VALUE_MATCH:
-                checkNotNull(operation.currentValue, "currentValue must be specified.");
-                break;
-            default:
-                throw new IllegalStateException("Unknown operation type");
-            }
-        }
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 010276a..363f330 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -34,6 +34,7 @@
 import net.kuujo.copycat.protocol.Protocol;
 import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
 
+import org.apache.commons.lang.math.RandomUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -41,6 +42,7 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.core.IdGenerator;
 import org.onosproject.store.cluster.impl.DistributedClusterStore;
 import org.onosproject.store.cluster.impl.NodeInfo;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
@@ -53,11 +55,13 @@
 import org.onosproject.store.service.SetBuilder;
 import org.onosproject.store.service.StorageAdminService;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.TransactionContext;
 import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -92,6 +96,9 @@
     private PartitionedDatabase partitionedDatabase;
     private Database inMemoryDatabase;
 
+    private TransactionManager transactionManager;
+    private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
@@ -188,6 +195,7 @@
             Thread.currentThread().interrupt();
             log.warn("Failed to complete database initialization.");
         }
+        transactionManager = new TransactionManager(partitionedDatabase);
         log.info("Started");
     }
 
@@ -218,7 +226,7 @@
 
     @Override
     public TransactionContext createTransactionContext() {
-        return new DefaultTransactionContext(partitionedDatabase);
+        return new DefaultTransactionContext(partitionedDatabase, transactionIdGenerator.getNewId());
     }
 
     @Override
@@ -331,6 +339,11 @@
             .collect(Collectors.toList());
     }
 
+    @Override
+    public Collection<Transaction> getTransactions() {
+        return complete(transactionManager.getTransactions());
+    }
+
     private static <T> T complete(CompletableFuture<T> future) {
         try {
             return future.get(DATABASE_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
@@ -343,4 +356,9 @@
             throw new ConsistentMapException(e.getCause());
         }
     }
+
+    @Override
+    public void redriveTransactions() {
+        getTransactions().stream().forEach(transactionManager::execute);
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index 3ea06fb..3578525 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -17,12 +17,11 @@
 package org.onosproject.store.consistent.impl;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
 /**
@@ -87,7 +86,7 @@
    * @param value The value to set.
    * @return A completable future to be completed with the result once complete.
    */
-  CompletableFuture<Versioned<V>> put(String tableName, K key, V value);
+  CompletableFuture<Result<Versioned<V>>> put(String tableName, K key, V value);
 
   /**
    * Removes a value from the table.
@@ -96,7 +95,7 @@
    * @param key The key to remove.
    * @return A completable future to be completed with the result once complete.
    */
-  CompletableFuture<Versioned<V>> remove(String tableName, K key);
+  CompletableFuture<Result<Versioned<V>>> remove(String tableName, K key);
 
   /**
    * Clears the table.
@@ -104,7 +103,7 @@
    * @param tableName table name
    * @return A completable future to be completed with the result once complete.
    */
-  CompletableFuture<Void> clear(String tableName);
+  CompletableFuture<Result<Void>> clear(String tableName);
 
   /**
    * Gets a set of keys in the table.
@@ -138,7 +137,7 @@
    * @param value The value to set if the given key does not exist.
    * @return A completable future to be completed with the result once complete.
    */
-  CompletableFuture<Versioned<V>> putIfAbsent(String tableName, K key, V value);
+  CompletableFuture<Result<Versioned<V>>> putIfAbsent(String tableName, K key, V value);
 
   /**
    * Removes a key and if the existing value for that key matches the specified value.
@@ -148,7 +147,7 @@
    * @param value The value to remove.
    * @return A completable future to be completed with the result once complete.
    */
-  CompletableFuture<Boolean> remove(String tableName, K key, V value);
+  CompletableFuture<Result<Boolean>> remove(String tableName, K key, V value);
 
   /**
    * Removes a key and if the existing version for that key matches the specified version.
@@ -158,7 +157,7 @@
    * @param version The expected version.
    * @return A completable future to be completed with the result once complete.
    */
-  CompletableFuture<Boolean> remove(String tableName, K key, long version);
+  CompletableFuture<Result<Boolean>> remove(String tableName, K key, long version);
 
   /**
    * Replaces the entry for the specified key only if currently mapped to the specified value.
@@ -169,7 +168,7 @@
    * @param newValue The value with which to replace the given key and value.
    * @return A completable future to be completed with the result once complete.
    */
-  CompletableFuture<Boolean> replace(String tableName, K key, V oldValue, V newValue);
+  CompletableFuture<Result<Boolean>> replace(String tableName, K key, V oldValue, V newValue);
 
   /**
    * Replaces the entry for the specified key only if currently mapped to the specified version.
@@ -180,14 +179,42 @@
    * @param newValue The value with which to replace the given key and version.
    * @return A completable future to be completed with the result once complete.
    */
-  CompletableFuture<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
+  CompletableFuture<Result<Boolean>> replace(String tableName, K key, long oldVersion, V newValue);
 
   /**
-   * Perform a atomic batch update operation i.e. either all operations in batch succeed or
-   * none do and no state changes are made.
+   * Prepare and commit the specified transaction.
    *
-   * @param updates list of updates to apply atomically.
-   * @return A completable future to be completed with the result once complete.
+   * @param transaction transaction to commit (after preparation)
+   * @return A completable future to be completed with the result once complete
    */
-  CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<K, V>> updates);
+  CompletableFuture<Boolean> prepareAndCommit(Transaction transaction);
+
+  /**
+   * Prepare the specified transaction for commit. A successful prepare implies
+   * all the affected resources are locked thus ensuring no concurrent updates can interfere.
+   *
+   * @param transaction transaction to prepare (for commit)
+   * @return A completable future to be completed with the result once complete. The future is completed
+   * with true if the transaction is successfully prepared i.e. all pre-conditions are met and
+   * applicable resources locked.
+   */
+  CompletableFuture<Boolean> prepare(Transaction transaction);
+
+  /**
+   * Commit the specified transaction. A successful commit implies
+   * all the updates are applied, are now durable and are now visible externally.
+   *
+   * @param transaction transaction to commit
+   * @return A completable future to be completed with the result once complete
+   */
+  CompletableFuture<Boolean> commit(Transaction transaction);
+
+  /**
+   * Rollback the specified transaction. A successful rollback implies
+   * all previously acquired locks for the affected resources are released.
+   *
+   * @param transaction transaction to rollback
+   * @return A completable future to be completed with the result once complete
+   */
+  CompletableFuture<Boolean> rollback(Transaction transaction);
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
index 237c85b..f6fa6d1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
@@ -23,6 +23,8 @@
 import org.onlab.util.KryoNamespace;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
 import net.kuujo.copycat.cluster.internal.MemberInfo;
@@ -63,8 +65,14 @@
     private static final KryoNamespace ONOS_STORE = KryoNamespace.newBuilder()
             .nextId(KryoNamespace.FLOATING_ID)
             .register(Versioned.class)
+            .register(DatabaseUpdate.class)
+            .register(DatabaseUpdate.Type.class)
             .register(Pair.class)
             .register(ImmutablePair.class)
+            .register(Result.class)
+            .register(Result.Status.class)
+            .register(DefaultTransaction.class)
+            .register(Transaction.State.class)
             .build();
 
     private static final KryoSerializer SERIALIZER = new KryoSerializer() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index 097261d..6685dde 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -17,11 +17,10 @@
 package org.onosproject.store.consistent.impl;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
 import net.kuujo.copycat.state.Command;
@@ -62,13 +61,13 @@
   Versioned<V> get(String tableName, K key);
 
   @Command
-  Versioned<V> put(String tableName, K key, V value);
+  Result<Versioned<V>> put(String tableName, K key, V value);
 
   @Command
-  Versioned<V> remove(String tableName, K key);
+  Result<Versioned<V>> remove(String tableName, K key);
 
   @Command
-  void clear(String tableName);
+  Result<Void> clear(String tableName);
 
   @Query
   Set<K> keySet(String tableName);
@@ -80,20 +79,29 @@
   Set<Entry<K, Versioned<V>>> entrySet(String tableName);
 
   @Command
-  Versioned<V> putIfAbsent(String tableName, K key, V value);
+  Result<Versioned<V>> putIfAbsent(String tableName, K key, V value);
 
   @Command
-  boolean remove(String tableName, K key, V value);
+  Result<Boolean> remove(String tableName, K key, V value);
 
   @Command
-  boolean remove(String tableName, K key, long version);
+  Result<Boolean> remove(String tableName, K key, long version);
 
   @Command
-  boolean replace(String tableName, K key, V oldValue, V newValue);
+  Result<Boolean> replace(String tableName, K key, V oldValue, V newValue);
 
   @Command
-  boolean replace(String tableName, K key, long oldVersion, V newValue);
+  Result<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
 
   @Command
-  boolean batchUpdate(List<UpdateOperation<K, V>> updates);
+  boolean prepareAndCommit(Transaction transaction);
+
+  @Command
+  boolean prepare(Transaction transaction);
+
+  @Command
+  boolean commit(Transaction transaction);
+
+  @Command
+  boolean rollback(Transaction transaction);
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index c86a6ea..09f612e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -28,6 +28,7 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.onlab.util.HexString;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.Versioned;
 
@@ -108,6 +109,7 @@
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
         return database.put(name, keyCache.getUnchecked(key), serializer.encode(value))
+                .thenApply(this::unwrapResult)
                 .thenApply(v -> v != null
                 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
     }
@@ -116,13 +118,14 @@
     public CompletableFuture<Versioned<V>> remove(K key) {
         checkNotNull(key, ERROR_NULL_KEY);
         return database.remove(name, keyCache.getUnchecked(key))
+                .thenApply(this::unwrapResult)
                 .thenApply(v -> v != null
                 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
     }
 
     @Override
     public CompletableFuture<Void> clear() {
-        return database.clear(name);
+        return database.clear(name).thenApply(this::unwrapResult);
     }
 
     @Override
@@ -154,23 +157,27 @@
     public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
-        return database.putIfAbsent(
-                name, keyCache.getUnchecked(key), serializer.encode(value)).thenApply(v ->
-                v != null ?
-                new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
+        return database.putIfAbsent(name,
+                                    keyCache.getUnchecked(key),
+                                    serializer.encode(value))
+               .thenApply(this::unwrapResult)
+               .thenApply(v -> v != null ?
+                       new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
     }
 
     @Override
     public CompletableFuture<Boolean> remove(K key, V value) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
-        return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value));
+        return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value))
+                .thenApply(this::unwrapResult);
     }
 
     @Override
     public CompletableFuture<Boolean> remove(K key, long version) {
         checkNotNull(key, ERROR_NULL_KEY);
-        return database.remove(name, keyCache.getUnchecked(key), version);
+        return database.remove(name, keyCache.getUnchecked(key), version)
+                .thenApply(this::unwrapResult);
 
     }
 
@@ -179,14 +186,16 @@
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(newValue, ERROR_NULL_VALUE);
         byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
-        return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue));
+        return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue))
+                .thenApply(this::unwrapResult);
     }
 
     @Override
     public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(newValue, ERROR_NULL_VALUE);
-        return database.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue));
+        return database.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue))
+                .thenApply(this::unwrapResult);
     }
 
     private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
@@ -197,4 +206,14 @@
                         e.getValue().version(),
                         e.getValue().creationTime()));
     }
+
+    private <T> T unwrapResult(Result<T> result) {
+        if (result.status() == Result.Status.LOCKED) {
+            throw new ConsistentMapException.ConcurrentModification();
+        } else if (result.success()) {
+            return result.value();
+        } else {
+            throw new IllegalStateException("Must not be here");
+        }
+    }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index c14e57c..8ed7670 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -23,13 +23,12 @@
 import net.kuujo.copycat.util.concurrent.Futures;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
 /**
@@ -39,7 +38,7 @@
     private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
     private DatabaseProxy<String, byte[]> proxy;
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     public DefaultDatabase(ResourceContext context) {
         super(context);
         this.stateMachine = new DefaultStateMachine(context, DatabaseState.class, DefaultDatabaseState.class);
@@ -91,17 +90,17 @@
     }
 
     @Override
-    public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+    public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) {
         return checkOpen(() -> proxy.put(tableName, key, value));
     }
 
     @Override
-    public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+    public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
         return checkOpen(() -> proxy.remove(tableName, key));
     }
 
     @Override
-    public CompletableFuture<Void> clear(String tableName) {
+    public CompletableFuture<Result<Void>> clear(String tableName) {
         return checkOpen(() -> proxy.clear(tableName));
     }
 
@@ -121,33 +120,48 @@
     }
 
     @Override
-    public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+    public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
         return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
     }
 
     @Override
-    public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+    public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
         return checkOpen(() -> proxy.remove(tableName, key, value));
     }
 
     @Override
-    public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+    public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
         return checkOpen(() -> proxy.remove(tableName, key, version));
     }
 
     @Override
-    public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+    public CompletableFuture<Result<Boolean>> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
         return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
     }
 
     @Override
-    public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+    public CompletableFuture<Result<Boolean>> replace(String tableName, String key, long oldVersion, byte[] newValue) {
         return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
     }
 
     @Override
-    public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
-        return checkOpen(() -> proxy.atomicBatchUpdate(updates));
+    public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
+        return checkOpen(() -> proxy.prepareAndCommit(transaction));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(Transaction transaction) {
+        return checkOpen(() -> proxy.prepare(transaction));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> commit(Transaction transaction) {
+        return checkOpen(() -> proxy.commit(transaction));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> rollback(Transaction transaction) {
+        return checkOpen(() -> proxy.rollback(transaction));
     }
 
     @Override
@@ -180,4 +194,4 @@
         }
         return false;
     }
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index e63a3d8..c190a28 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -18,43 +18,58 @@
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.stream.Collectors;
 import java.util.Set;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.DatabaseUpdate.Type;
 
+import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 
 import net.kuujo.copycat.state.Initializer;
 import net.kuujo.copycat.state.StateContext;
 
 /**
  * Default database state.
- *
- * @param <K> key type
- * @param <V> value type
  */
-public class DefaultDatabaseState<K, V> implements DatabaseState<K, V> {
-
+public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
     private Long nextVersion;
-    private Map<String, Map<K, Versioned<V>>> tables;
+    private Map<String, Map<String, Versioned<byte[]>>> tables;
+
+    /**
+     * This locks map has a structure similar to the "tables" map above and
+     * holds all the provisional updates made during a transaction's prepare phase.
+     * The entry value is represented as the tuple: (transactionId, newValue)
+     * If newValue == null that signifies this update is attempting to
+     * delete the existing value.
+     * This map also serves as a lock on the entries that are being updated.
+     * The presence of a entry in this map indicates that element is
+     * participating in a transaction and is currently locked for updates.
+     */
+    private Map<String, Map<String, Pair<Long, byte[]>>> locks;
 
     @Initializer
     @Override
-    public void init(StateContext<DatabaseState<K, V>> context) {
+    public void init(StateContext<DatabaseState<String, byte[]>> context) {
         tables = context.get("tables");
         if (tables == null) {
-            tables = new HashMap<>();
+            tables = Maps.newConcurrentMap();
             context.put("tables", tables);
         }
+        locks = context.get("locks");
+        if (locks == null) {
+            locks = Maps.newConcurrentMap();
+            context.put("locks", locks);
+        }
         nextVersion = context.get("nextVersion");
         if (nextVersion == null) {
             nextVersion = new Long(0);
@@ -62,15 +77,6 @@
         }
     }
 
-    private Map<K, Versioned<V>> getTableMap(String tableName) {
-        Map<K, Versioned<V>> table = tables.get(tableName);
-        if (table == null) {
-            table = new HashMap<>();
-            tables.put(tableName, table);
-        }
-        return table;
-    }
-
     @Override
     public Set<String> tableNames() {
         return new HashSet<>(tables.keySet());
@@ -87,47 +93,55 @@
     }
 
     @Override
-    public boolean containsKey(String tableName, K key) {
+    public boolean containsKey(String tableName, String key) {
         return getTableMap(tableName).containsKey(key);
     }
 
     @Override
-    public boolean containsValue(String tableName, V value) {
-        return getTableMap(tableName).values().stream().anyMatch(v -> checkEquality(v.value(), value));
+    public boolean containsValue(String tableName, byte[] value) {
+        return getTableMap(tableName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
     }
 
     @Override
-    public Versioned<V> get(String tableName, K key) {
+    public Versioned<byte[]> get(String tableName, String key) {
         return getTableMap(tableName).get(key);
     }
 
     @Override
-    public Versioned<V> put(String tableName, K key, V value) {
-        return getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion));
+    public Result<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+        return isLockedForUpdates(tableName, key)
+                ? Result.locked()
+                : Result.ok(getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion)));
     }
 
     @Override
-    public Versioned<V> remove(String tableName, K key) {
-        return getTableMap(tableName).remove(key);
+    public Result<Versioned<byte[]>> remove(String tableName, String key) {
+        return isLockedForUpdates(tableName, key)
+                ? Result.locked()
+                : Result.ok(getTableMap(tableName).remove(key));
     }
 
     @Override
-    public void clear(String tableName) {
+    public Result<Void> clear(String tableName) {
+        if (areTransactionsInProgress(tableName)) {
+            return Result.locked();
+        }
         getTableMap(tableName).clear();
+        return Result.ok(null);
     }
 
     @Override
-    public Set<K> keySet(String tableName) {
+    public Set<String> keySet(String tableName) {
         return ImmutableSet.copyOf(getTableMap(tableName).keySet());
     }
 
     @Override
-    public Collection<Versioned<V>> values(String tableName) {
+    public Collection<Versioned<byte[]>> values(String tableName) {
         return ImmutableList.copyOf(getTableMap(tableName).values());
     }
 
     @Override
-    public Set<Entry<K, Versioned<V>>> entrySet(String tableName) {
+    public Set<Entry<String, Versioned<byte[]>>> entrySet(String tableName) {
         return ImmutableSet.copyOf(getTableMap(tableName)
                 .entrySet()
                 .stream()
@@ -136,93 +150,113 @@
     }
 
     @Override
-    public Versioned<V> putIfAbsent(String tableName, K key, V value) {
-        Versioned<V> existingValue = getTableMap(tableName).get(key);
-        return existingValue != null ? existingValue : put(tableName, key, value);
-    }
-
-    @Override
-    public boolean remove(String tableName, K key, V value) {
-        Versioned<V> existing = getTableMap(tableName).get(key);
-        if (existing != null && checkEquality(existing.value(), value)) {
-            getTableMap(tableName).remove(key);
-            return true;
+    public Result<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
         }
-        return false;
+        Versioned<byte[]> existingValue = get(tableName, key);
+        Versioned<byte[]> currentValue = existingValue != null ? existingValue : put(tableName, key, value).value();
+        return Result.ok(currentValue);
     }
 
     @Override
-    public boolean remove(String tableName, K key, long version) {
-        Versioned<V> existing = getTableMap(tableName).get(key);
+    public Result<Boolean> remove(String tableName, String key, byte[] value) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
+        }
+        Versioned<byte[]> existing = get(tableName, key);
+        if (existing != null && Arrays.equals(existing.value(), value)) {
+            getTableMap(tableName).remove(key);
+            return Result.ok(true);
+        }
+        return Result.ok(false);
+    }
+
+    @Override
+    public Result<Boolean> remove(String tableName, String key, long version) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
+        }
+        Versioned<byte[]> existing = get(tableName, key);
         if (existing != null && existing.version() == version) {
             remove(tableName, key);
-            return true;
+            return Result.ok(true);
         }
-        return false;
+        return Result.ok(false);
     }
 
     @Override
-    public boolean replace(String tableName, K key, V oldValue, V newValue) {
-        Versioned<V> existing = getTableMap(tableName).get(key);
-        if (existing != null && checkEquality(existing.value(), oldValue)) {
+    public Result<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
+        }
+        Versioned<byte[]> existing = get(tableName, key);
+        if (existing != null && Arrays.equals(existing.value(), oldValue)) {
             put(tableName, key, newValue);
-            return true;
+            return Result.ok(true);
         }
-        return false;
+        return Result.ok(false);
     }
 
     @Override
-    public boolean replace(String tableName, K key, long oldVersion, V newValue) {
-        Versioned<V> existing = getTableMap(tableName).get(key);
+    public Result<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
+        }
+        Versioned<byte[]> existing = get(tableName, key);
         if (existing != null && existing.version() == oldVersion) {
             put(tableName, key, newValue);
+            return Result.ok(true);
+        }
+        return Result.ok(false);
+    }
+
+    @Override
+    public boolean prepareAndCommit(Transaction transaction) {
+        if (prepare(transaction)) {
+            return commit(transaction);
+        }
+        return false;
+    }
+
+    @Override
+    public boolean prepare(Transaction transaction) {
+        if (transaction.updates().stream().anyMatch(update ->
+                    isLockedByAnotherTransaction(update.tableName(),
+                                                 update.key(),
+                                                 transaction.id()))) {
+            return false;
+        }
+
+        if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
+            transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
             return true;
         }
         return false;
     }
 
     @Override
-    public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
-        if (updates.stream().anyMatch(update -> !checkIfUpdateIsPossible(update))) {
-            return false;
-        } else {
-            updates.stream().forEach(this::doUpdate);
-            return true;
-        }
+    public boolean commit(Transaction transaction) {
+        transaction.updates().forEach(update -> commitProvisionalUpdate(update, transaction.id()));
+        return true;
     }
 
-    private void doUpdate(UpdateOperation<K, V> update) {
-        String tableName = update.tableName();
-        K key = update.key();
-        switch (update.type()) {
-        case PUT:
-            put(tableName, key, update.value());
-            return;
-        case REMOVE:
-            remove(tableName, key);
-            return;
-        case PUT_IF_ABSENT:
-            putIfAbsent(tableName, key, update.value());
-            return;
-        case PUT_IF_VERSION_MATCH:
-            replace(tableName, key, update.currentValue(), update.value());
-            return;
-        case PUT_IF_VALUE_MATCH:
-            replace(tableName, key, update.currentVersion(), update.value());
-            return;
-        case REMOVE_IF_VERSION_MATCH:
-            remove(tableName, key, update.currentVersion());
-            return;
-        case REMOVE_IF_VALUE_MATCH:
-            remove(tableName, key, update.currentValue());
-            return;
-        default:
-            throw new IllegalStateException("Unsupported type: " + update.type());
-        }
+    @Override
+    public boolean rollback(Transaction transaction) {
+        transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
+        return true;
     }
 
-    private boolean checkIfUpdateIsPossible(UpdateOperation<K, V> update) {
-        Versioned<V> existingEntry = get(update.tableName(), update.key());
+    private Map<String, Versioned<byte[]>> getTableMap(String tableName) {
+        return tables.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
+    }
+
+    private Map<String, Pair<Long, byte[]>> getLockMap(String tableName) {
+        return locks.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
+    }
+
+    private boolean isUpdatePossible(DatabaseUpdate update) {
+        Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
         switch (update.type()) {
         case PUT:
         case REMOVE:
@@ -232,20 +266,85 @@
         case PUT_IF_VERSION_MATCH:
             return existingEntry != null && existingEntry.version() == update.currentVersion();
         case PUT_IF_VALUE_MATCH:
-            return existingEntry != null && checkEquality(existingEntry.value(), update.currentValue());
+            return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
         case REMOVE_IF_VERSION_MATCH:
             return existingEntry == null || existingEntry.version() == update.currentVersion();
         case REMOVE_IF_VALUE_MATCH:
-            return existingEntry == null || checkEquality(existingEntry.value(), update.currentValue());
+            return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
         default:
             throw new IllegalStateException("Unsupported type: " + update.type());
         }
     }
 
-    private boolean checkEquality(V value1, V value2) {
-        if (value1 instanceof byte[]) {
-            return Arrays.equals((byte[]) value1, (byte[]) value2);
+    private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+        Map<String, Pair<Long, byte[]>> lockMap = getLockMap(update.tableName());
+        switch (update.type()) {
+        case PUT:
+        case PUT_IF_ABSENT:
+        case PUT_IF_VERSION_MATCH:
+        case PUT_IF_VALUE_MATCH:
+            lockMap.put(update.key(), Pair.of(transactionId, update.value()));
+            break;
+        case REMOVE:
+        case REMOVE_IF_VERSION_MATCH:
+        case REMOVE_IF_VALUE_MATCH:
+            lockMap.put(update.key(), null);
+            break;
+        default:
+            throw new IllegalStateException("Unsupported type: " + update.type());
         }
-        return value1.equals(value2);
+    }
+
+    private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+        String tableName = update.tableName();
+        String key = update.key();
+        Type type = update.type();
+        Pair<Long, byte[]> provisionalUpdate = getLockMap(tableName).get(key);
+        if (Objects.equal(transactionId, provisionalUpdate.getLeft()))  {
+            getLockMap(tableName).remove(key);
+        } else {
+            return;
+        }
+
+        switch (type) {
+        case PUT:
+        case PUT_IF_ABSENT:
+        case PUT_IF_VERSION_MATCH:
+        case PUT_IF_VALUE_MATCH:
+            put(tableName, key, provisionalUpdate.getRight());
+            break;
+        case REMOVE:
+        case REMOVE_IF_VERSION_MATCH:
+        case REMOVE_IF_VALUE_MATCH:
+            remove(tableName, key);
+            break;
+        default:
+            break;
+        }
+    }
+
+    private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+        String tableName = update.tableName();
+        String key = update.key();
+        Pair<Long, byte[]> provisionalUpdate = getLockMap(tableName).get(key);
+        if (provisionalUpdate == null) {
+            return;
+        }
+        if (Objects.equal(transactionId, provisionalUpdate.getLeft()))  {
+            getLockMap(tableName).remove(key);
+        }
+    }
+
+    private boolean isLockedByAnotherTransaction(String tableName, String key, long transactionId) {
+        Pair<Long, byte[]> update = getLockMap(tableName).get(key);
+        return update != null && !Objects.equal(transactionId, update.getLeft());
+    }
+
+    private boolean isLockedForUpdates(String tableName, String key) {
+        return getLockMap(tableName).containsKey(key);
+    }
+
+    private boolean areTransactionsInProgress(String tableName) {
+        return !getLockMap(tableName).isEmpty();
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
index e6eac62..97d5ef3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
@@ -18,6 +18,7 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Set;
+
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.Serializer;
 
@@ -46,6 +47,7 @@
         return backingMap.isEmpty();
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public boolean contains(Object o) {
         return backingMap.containsKey((E) o);
@@ -71,6 +73,7 @@
         return backingMap.putIfAbsent(e, true) == null;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public boolean remove(Object o) {
         return backingMap.remove((E) o, true);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransaction.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransaction.java
new file mode 100644
index 0000000..2ff7a2d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransaction.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import java.util.List;
+
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A Default transaction implementation.
+ */
+public class DefaultTransaction implements Transaction {
+
+    private final long transactionId;
+    private final List<DatabaseUpdate> updates;
+    private final State state;
+    private final long lastUpdated;
+
+    public DefaultTransaction(long transactionId, List<DatabaseUpdate> updates) {
+        this(transactionId, updates, State.PREPARING, System.currentTimeMillis());
+    }
+
+    private DefaultTransaction(long transactionId, List<DatabaseUpdate> updates, State state, long lastUpdated) {
+        this.transactionId = transactionId;
+        this.updates = ImmutableList.copyOf(updates);
+        this.state = state;
+        this.lastUpdated = lastUpdated;
+    }
+
+    @Override
+    public long id() {
+        return transactionId;
+    }
+
+    @Override
+    public List<DatabaseUpdate> updates() {
+        return updates;
+    }
+
+    @Override
+    public State state() {
+        return state;
+    }
+
+    @Override
+    public Transaction transition(State newState) {
+        return new DefaultTransaction(transactionId, updates, newState, System.currentTimeMillis());
+    }
+
+    @Override
+    public long lastUpdated() {
+        return lastUpdated;
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
index d21543a..9d13833 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
@@ -18,19 +18,14 @@
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import static com.google.common.base.Preconditions.*;
 
-import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DatabaseUpdate;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.TransactionContext;
 import org.onosproject.store.service.TransactionException;
 import org.onosproject.store.service.TransactionalMap;
-import org.onosproject.store.service.UpdateOperation;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -40,80 +35,69 @@
  */
 public class DefaultTransactionContext implements TransactionContext {
 
-    private final Map<String, DefaultTransactionalMap> txMaps = Maps.newHashMap();
+    private static final String TX_NOT_OPEN_ERROR = "Transaction Context is not open";
+
+    @SuppressWarnings("rawtypes")
+    private final Map<String, DefaultTransactionalMap> txMaps = Maps.newConcurrentMap();
     private boolean isOpen = false;
     private final Database database;
-    private static final String TX_NOT_OPEN_ERROR = "Transaction is not open";
-    private static final int TRANSACTION_TIMEOUT_MILLIS = 2000;
+    private final long transactionId;
 
-    DefaultTransactionContext(Database database) {
-        this.database = checkNotNull(database, "Database must not be null");
+    public DefaultTransactionContext(Database database, long transactionId) {
+        this.database = checkNotNull(database);
+        this.transactionId = transactionId;
+    }
+
+    @Override
+    public long transactionId() {
+        return transactionId;
     }
 
     @Override
     public void begin() {
+        checkState(!isOpen, "Transaction Context is already open");
         isOpen = true;
     }
 
     @Override
+    public boolean isOpen() {
+        return isOpen;
+    }
+
+    @Override
     @SuppressWarnings("unchecked")
-    public <K, V> TransactionalMap<K, V> createTransactionalMap(String mapName,
+    public <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName,
             Serializer serializer) {
-        checkNotNull(mapName, "map name is null");
-        checkNotNull(serializer, "serializer is null");
         checkState(isOpen, TX_NOT_OPEN_ERROR);
-        if (!txMaps.containsKey(mapName)) {
-            ConsistentMap<K, V> backingMap = new DefaultConsistentMap<>(mapName, database, serializer);
-            DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<>(mapName, backingMap, this, serializer);
-            txMaps.put(mapName, txMap);
-        }
-        return txMaps.get(mapName);
+        checkNotNull(mapName);
+        checkNotNull(serializer);
+        return txMaps.computeIfAbsent(mapName, name -> new DefaultTransactionalMap<>(
+                                name,
+                                new DefaultConsistentMap<>(name, database, serializer),
+                                this,
+                                serializer));
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public void commit() {
         checkState(isOpen, TX_NOT_OPEN_ERROR);
-        List<UpdateOperation<String, byte[]>> allUpdates =
-                Lists.newLinkedList();
         try {
+            List<DatabaseUpdate> updates = Lists.newLinkedList();
             txMaps.values()
-                .stream()
-                .forEach(m -> {
-                    allUpdates.addAll(m.prepareDatabaseUpdates());
-                });
-
-            if (!complete(database.atomicBatchUpdate(allUpdates))) {
-                throw new TransactionException.OptimisticConcurrencyFailure();
-            }
+                  .forEach(m -> { updates.addAll(m.prepareDatabaseUpdates()); });
+            database.prepareAndCommit(new DefaultTransaction(transactionId, updates));
+        } catch (Exception e) {
+            abort();
+            throw new TransactionException(e);
         } finally {
             isOpen = false;
         }
     }
 
     @Override
-    public void rollback() {
+    public void abort() {
         checkState(isOpen, TX_NOT_OPEN_ERROR);
-        txMaps.values()
-        .stream()
-        .forEach(m -> m.rollback());
+        txMaps.values().forEach(m -> m.rollback());
     }
-
-    @Override
-    public boolean isOpen() {
-        return false;
-    }
-
-    private static <T> T complete(CompletableFuture<T> future) {
-        try {
-            return future.get(TRANSACTION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new TransactionException.Interrupted();
-        } catch (TimeoutException e) {
-            throw new TransactionException.Timeout();
-        } catch (ExecutionException e) {
-            throw new TransactionException(e.getCause());
-        }
-    }
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java
index c98c336..91151b0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java
@@ -16,23 +16,24 @@
 
 package org.onosproject.store.consistent.impl;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
 import java.util.Set;
 
 import org.onlab.util.HexString;
 import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DatabaseUpdate;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.TransactionContext;
 import org.onosproject.store.service.TransactionalMap;
-import org.onosproject.store.service.UpdateOperation;
 import org.onosproject.store.service.Versioned;
 
 import static com.google.common.base.Preconditions.*;
 
+import com.google.common.base.Objects;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -55,6 +56,23 @@
     private final Map<K, V> writeCache = Maps.newConcurrentMap();
     private final Set<K> deleteSet = Sets.newConcurrentHashSet();
 
+    private static final String ERROR_NULL_VALUE = "Null values are not allowed";
+    private static final String ERROR_NULL_KEY = "Null key is not allowed";
+
+    private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
+            .softValues()
+            .build(new CacheLoader<K, String>() {
+
+                @Override
+                public String load(K key) {
+                    return HexString.toHexString(serializer.encode(key));
+                }
+            });
+
+    protected K dK(String key) {
+        return serializer.decode(HexString.fromHexString(key));
+    }
+
     public DefaultTransactionalMap(
             String name,
             ConsistentMap<K, V> backingMap,
@@ -69,15 +87,15 @@
     @Override
     public V get(K key) {
         checkState(txContext.isOpen(), TX_CLOSED_ERROR);
+        checkNotNull(key, ERROR_NULL_KEY);
         if (deleteSet.contains(key)) {
             return null;
-        } else if (writeCache.containsKey(key)) {
-            return writeCache.get(key);
+        }
+        V latest = writeCache.get(key);
+        if (latest != null) {
+            return latest;
         } else {
-            if (!readCache.containsKey(key)) {
-                readCache.put(key, backingMap.get(key));
-            }
-            Versioned<V> v = readCache.get(key);
+            Versioned<V> v = readCache.computeIfAbsent(key, k -> backingMap.get(k));
             return v != null ? v.value() : null;
         }
     }
@@ -85,25 +103,31 @@
     @Override
     public V put(K key, V value) {
         checkState(txContext.isOpen(), TX_CLOSED_ERROR);
-        Versioned<V> original = readCache.get(key);
-        V recentUpdate = writeCache.put(key, value);
+        checkNotNull(value, ERROR_NULL_VALUE);
+
+        V latest = get(key);
+        writeCache.put(key, value);
         deleteSet.remove(key);
-        return recentUpdate == null ? (original != null ? original.value() : null) : recentUpdate;
+        return latest;
     }
 
     @Override
     public V remove(K key) {
         checkState(txContext.isOpen(), TX_CLOSED_ERROR);
-        Versioned<V> original = readCache.get(key);
-        V recentUpdate = writeCache.remove(key);
-        deleteSet.add(key);
-        return recentUpdate == null ? (original != null ? original.value() : null) : recentUpdate;
+        V latest = get(key);
+        if (latest != null) {
+            writeCache.remove(key);
+            deleteSet.add(key);
+        }
+        return latest;
     }
 
     @Override
     public boolean remove(K key, V value) {
-        V currentValue = get(key);
-        if (value.equals(currentValue)) {
+        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
+        checkNotNull(value, ERROR_NULL_VALUE);
+        V latest = get(key);
+        if (Objects.equal(value, latest)) {
             remove(key);
             return true;
         }
@@ -112,8 +136,11 @@
 
     @Override
     public boolean replace(K key, V oldValue, V newValue) {
-        V currentValue = get(key);
-        if (oldValue.equals(currentValue)) {
+        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
+        checkNotNull(oldValue, ERROR_NULL_VALUE);
+        checkNotNull(newValue, ERROR_NULL_VALUE);
+        V latest = get(key);
+        if (Objects.equal(oldValue, latest)) {
             put(key, newValue);
             return true;
         }
@@ -121,70 +148,25 @@
     }
 
     @Override
-    public int size() {
-        // TODO
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean isEmpty() {
-        return size() == 0;
-    }
-
-    @Override
-    public boolean containsKey(K key) {
-        return get(key) != null;
-    }
-
-    @Override
-    public boolean containsValue(V value) {
-        // TODO
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void clear() {
-        // TODO
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Set<K> keySet() {
-        // TODO
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Collection<V> values() {
-        // TODO
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Set<Entry<K, V>> entrySet() {
-        // TODO
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
     public V putIfAbsent(K key, V value) {
-        V currentValue = get(key);
-        if (currentValue == null) {
+        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
+        checkNotNull(value, ERROR_NULL_VALUE);
+        V latest = get(key);
+        if (latest == null) {
             put(key, value);
-            return null;
         }
-        return currentValue;
+        return latest;
     }
 
-    protected List<UpdateOperation<String, byte[]>> prepareDatabaseUpdates() {
-        List<UpdateOperation<K, V>> updates = Lists.newLinkedList();
+    protected List<DatabaseUpdate> prepareDatabaseUpdates() {
+        List<DatabaseUpdate> updates = Lists.newLinkedList();
         deleteSet.forEach(key -> {
             Versioned<V> original = readCache.get(key);
             if (original != null) {
-                updates.add(UpdateOperation.<K, V>newBuilder()
+                updates.add(DatabaseUpdate.newBuilder()
                         .withTableName(name)
-                        .withType(UpdateOperation.Type.REMOVE_IF_VERSION_MATCH)
-                        .withKey(key)
+                        .withType(DatabaseUpdate.Type.REMOVE_IF_VERSION_MATCH)
+                        .withKey(keyCache.getUnchecked(key))
                         .withCurrentVersion(original.version())
                         .build());
             }
@@ -192,44 +174,23 @@
         writeCache.forEach((key, value) -> {
             Versioned<V> original = readCache.get(key);
             if (original == null) {
-                updates.add(UpdateOperation.<K, V>newBuilder()
+                updates.add(DatabaseUpdate.newBuilder()
                         .withTableName(name)
-                        .withType(UpdateOperation.Type.PUT_IF_ABSENT)
-                        .withKey(key)
-                        .withValue(value)
+                        .withType(DatabaseUpdate.Type.PUT_IF_ABSENT)
+                        .withKey(keyCache.getUnchecked(key))
+                        .withValue(serializer.encode(value))
                         .build());
             } else {
-                updates.add(UpdateOperation.<K, V>newBuilder()
+                updates.add(DatabaseUpdate.newBuilder()
                         .withTableName(name)
-                        .withType(UpdateOperation.Type.PUT_IF_VERSION_MATCH)
-                        .withKey(key)
+                        .withType(DatabaseUpdate.Type.PUT_IF_VERSION_MATCH)
+                        .withKey(keyCache.getUnchecked(key))
                         .withCurrentVersion(original.version())
-                        .withValue(value)
+                        .withValue(serializer.encode(value))
                         .build());
             }
         });
-        return updates.stream().map(this::toRawUpdateOperation).collect(Collectors.toList());
-    }
-
-    private UpdateOperation<String, byte[]> toRawUpdateOperation(UpdateOperation<K, V> update) {
-
-        UpdateOperation.Builder<String, byte[]> rawUpdate = UpdateOperation.<String, byte[]>newBuilder();
-
-        rawUpdate = rawUpdate.withKey(HexString.toHexString(serializer.encode(update.key())))
-            .withCurrentVersion(update.currentVersion())
-            .withType(update.type());
-
-        rawUpdate = rawUpdate.withTableName(update.tableName());
-
-        if (update.value() != null) {
-            rawUpdate = rawUpdate.withValue(serializer.encode(update.value()));
-        }
-
-        if (update.currentValue() != null) {
-            rawUpdate = rawUpdate.withCurrentValue(serializer.encode(update.currentValue()));
-        }
-
-        return rawUpdate.build();
+        return updates;
     }
 
     /**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index ad049e6..178f49f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -27,7 +27,8 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.collect.Lists;
@@ -129,24 +130,27 @@
     }
 
     @Override
-    public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+    public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).put(tableName, key, value);
     }
 
     @Override
-    public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+    public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).remove(tableName, key);
     }
 
     @Override
-    public CompletableFuture<Void> clear(String tableName) {
+    public CompletableFuture<Result<Void>> clear(String tableName) {
+        AtomicBoolean isLocked = new AtomicBoolean(false);
         checkState(isOpen.get(), DB_NOT_OPEN);
         return CompletableFuture.allOf(partitions
                     .stream()
-                    .map(p -> p.clear(tableName))
-                    .toArray(CompletableFuture[]::new));
+                    .map(p -> p.clear(tableName)
+                            .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
     }
 
     @Override
@@ -183,59 +187,86 @@
     }
 
     @Override
-    public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+    public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
     }
 
     @Override
-    public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+    public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).remove(tableName, key, value);
     }
 
     @Override
-    public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+    public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).remove(tableName, key, version);
     }
 
     @Override
-    public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+    public CompletableFuture<Result<Boolean>> replace(
+            String tableName, String key, byte[] oldValue, byte[] newValue) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
     }
 
     @Override
-    public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+    public CompletableFuture<Result<Boolean>> replace(
+            String tableName, String key, long oldVersion, byte[] newValue) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
     }
 
     @Override
-    public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
-        checkState(isOpen.get(), DB_NOT_OPEN);
-        Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
-        for (UpdateOperation<String, byte[]> update : updates) {
-            Database partition = partitioner.getPartition(update.tableName(), update.key());
-            List<UpdateOperation<String, byte[]>> partitionUpdates = perPartitionUpdates.get(partition);
-            if (partitionUpdates == null) {
-                partitionUpdates = Lists.newArrayList();
-                perPartitionUpdates.put(partition, partitionUpdates);
-            }
-            partitionUpdates.add(update);
-        }
-        if (perPartitionUpdates.size() > 1) {
-            // TODO
-            throw new UnsupportedOperationException("Cross partition transactional updates are not supported.");
+    public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
+        Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+        if (subTransactions.isEmpty()) {
+            return CompletableFuture.completedFuture(true);
+        } else if (subTransactions.size() == 1) {
+            Entry<Database, Transaction> entry =
+                    subTransactions.entrySet().iterator().next();
+            return entry.getKey().prepareAndCommit(entry.getValue());
         } else {
-            Entry<Database, List<UpdateOperation<String, byte[]>>> only =
-                    perPartitionUpdates.entrySet().iterator().next();
-            return only.getKey().atomicBatchUpdate(only.getValue());
+            return new TransactionManager(this).execute(transaction);
         }
     }
 
     @Override
+    public CompletableFuture<Boolean> prepare(Transaction transaction) {
+        Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+        AtomicBoolean status = new AtomicBoolean(true);
+        return CompletableFuture.allOf(subTransactions.entrySet()
+                .stream()
+                .map(entry -> entry
+                        .getKey()
+                        .prepare(entry.getValue())
+                        .thenApply(v -> status.compareAndSet(true, v)))
+                .toArray(CompletableFuture[]::new))
+            .thenApply(v -> status.get());
+    }
+
+    @Override
+    public CompletableFuture<Boolean> commit(Transaction transaction) {
+        Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+        return CompletableFuture.allOf(subTransactions.entrySet()
+                .stream()
+                .map(entry -> entry.getKey().commit(entry.getValue()))
+                .toArray(CompletableFuture[]::new))
+        .thenApply(v -> true);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> rollback(Transaction transaction) {
+        Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+        return CompletableFuture.allOf(subTransactions.entrySet()
+                .stream()
+                .map(entry -> entry.getKey().rollback(entry.getValue()))
+                .toArray(CompletableFuture[]::new))
+            .thenApply(v -> true);
+    }
+
+    @Override
     public CompletableFuture<Database> open() {
         return CompletableFuture.allOf(partitions
                     .stream()
@@ -243,7 +274,8 @@
                     .toArray(CompletableFuture[]::new))
                 .thenApply(v -> {
                     isOpen.set(true);
-                    return this; });
+                    return this;
+                });
     }
 
     @Override
@@ -279,4 +311,19 @@
     public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
         throw new UnsupportedOperationException();
     }
-}
\ No newline at end of file
+
+    private Map<Database, Transaction> createSubTransactions(
+            Transaction transaction) {
+        Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
+        for (DatabaseUpdate update : transaction.updates()) {
+            Database partition = partitioner.getPartition(update.tableName(), update.key());
+            List<DatabaseUpdate> partitionUpdates =
+                    perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
+            partitionUpdates.add(update);
+        }
+        Map<Database, Transaction> subTransactions = Maps.newHashMap();
+        perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
+
+        return subTransactions;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Result.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Result.java
new file mode 100644
index 0000000..548174a
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Result.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+/**
+ * Result of a database update operation.
+ *
+ * @param <V> return value type
+ */
+public final class Result<V> {
+
+    public enum Status {
+        /**
+         * Indicates a successful update.
+         */
+        OK,
+
+        /**
+         * Indicates a failure due to underlying state being locked by another transaction.
+         */
+        LOCKED
+    }
+
+    private final Status status;
+    private final V value;
+
+    /**
+     * Creates a new Result instance with the specified value with status set to Status.OK.
+     *
+     * @param <V> result value type
+     * @param value result value
+     * @return Result instance
+     */
+    public static <V> Result<V> ok(V value) {
+        return new Result<>(value, Status.OK);
+    }
+
+    /**
+     * Creates a new Result instance with status set to Status.LOCKED.
+     *
+     * @param <V> result value type
+     * @return Result instance
+     */
+    public static <V> Result<V> locked() {
+        return new Result<>(null, Status.LOCKED);
+    }
+
+    private Result(V value, Status status) {
+        this.value = value;
+        this.status = status;
+    }
+
+    /**
+     * Returns true if this result indicates a successful execution i.e status is Status.OK.
+     *
+     * @return true if successful, false otherwise
+     */
+    public boolean success() {
+        return status == Status.OK;
+    }
+
+    /**
+     * Returns the status of database update operation.
+     * @return database update status
+     */
+    public Status status() {
+        return status;
+    }
+
+    /**
+     * Returns the return value for the update.
+     * @return value returned by database update. If the status is another
+     * other than Status.OK, this returns a null
+     */
+    public V value() {
+        return value;
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
index adc5477..e8daeff 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
@@ -36,4 +36,4 @@
     public Database getPartition(String tableName, String key) {
         return partitions.get(hash(tableName) % partitions.size());
     }
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
new file mode 100644
index 0000000..865934b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.Transaction.State;
+
+/**
+ * Agent that runs the two phase commit protocol.
+ */
+public class TransactionManager {
+
+    private final Database database;
+    private final AsyncConsistentMap<Long, Transaction> transactions;
+
+    private final Serializer serializer = new Serializer() {
+
+        private KryoNamespace kryo = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.BASIC)
+                .nextId(KryoNamespace.FLOATING_ID)
+                .register(Versioned.class)
+                .register(DatabaseUpdate.class)
+                .register(DatabaseUpdate.Type.class)
+                .register(DefaultTransaction.class)
+                .register(Transaction.State.class)
+                .register(Pair.class)
+                .register(ImmutablePair.class)
+                .build();
+
+        @Override
+        public <T> byte[] encode(T object) {
+            return kryo.serialize(object);
+        }
+
+        @Override
+        public <T> T decode(byte[] bytes) {
+            return kryo.deserialize(bytes);
+        }
+    };
+
+    /**
+     * Constructs a new TransactionManager for the specified database instance.
+     *
+     * @param database database
+     */
+    public TransactionManager(Database database) {
+        this.database = checkNotNull(database, "database cannot be null");
+        this.transactions = new DefaultAsyncConsistentMap<>("onos-transactions", this.database, serializer);
+    }
+
+    /**
+     * Executes the specified transaction by employing a two phase commit protocol.
+     *
+     * @param transaction transaction to commit
+     * @return transaction result. Result value true indicates a successful commit, false
+     * indicates abort
+     */
+    public CompletableFuture<Boolean> execute(Transaction transaction) {
+        // clean up if this transaction in already in a terminal state.
+        if (transaction.state() == Transaction.State.COMMITTED ||
+                transaction.state() == Transaction.State.ROLLEDBACK) {
+            return transactions.remove(transaction.id()).thenApply(v -> true);
+        } else if (transaction.state() == Transaction.State.COMMITTING) {
+            return commit(transaction);
+        } else if (transaction.state() == Transaction.State.ROLLINGBACK) {
+            return rollback(transaction);
+        } else {
+            return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction));
+        }
+    }
+
+
+    /**
+     * Returns all transactions in the system.
+     *
+     * @return future for a collection of transactions
+     */
+    public CompletableFuture<Collection<Transaction>> getTransactions() {
+        return transactions.values().thenApply(c -> {
+            Collection<Transaction> txns = c.stream().map(v -> v.value()).collect(Collectors.toList());
+            return txns;
+        });
+    }
+
+    private CompletableFuture<Boolean> prepare(Transaction transaction) {
+        return transactions.put(transaction.id(), transaction)
+                .thenCompose(v -> database.prepare(transaction))
+                .thenCompose(status -> transactions.put(
+                            transaction.id(),
+                            transaction.transition(status ? State.COMMITTING : State.ROLLINGBACK))
+                .thenApply(v -> status));
+    }
+
+    private CompletableFuture<Boolean> commit(Transaction transaction) {
+        return database.commit(transaction)
+                .thenCompose(v -> transactions.put(
+                            transaction.id(),
+                            transaction.transition(Transaction.State.COMMITTED)))
+                .thenApply(v -> true);
+    }
+
+    private CompletableFuture<Boolean> rollback(Transaction transaction) {
+        return database.rollback(transaction)
+                .thenCompose(v -> transactions.put(
+                            transaction.id(),
+                            transaction.transition(Transaction.State.ROLLEDBACK)))
+                .thenApply(v -> true);
+    }
+}
\ No newline at end of file