Added distributed transaction support through a two-phase commit protocol

Change-Id: I85d64234a24823fee8b3c2ea830abbb6867dad38
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 010276a..363f330 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -34,6 +34,7 @@
 import net.kuujo.copycat.protocol.Protocol;
 import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
 
+import org.apache.commons.lang.math.RandomUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -41,6 +42,7 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.core.IdGenerator;
 import org.onosproject.store.cluster.impl.DistributedClusterStore;
 import org.onosproject.store.cluster.impl.NodeInfo;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
@@ -53,11 +55,13 @@
 import org.onosproject.store.service.SetBuilder;
 import org.onosproject.store.service.StorageAdminService;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.TransactionContext;
 import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -92,6 +96,9 @@
     private PartitionedDatabase partitionedDatabase;
     private Database inMemoryDatabase;
 
+    private TransactionManager transactionManager;
+    private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
@@ -188,6 +195,7 @@
             Thread.currentThread().interrupt();
             log.warn("Failed to complete database initialization.");
         }
+        transactionManager = new TransactionManager(partitionedDatabase);
         log.info("Started");
     }
 
@@ -218,7 +226,7 @@
 
     @Override
     public TransactionContext createTransactionContext() {
-        return new DefaultTransactionContext(partitionedDatabase);
+        return new DefaultTransactionContext(partitionedDatabase, transactionIdGenerator.getNewId());
     }
 
     @Override
@@ -331,6 +339,11 @@
             .collect(Collectors.toList());
     }
 
+    @Override
+    public Collection<Transaction> getTransactions() {
+        return complete(transactionManager.getTransactions());
+    }
+
     private static <T> T complete(CompletableFuture<T> future) {
         try {
             return future.get(DATABASE_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
@@ -343,4 +356,9 @@
             throw new ConsistentMapException(e.getCause());
         }
     }
+
+    @Override
+    public void redriveTransactions() {
+        getTransactions().stream().forEach(transactionManager::execute);
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index 3ea06fb..3578525 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -17,12 +17,11 @@
 package org.onosproject.store.consistent.impl;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
 /**
@@ -87,7 +86,7 @@
    * @param value The value to set.
    * @return A completable future to be completed with the result once complete.
    */
-  CompletableFuture<Versioned<V>> put(String tableName, K key, V value);
+  CompletableFuture<Result<Versioned<V>>> put(String tableName, K key, V value);
 
   /**
    * Removes a value from the table.
@@ -96,7 +95,7 @@
    * @param key The key to remove.
    * @return A completable future to be completed with the result once complete.
    */
-  CompletableFuture<Versioned<V>> remove(String tableName, K key);
+  CompletableFuture<Result<Versioned<V>>> remove(String tableName, K key);
 
   /**
    * Clears the table.
@@ -104,7 +103,7 @@
    * @param tableName table name
    * @return A completable future to be completed with the result once complete.
    */
-  CompletableFuture<Void> clear(String tableName);
+  CompletableFuture<Result<Void>> clear(String tableName);
 
   /**
    * Gets a set of keys in the table.
@@ -138,7 +137,7 @@
    * @param value The value to set if the given key does not exist.
    * @return A completable future to be completed with the result once complete.
    */
-  CompletableFuture<Versioned<V>> putIfAbsent(String tableName, K key, V value);
+  CompletableFuture<Result<Versioned<V>>> putIfAbsent(String tableName, K key, V value);
 
   /**
    * Removes a key and if the existing value for that key matches the specified value.
@@ -148,7 +147,7 @@
    * @param value The value to remove.
    * @return A completable future to be completed with the result once complete.
    */
-  CompletableFuture<Boolean> remove(String tableName, K key, V value);
+  CompletableFuture<Result<Boolean>> remove(String tableName, K key, V value);
 
   /**
    * Removes a key and if the existing version for that key matches the specified version.
@@ -158,7 +157,7 @@
    * @param version The expected version.
    * @return A completable future to be completed with the result once complete.
    */
-  CompletableFuture<Boolean> remove(String tableName, K key, long version);
+  CompletableFuture<Result<Boolean>> remove(String tableName, K key, long version);
 
   /**
    * Replaces the entry for the specified key only if currently mapped to the specified value.
@@ -169,7 +168,7 @@
    * @param newValue The value with which to replace the given key and value.
    * @return A completable future to be completed with the result once complete.
    */
-  CompletableFuture<Boolean> replace(String tableName, K key, V oldValue, V newValue);
+  CompletableFuture<Result<Boolean>> replace(String tableName, K key, V oldValue, V newValue);
 
   /**
    * Replaces the entry for the specified key only if currently mapped to the specified version.
@@ -180,14 +179,42 @@
    * @param newValue The value with which to replace the given key and version.
    * @return A completable future to be completed with the result once complete.
    */
-  CompletableFuture<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
+  CompletableFuture<Result<Boolean>> replace(String tableName, K key, long oldVersion, V newValue);
 
   /**
-   * Perform a atomic batch update operation i.e. either all operations in batch succeed or
-   * none do and no state changes are made.
+   * Prepare and commit the specified transaction.
    *
-   * @param updates list of updates to apply atomically.
-   * @return A completable future to be completed with the result once complete.
+   * @param transaction transaction to commit (after preparation)
+   * @return A completable future to be completed with the result once complete
    */
-  CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<K, V>> updates);
+  CompletableFuture<Boolean> prepareAndCommit(Transaction transaction);
+
+  /**
+   * Prepare the specified transaction for commit. A successful prepare implies
+   * all the affected resources are locked, thus ensuring no concurrent updates can interfere.
+   *
+   * @param transaction transaction to prepare (for commit)
+   * @return A completable future to be completed with the result once complete. The future is completed
+   * with true if the transaction is successfully prepared, i.e. all pre-conditions are met and
+   * applicable resources are locked.
+   */
+  CompletableFuture<Boolean> prepare(Transaction transaction);
+
+  /**
+   * Commit the specified transaction. A successful commit implies
+   * all the updates are applied, are now durable and are now visible externally.
+   *
+   * @param transaction transaction to commit
+   * @return A completable future to be completed with the result once complete
+   */
+  CompletableFuture<Boolean> commit(Transaction transaction);
+
+  /**
+   * Rollback the specified transaction. A successful rollback implies
+   * all previously acquired locks for the affected resources are released.
+   *
+   * @param transaction transaction to rollback
+   * @return A completable future to be completed with the result once complete
+   */
+  CompletableFuture<Boolean> rollback(Transaction transaction);
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
index 237c85b..f6fa6d1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
@@ -23,6 +23,8 @@
 import org.onlab.util.KryoNamespace;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
 import net.kuujo.copycat.cluster.internal.MemberInfo;
@@ -63,8 +65,14 @@
     private static final KryoNamespace ONOS_STORE = KryoNamespace.newBuilder()
             .nextId(KryoNamespace.FLOATING_ID)
             .register(Versioned.class)
+            .register(DatabaseUpdate.class)
+            .register(DatabaseUpdate.Type.class)
             .register(Pair.class)
             .register(ImmutablePair.class)
+            .register(Result.class)
+            .register(Result.Status.class)
+            .register(DefaultTransaction.class)
+            .register(Transaction.State.class)
             .build();
 
     private static final KryoSerializer SERIALIZER = new KryoSerializer() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index 097261d..6685dde 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -17,11 +17,10 @@
 package org.onosproject.store.consistent.impl;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
 import net.kuujo.copycat.state.Command;
@@ -62,13 +61,13 @@
   Versioned<V> get(String tableName, K key);
 
   @Command
-  Versioned<V> put(String tableName, K key, V value);
+  Result<Versioned<V>> put(String tableName, K key, V value);
 
   @Command
-  Versioned<V> remove(String tableName, K key);
+  Result<Versioned<V>> remove(String tableName, K key);
 
   @Command
-  void clear(String tableName);
+  Result<Void> clear(String tableName);
 
   @Query
   Set<K> keySet(String tableName);
@@ -80,20 +79,29 @@
   Set<Entry<K, Versioned<V>>> entrySet(String tableName);
 
   @Command
-  Versioned<V> putIfAbsent(String tableName, K key, V value);
+  Result<Versioned<V>> putIfAbsent(String tableName, K key, V value);
 
   @Command
-  boolean remove(String tableName, K key, V value);
+  Result<Boolean> remove(String tableName, K key, V value);
 
   @Command
-  boolean remove(String tableName, K key, long version);
+  Result<Boolean> remove(String tableName, K key, long version);
 
   @Command
-  boolean replace(String tableName, K key, V oldValue, V newValue);
+  Result<Boolean> replace(String tableName, K key, V oldValue, V newValue);
 
   @Command
-  boolean replace(String tableName, K key, long oldVersion, V newValue);
+  Result<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
 
   @Command
-  boolean batchUpdate(List<UpdateOperation<K, V>> updates);
+  boolean prepareAndCommit(Transaction transaction);
+
+  @Command
+  boolean prepare(Transaction transaction);
+
+  @Command
+  boolean commit(Transaction transaction);
+
+  @Command
+  boolean rollback(Transaction transaction);
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index c86a6ea..09f612e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -28,6 +28,7 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.onlab.util.HexString;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.Versioned;
 
@@ -108,6 +109,7 @@
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
         return database.put(name, keyCache.getUnchecked(key), serializer.encode(value))
+                .thenApply(this::unwrapResult)
                 .thenApply(v -> v != null
                 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
     }
@@ -116,13 +118,14 @@
     public CompletableFuture<Versioned<V>> remove(K key) {
         checkNotNull(key, ERROR_NULL_KEY);
         return database.remove(name, keyCache.getUnchecked(key))
+                .thenApply(this::unwrapResult)
                 .thenApply(v -> v != null
                 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
     }
 
     @Override
     public CompletableFuture<Void> clear() {
-        return database.clear(name);
+        return database.clear(name).thenApply(this::unwrapResult);
     }
 
     @Override
@@ -154,23 +157,27 @@
     public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
-        return database.putIfAbsent(
-                name, keyCache.getUnchecked(key), serializer.encode(value)).thenApply(v ->
-                v != null ?
-                new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
+        return database.putIfAbsent(name,
+                                    keyCache.getUnchecked(key),
+                                    serializer.encode(value))
+               .thenApply(this::unwrapResult)
+               .thenApply(v -> v != null ?
+                       new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
     }
 
     @Override
     public CompletableFuture<Boolean> remove(K key, V value) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
-        return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value));
+        return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value))
+                .thenApply(this::unwrapResult);
     }
 
     @Override
     public CompletableFuture<Boolean> remove(K key, long version) {
         checkNotNull(key, ERROR_NULL_KEY);
-        return database.remove(name, keyCache.getUnchecked(key), version);
+        return database.remove(name, keyCache.getUnchecked(key), version)
+                .thenApply(this::unwrapResult);
 
     }
 
@@ -179,14 +186,16 @@
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(newValue, ERROR_NULL_VALUE);
         byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
-        return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue));
+        return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue))
+                .thenApply(this::unwrapResult);
     }
 
     @Override
     public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(newValue, ERROR_NULL_VALUE);
-        return database.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue));
+        return database.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue))
+                .thenApply(this::unwrapResult);
     }
 
     private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
@@ -197,4 +206,14 @@
                         e.getValue().version(),
                         e.getValue().creationTime()));
     }
+
+    private <T> T unwrapResult(Result<T> result) {
+        if (result.status() == Result.Status.LOCKED) {
+            throw new ConsistentMapException.ConcurrentModification();
+        } else if (result.success()) {
+            return result.value();
+        } else {
+            throw new IllegalStateException("Must not be here");
+        }
+    }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index c14e57c..8ed7670 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -23,13 +23,12 @@
 import net.kuujo.copycat.util.concurrent.Futures;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
 /**
@@ -39,7 +38,7 @@
     private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
     private DatabaseProxy<String, byte[]> proxy;
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     public DefaultDatabase(ResourceContext context) {
         super(context);
         this.stateMachine = new DefaultStateMachine(context, DatabaseState.class, DefaultDatabaseState.class);
@@ -91,17 +90,17 @@
     }
 
     @Override
-    public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+    public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) {
         return checkOpen(() -> proxy.put(tableName, key, value));
     }
 
     @Override
-    public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+    public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
         return checkOpen(() -> proxy.remove(tableName, key));
     }
 
     @Override
-    public CompletableFuture<Void> clear(String tableName) {
+    public CompletableFuture<Result<Void>> clear(String tableName) {
         return checkOpen(() -> proxy.clear(tableName));
     }
 
@@ -121,33 +120,48 @@
     }
 
     @Override
-    public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+    public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
         return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
     }
 
     @Override
-    public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+    public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
         return checkOpen(() -> proxy.remove(tableName, key, value));
     }
 
     @Override
-    public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+    public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
         return checkOpen(() -> proxy.remove(tableName, key, version));
     }
 
     @Override
-    public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+    public CompletableFuture<Result<Boolean>> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
         return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
     }
 
     @Override
-    public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+    public CompletableFuture<Result<Boolean>> replace(String tableName, String key, long oldVersion, byte[] newValue) {
         return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
     }
 
     @Override
-    public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
-        return checkOpen(() -> proxy.atomicBatchUpdate(updates));
+    public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
+        return checkOpen(() -> proxy.prepareAndCommit(transaction));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(Transaction transaction) {
+        return checkOpen(() -> proxy.prepare(transaction));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> commit(Transaction transaction) {
+        return checkOpen(() -> proxy.commit(transaction));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> rollback(Transaction transaction) {
+        return checkOpen(() -> proxy.rollback(transaction));
     }
 
     @Override
@@ -180,4 +194,4 @@
         }
         return false;
     }
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index e63a3d8..c190a28 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -18,43 +18,58 @@
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.stream.Collectors;
 import java.util.Set;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.DatabaseUpdate.Type;
 
+import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 
 import net.kuujo.copycat.state.Initializer;
 import net.kuujo.copycat.state.StateContext;
 
 /**
  * Default database state.
- *
- * @param <K> key type
- * @param <V> value type
  */
-public class DefaultDatabaseState<K, V> implements DatabaseState<K, V> {
-
+public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
     private Long nextVersion;
-    private Map<String, Map<K, Versioned<V>>> tables;
+    private Map<String, Map<String, Versioned<byte[]>>> tables;
+
+    /**
+     * This locks map has a structure similar to the "tables" map above and
+     * holds all the provisional updates made during a transaction's prepare phase.
+     * The entry value is represented as the tuple: (transactionId, newValue).
+     * If newValue == null, that signifies this update is attempting to
+     * delete the existing value.
+     * This map also serves as a lock on the entries that are being updated.
+     * The presence of an entry in this map indicates that element is
+     * participating in a transaction and is currently locked for updates.
+     */
+    private Map<String, Map<String, Pair<Long, byte[]>>> locks;
 
     @Initializer
     @Override
-    public void init(StateContext<DatabaseState<K, V>> context) {
+    public void init(StateContext<DatabaseState<String, byte[]>> context) {
         tables = context.get("tables");
         if (tables == null) {
-            tables = new HashMap<>();
+            tables = Maps.newConcurrentMap();
             context.put("tables", tables);
         }
+        locks = context.get("locks");
+        if (locks == null) {
+            locks = Maps.newConcurrentMap();
+            context.put("locks", locks);
+        }
         nextVersion = context.get("nextVersion");
         if (nextVersion == null) {
             nextVersion = new Long(0);
@@ -62,15 +77,6 @@
         }
     }
 
-    private Map<K, Versioned<V>> getTableMap(String tableName) {
-        Map<K, Versioned<V>> table = tables.get(tableName);
-        if (table == null) {
-            table = new HashMap<>();
-            tables.put(tableName, table);
-        }
-        return table;
-    }
-
     @Override
     public Set<String> tableNames() {
         return new HashSet<>(tables.keySet());
@@ -87,47 +93,55 @@
     }
 
     @Override
-    public boolean containsKey(String tableName, K key) {
+    public boolean containsKey(String tableName, String key) {
         return getTableMap(tableName).containsKey(key);
     }
 
     @Override
-    public boolean containsValue(String tableName, V value) {
-        return getTableMap(tableName).values().stream().anyMatch(v -> checkEquality(v.value(), value));
+    public boolean containsValue(String tableName, byte[] value) {
+        return getTableMap(tableName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
     }
 
     @Override
-    public Versioned<V> get(String tableName, K key) {
+    public Versioned<byte[]> get(String tableName, String key) {
         return getTableMap(tableName).get(key);
     }
 
     @Override
-    public Versioned<V> put(String tableName, K key, V value) {
-        return getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion));
+    public Result<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+        return isLockedForUpdates(tableName, key)
+                ? Result.locked()
+                : Result.ok(getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion)));
     }
 
     @Override
-    public Versioned<V> remove(String tableName, K key) {
-        return getTableMap(tableName).remove(key);
+    public Result<Versioned<byte[]>> remove(String tableName, String key) {
+        return isLockedForUpdates(tableName, key)
+                ? Result.locked()
+                : Result.ok(getTableMap(tableName).remove(key));
     }
 
     @Override
-    public void clear(String tableName) {
+    public Result<Void> clear(String tableName) {
+        if (areTransactionsInProgress(tableName)) {
+            return Result.locked();
+        }
         getTableMap(tableName).clear();
+        return Result.ok(null);
     }
 
     @Override
-    public Set<K> keySet(String tableName) {
+    public Set<String> keySet(String tableName) {
         return ImmutableSet.copyOf(getTableMap(tableName).keySet());
     }
 
     @Override
-    public Collection<Versioned<V>> values(String tableName) {
+    public Collection<Versioned<byte[]>> values(String tableName) {
         return ImmutableList.copyOf(getTableMap(tableName).values());
     }
 
     @Override
-    public Set<Entry<K, Versioned<V>>> entrySet(String tableName) {
+    public Set<Entry<String, Versioned<byte[]>>> entrySet(String tableName) {
         return ImmutableSet.copyOf(getTableMap(tableName)
                 .entrySet()
                 .stream()
@@ -136,93 +150,113 @@
     }
 
     @Override
-    public Versioned<V> putIfAbsent(String tableName, K key, V value) {
-        Versioned<V> existingValue = getTableMap(tableName).get(key);
-        return existingValue != null ? existingValue : put(tableName, key, value);
-    }
-
-    @Override
-    public boolean remove(String tableName, K key, V value) {
-        Versioned<V> existing = getTableMap(tableName).get(key);
-        if (existing != null && checkEquality(existing.value(), value)) {
-            getTableMap(tableName).remove(key);
-            return true;
+    public Result<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
         }
-        return false;
+        Versioned<byte[]> existingValue = get(tableName, key);
+        Versioned<byte[]> currentValue = existingValue != null ? existingValue : put(tableName, key, value).value();
+        return Result.ok(currentValue);
     }
 
     @Override
-    public boolean remove(String tableName, K key, long version) {
-        Versioned<V> existing = getTableMap(tableName).get(key);
+    public Result<Boolean> remove(String tableName, String key, byte[] value) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
+        }
+        Versioned<byte[]> existing = get(tableName, key);
+        if (existing != null && Arrays.equals(existing.value(), value)) {
+            getTableMap(tableName).remove(key);
+            return Result.ok(true);
+        }
+        return Result.ok(false);
+    }
+
+    @Override
+    public Result<Boolean> remove(String tableName, String key, long version) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
+        }
+        Versioned<byte[]> existing = get(tableName, key);
         if (existing != null && existing.version() == version) {
             remove(tableName, key);
-            return true;
+            return Result.ok(true);
         }
-        return false;
+        return Result.ok(false);
     }
 
     @Override
-    public boolean replace(String tableName, K key, V oldValue, V newValue) {
-        Versioned<V> existing = getTableMap(tableName).get(key);
-        if (existing != null && checkEquality(existing.value(), oldValue)) {
+    public Result<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
+        }
+        Versioned<byte[]> existing = get(tableName, key);
+        if (existing != null && Arrays.equals(existing.value(), oldValue)) {
             put(tableName, key, newValue);
-            return true;
+            return Result.ok(true);
         }
-        return false;
+        return Result.ok(false);
     }
 
     @Override
-    public boolean replace(String tableName, K key, long oldVersion, V newValue) {
-        Versioned<V> existing = getTableMap(tableName).get(key);
+    public Result<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
+        }
+        Versioned<byte[]> existing = get(tableName, key);
         if (existing != null && existing.version() == oldVersion) {
             put(tableName, key, newValue);
+            return Result.ok(true);
+        }
+        return Result.ok(false);
+    }
+
+    @Override
+    public boolean prepareAndCommit(Transaction transaction) {
+        if (prepare(transaction)) {
+            return commit(transaction);
+        }
+        return false;
+    }
+
+    @Override
+    public boolean prepare(Transaction transaction) {
+        if (transaction.updates().stream().anyMatch(update ->
+                    isLockedByAnotherTransaction(update.tableName(),
+                                                 update.key(),
+                                                 transaction.id()))) {
+            return false;
+        }
+
+        if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
+            transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
             return true;
         }
         return false;
     }
 
     @Override
-    public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
-        if (updates.stream().anyMatch(update -> !checkIfUpdateIsPossible(update))) {
-            return false;
-        } else {
-            updates.stream().forEach(this::doUpdate);
-            return true;
-        }
+    public boolean commit(Transaction transaction) {
+        transaction.updates().forEach(update -> commitProvisionalUpdate(update, transaction.id()));
+        return true;
     }
 
-    private void doUpdate(UpdateOperation<K, V> update) {
-        String tableName = update.tableName();
-        K key = update.key();
-        switch (update.type()) {
-        case PUT:
-            put(tableName, key, update.value());
-            return;
-        case REMOVE:
-            remove(tableName, key);
-            return;
-        case PUT_IF_ABSENT:
-            putIfAbsent(tableName, key, update.value());
-            return;
-        case PUT_IF_VERSION_MATCH:
-            replace(tableName, key, update.currentValue(), update.value());
-            return;
-        case PUT_IF_VALUE_MATCH:
-            replace(tableName, key, update.currentVersion(), update.value());
-            return;
-        case REMOVE_IF_VERSION_MATCH:
-            remove(tableName, key, update.currentVersion());
-            return;
-        case REMOVE_IF_VALUE_MATCH:
-            remove(tableName, key, update.currentValue());
-            return;
-        default:
-            throw new IllegalStateException("Unsupported type: " + update.type());
-        }
+    @Override
+    public boolean rollback(Transaction transaction) {
+        transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
+        return true;
     }
 
-    private boolean checkIfUpdateIsPossible(UpdateOperation<K, V> update) {
-        Versioned<V> existingEntry = get(update.tableName(), update.key());
+    private Map<String, Versioned<byte[]>> getTableMap(String tableName) {
+        return tables.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
+    }
+
+    private Map<String, Pair<Long, byte[]>> getLockMap(String tableName) {
+        return locks.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
+    }
+
+    private boolean isUpdatePossible(DatabaseUpdate update) {
+        Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
         switch (update.type()) {
         case PUT:
         case REMOVE:
@@ -232,20 +266,85 @@
         case PUT_IF_VERSION_MATCH:
             return existingEntry != null && existingEntry.version() == update.currentVersion();
         case PUT_IF_VALUE_MATCH:
-            return existingEntry != null && checkEquality(existingEntry.value(), update.currentValue());
+            return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
         case REMOVE_IF_VERSION_MATCH:
             return existingEntry == null || existingEntry.version() == update.currentVersion();
         case REMOVE_IF_VALUE_MATCH:
-            return existingEntry == null || checkEquality(existingEntry.value(), update.currentValue());
+            return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
         default:
             throw new IllegalStateException("Unsupported type: " + update.type());
         }
     }
 
-    private boolean checkEquality(V value1, V value2) {
-        if (value1 instanceof byte[]) {
-            return Arrays.equals((byte[]) value1, (byte[]) value2);
+    private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+        Map<String, Pair<Long, byte[]>> lockMap = getLockMap(update.tableName());
+        switch (update.type()) {
+        case PUT:
+        case PUT_IF_ABSENT:
+        case PUT_IF_VERSION_MATCH:
+        case PUT_IF_VALUE_MATCH:
+            lockMap.put(update.key(), Pair.of(transactionId, update.value()));
+            break;
+        case REMOVE:
+        case REMOVE_IF_VERSION_MATCH:
+        case REMOVE_IF_VALUE_MATCH:
+            lockMap.put(update.key(), Pair.of(transactionId, null)); // concurrent map rejects null values
+            break;
+        default:
+            throw new IllegalStateException("Unsupported type: " + update.type());
         }
-        return value1.equals(value2);
+    }
+
+    private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+        String tableName = update.tableName();
+        String key = update.key();
+        Type type = update.type();
+        Pair<Long, byte[]> provisionalUpdate = getLockMap(tableName).get(key);
+        // Skip keys that are not (or no longer) locked by this transaction, e.g. on a repeated commit.
+        if (provisionalUpdate == null || !Objects.equal(transactionId, provisionalUpdate.getLeft())) {
+            return;
+        }
+        getLockMap(tableName).remove(key);
+
+        switch (type) {
+        case PUT:
+        case PUT_IF_ABSENT:
+        case PUT_IF_VERSION_MATCH:
+        case PUT_IF_VALUE_MATCH:
+            put(tableName, key, provisionalUpdate.getRight());
+            break;
+        case REMOVE:
+        case REMOVE_IF_VERSION_MATCH:
+        case REMOVE_IF_VALUE_MATCH:
+            remove(tableName, key);
+            break;
+        default:
+            break;
+        }
+    }
+
+    private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+        String tableName = update.tableName();
+        String key = update.key();
+        Pair<Long, byte[]> provisionalUpdate = getLockMap(tableName).get(key);
+        if (provisionalUpdate == null) {
+            return;
+        }
+        if (Objects.equal(transactionId, provisionalUpdate.getLeft()))  {
+            getLockMap(tableName).remove(key);
+        }
+    }
+
+    private boolean isLockedByAnotherTransaction(String tableName, String key, long transactionId) {
+        Pair<Long, byte[]> update = getLockMap(tableName).get(key);
+        return update != null && !Objects.equal(transactionId, update.getLeft());
+    }
+
+    private boolean isLockedForUpdates(String tableName, String key) {
+        return getLockMap(tableName).containsKey(key);
+    }
+
+    private boolean areTransactionsInProgress(String tableName) {
+        return !getLockMap(tableName).isEmpty();
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
index e6eac62..97d5ef3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
@@ -18,6 +18,7 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Set;
+
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.Serializer;
 
@@ -46,6 +47,7 @@
         return backingMap.isEmpty();
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public boolean contains(Object o) {
         return backingMap.containsKey((E) o);
@@ -71,6 +73,7 @@
         return backingMap.putIfAbsent(e, true) == null;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public boolean remove(Object o) {
         return backingMap.remove((E) o, true);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransaction.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransaction.java
new file mode 100644
index 0000000..2ff7a2d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransaction.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import java.util.List;
+
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Default implementation of a {@link Transaction}.
+ */
+public class DefaultTransaction implements Transaction {
+
+    private final long transactionId;
+    private final List<DatabaseUpdate> updates;
+    private final State state;
+    private final long lastUpdated;
+
+    public DefaultTransaction(long transactionId, List<DatabaseUpdate> updates) {
+        this(transactionId, updates, State.PREPARING, System.currentTimeMillis());
+    }
+
+    private DefaultTransaction(long transactionId, List<DatabaseUpdate> updates, State state, long lastUpdated) {
+        this.transactionId = transactionId;
+        this.updates = ImmutableList.copyOf(updates);
+        this.state = state;
+        this.lastUpdated = lastUpdated;
+    }
+
+    @Override
+    public long id() {
+        return transactionId;
+    }
+
+    @Override
+    public List<DatabaseUpdate> updates() {
+        return updates;
+    }
+
+    @Override
+    public State state() {
+        return state;
+    }
+
+    @Override
+    public Transaction transition(State newState) {
+        return new DefaultTransaction(transactionId, updates, newState, System.currentTimeMillis());
+    }
+
+    @Override
+    public long lastUpdated() {
+        return lastUpdated;
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
index d21543a..9d13833 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
@@ -18,19 +18,14 @@
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import static com.google.common.base.Preconditions.*;
 
-import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DatabaseUpdate;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.TransactionContext;
 import org.onosproject.store.service.TransactionException;
 import org.onosproject.store.service.TransactionalMap;
-import org.onosproject.store.service.UpdateOperation;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -40,80 +35,69 @@
  */
 public class DefaultTransactionContext implements TransactionContext {
 
-    private final Map<String, DefaultTransactionalMap> txMaps = Maps.newHashMap();
+    private static final String TX_NOT_OPEN_ERROR = "Transaction Context is not open";
+
+    @SuppressWarnings("rawtypes")
+    private final Map<String, DefaultTransactionalMap> txMaps = Maps.newConcurrentMap();
     private boolean isOpen = false;
     private final Database database;
-    private static final String TX_NOT_OPEN_ERROR = "Transaction is not open";
-    private static final int TRANSACTION_TIMEOUT_MILLIS = 2000;
+    private final long transactionId;
 
-    DefaultTransactionContext(Database database) {
-        this.database = checkNotNull(database, "Database must not be null");
+    public DefaultTransactionContext(Database database, long transactionId) {
+        this.database = checkNotNull(database);
+        this.transactionId = transactionId;
+    }
+
+    @Override
+    public long transactionId() {
+        return transactionId;
     }
 
     @Override
     public void begin() {
+        checkState(!isOpen, "Transaction Context is already open");
         isOpen = true;
     }
 
     @Override
+    public boolean isOpen() {
+        return isOpen;
+    }
+
+    @Override
     @SuppressWarnings("unchecked")
-    public <K, V> TransactionalMap<K, V> createTransactionalMap(String mapName,
+    public <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName,
             Serializer serializer) {
-        checkNotNull(mapName, "map name is null");
-        checkNotNull(serializer, "serializer is null");
         checkState(isOpen, TX_NOT_OPEN_ERROR);
-        if (!txMaps.containsKey(mapName)) {
-            ConsistentMap<K, V> backingMap = new DefaultConsistentMap<>(mapName, database, serializer);
-            DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<>(mapName, backingMap, this, serializer);
-            txMaps.put(mapName, txMap);
-        }
-        return txMaps.get(mapName);
+        checkNotNull(mapName);
+        checkNotNull(serializer);
+        return txMaps.computeIfAbsent(mapName, name -> new DefaultTransactionalMap<>(
+                                name,
+                                new DefaultConsistentMap<>(name, database, serializer),
+                                this,
+                                serializer));
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public void commit() {
         checkState(isOpen, TX_NOT_OPEN_ERROR);
-        List<UpdateOperation<String, byte[]>> allUpdates =
-                Lists.newLinkedList();
         try {
+            List<DatabaseUpdate> updates = Lists.newLinkedList();
             txMaps.values()
-                .stream()
-                .forEach(m -> {
-                    allUpdates.addAll(m.prepareDatabaseUpdates());
-                });
-
-            if (!complete(database.atomicBatchUpdate(allUpdates))) {
-                throw new TransactionException.OptimisticConcurrencyFailure();
-            }
+                  .forEach(m -> { updates.addAll(m.prepareDatabaseUpdates()); });
+            database.prepareAndCommit(new DefaultTransaction(transactionId, updates));
+        } catch (Exception e) {
+            abort();
+            throw new TransactionException(e);
         } finally {
             isOpen = false;
         }
     }
 
     @Override
-    public void rollback() {
+    public void abort() {
         checkState(isOpen, TX_NOT_OPEN_ERROR);
-        txMaps.values()
-        .stream()
-        .forEach(m -> m.rollback());
+        txMaps.values().forEach(m -> m.rollback());
     }
-
-    @Override
-    public boolean isOpen() {
-        return false;
-    }
-
-    private static <T> T complete(CompletableFuture<T> future) {
-        try {
-            return future.get(TRANSACTION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new TransactionException.Interrupted();
-        } catch (TimeoutException e) {
-            throw new TransactionException.Timeout();
-        } catch (ExecutionException e) {
-            throw new TransactionException(e.getCause());
-        }
-    }
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java
index c98c336..91151b0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java
@@ -16,23 +16,24 @@
 
 package org.onosproject.store.consistent.impl;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
 import java.util.Set;
 
 import org.onlab.util.HexString;
 import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DatabaseUpdate;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.TransactionContext;
 import org.onosproject.store.service.TransactionalMap;
-import org.onosproject.store.service.UpdateOperation;
 import org.onosproject.store.service.Versioned;
 
 import static com.google.common.base.Preconditions.*;
 
+import com.google.common.base.Objects;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -55,6 +56,23 @@
     private final Map<K, V> writeCache = Maps.newConcurrentMap();
     private final Set<K> deleteSet = Sets.newConcurrentHashSet();
 
+    private static final String ERROR_NULL_VALUE = "Null values are not allowed";
+    private static final String ERROR_NULL_KEY = "Null key is not allowed";
+
+    private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
+            .softValues()
+            .build(new CacheLoader<K, String>() {
+
+                @Override
+                public String load(K key) {
+                    return HexString.toHexString(serializer.encode(key));
+                }
+            });
+
+    protected K dK(String key) {
+        return serializer.decode(HexString.fromHexString(key));
+    }
+
     public DefaultTransactionalMap(
             String name,
             ConsistentMap<K, V> backingMap,
@@ -69,15 +87,15 @@
     @Override
     public V get(K key) {
         checkState(txContext.isOpen(), TX_CLOSED_ERROR);
+        checkNotNull(key, ERROR_NULL_KEY);
         if (deleteSet.contains(key)) {
             return null;
-        } else if (writeCache.containsKey(key)) {
-            return writeCache.get(key);
+        }
+        V latest = writeCache.get(key);
+        if (latest != null) {
+            return latest;
         } else {
-            if (!readCache.containsKey(key)) {
-                readCache.put(key, backingMap.get(key));
-            }
-            Versioned<V> v = readCache.get(key);
+            Versioned<V> v = readCache.computeIfAbsent(key, k -> backingMap.get(k));
             return v != null ? v.value() : null;
         }
     }
@@ -85,25 +103,31 @@
     @Override
     public V put(K key, V value) {
         checkState(txContext.isOpen(), TX_CLOSED_ERROR);
-        Versioned<V> original = readCache.get(key);
-        V recentUpdate = writeCache.put(key, value);
+        checkNotNull(value, ERROR_NULL_VALUE);
+
+        V latest = get(key);
+        writeCache.put(key, value);
         deleteSet.remove(key);
-        return recentUpdate == null ? (original != null ? original.value() : null) : recentUpdate;
+        return latest;
     }
 
     @Override
     public V remove(K key) {
         checkState(txContext.isOpen(), TX_CLOSED_ERROR);
-        Versioned<V> original = readCache.get(key);
-        V recentUpdate = writeCache.remove(key);
-        deleteSet.add(key);
-        return recentUpdate == null ? (original != null ? original.value() : null) : recentUpdate;
+        V latest = get(key);
+        if (latest != null) {
+            writeCache.remove(key);
+            deleteSet.add(key);
+        }
+        return latest;
     }
 
     @Override
     public boolean remove(K key, V value) {
-        V currentValue = get(key);
-        if (value.equals(currentValue)) {
+        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
+        checkNotNull(value, ERROR_NULL_VALUE);
+        V latest = get(key);
+        if (Objects.equal(value, latest)) {
             remove(key);
             return true;
         }
@@ -112,8 +136,11 @@
 
     @Override
     public boolean replace(K key, V oldValue, V newValue) {
-        V currentValue = get(key);
-        if (oldValue.equals(currentValue)) {
+        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
+        checkNotNull(oldValue, ERROR_NULL_VALUE);
+        checkNotNull(newValue, ERROR_NULL_VALUE);
+        V latest = get(key);
+        if (Objects.equal(oldValue, latest)) {
             put(key, newValue);
             return true;
         }
@@ -121,70 +148,25 @@
     }
 
     @Override
-    public int size() {
-        // TODO
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean isEmpty() {
-        return size() == 0;
-    }
-
-    @Override
-    public boolean containsKey(K key) {
-        return get(key) != null;
-    }
-
-    @Override
-    public boolean containsValue(V value) {
-        // TODO
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void clear() {
-        // TODO
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Set<K> keySet() {
-        // TODO
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Collection<V> values() {
-        // TODO
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Set<Entry<K, V>> entrySet() {
-        // TODO
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
     public V putIfAbsent(K key, V value) {
-        V currentValue = get(key);
-        if (currentValue == null) {
+        checkState(txContext.isOpen(), TX_CLOSED_ERROR);
+        checkNotNull(value, ERROR_NULL_VALUE);
+        V latest = get(key);
+        if (latest == null) {
             put(key, value);
-            return null;
         }
-        return currentValue;
+        return latest;
     }
 
-    protected List<UpdateOperation<String, byte[]>> prepareDatabaseUpdates() {
-        List<UpdateOperation<K, V>> updates = Lists.newLinkedList();
+    protected List<DatabaseUpdate> prepareDatabaseUpdates() {
+        List<DatabaseUpdate> updates = Lists.newLinkedList();
         deleteSet.forEach(key -> {
             Versioned<V> original = readCache.get(key);
             if (original != null) {
-                updates.add(UpdateOperation.<K, V>newBuilder()
+                updates.add(DatabaseUpdate.newBuilder()
                         .withTableName(name)
-                        .withType(UpdateOperation.Type.REMOVE_IF_VERSION_MATCH)
-                        .withKey(key)
+                        .withType(DatabaseUpdate.Type.REMOVE_IF_VERSION_MATCH)
+                        .withKey(keyCache.getUnchecked(key))
                         .withCurrentVersion(original.version())
                         .build());
             }
@@ -192,44 +174,23 @@
         writeCache.forEach((key, value) -> {
             Versioned<V> original = readCache.get(key);
             if (original == null) {
-                updates.add(UpdateOperation.<K, V>newBuilder()
+                updates.add(DatabaseUpdate.newBuilder()
                         .withTableName(name)
-                        .withType(UpdateOperation.Type.PUT_IF_ABSENT)
-                        .withKey(key)
-                        .withValue(value)
+                        .withType(DatabaseUpdate.Type.PUT_IF_ABSENT)
+                        .withKey(keyCache.getUnchecked(key))
+                        .withValue(serializer.encode(value))
                         .build());
             } else {
-                updates.add(UpdateOperation.<K, V>newBuilder()
+                updates.add(DatabaseUpdate.newBuilder()
                         .withTableName(name)
-                        .withType(UpdateOperation.Type.PUT_IF_VERSION_MATCH)
-                        .withKey(key)
+                        .withType(DatabaseUpdate.Type.PUT_IF_VERSION_MATCH)
+                        .withKey(keyCache.getUnchecked(key))
                         .withCurrentVersion(original.version())
-                        .withValue(value)
+                        .withValue(serializer.encode(value))
                         .build());
             }
         });
-        return updates.stream().map(this::toRawUpdateOperation).collect(Collectors.toList());
-    }
-
-    private UpdateOperation<String, byte[]> toRawUpdateOperation(UpdateOperation<K, V> update) {
-
-        UpdateOperation.Builder<String, byte[]> rawUpdate = UpdateOperation.<String, byte[]>newBuilder();
-
-        rawUpdate = rawUpdate.withKey(HexString.toHexString(serializer.encode(update.key())))
-            .withCurrentVersion(update.currentVersion())
-            .withType(update.type());
-
-        rawUpdate = rawUpdate.withTableName(update.tableName());
-
-        if (update.value() != null) {
-            rawUpdate = rawUpdate.withValue(serializer.encode(update.value()));
-        }
-
-        if (update.currentValue() != null) {
-            rawUpdate = rawUpdate.withCurrentValue(serializer.encode(update.currentValue()));
-        }
-
-        return rawUpdate.build();
+        return updates;
     }
 
     /**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index ad049e6..178f49f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -27,7 +27,8 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.collect.Lists;
@@ -129,24 +130,27 @@
     }
 
     @Override
-    public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+    public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).put(tableName, key, value);
     }
 
     @Override
-    public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+    public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).remove(tableName, key);
     }
 
     @Override
-    public CompletableFuture<Void> clear(String tableName) {
+    public CompletableFuture<Result<Void>> clear(String tableName) {
+        AtomicBoolean isLocked = new AtomicBoolean(false);
         checkState(isOpen.get(), DB_NOT_OPEN);
         return CompletableFuture.allOf(partitions
                     .stream()
-                    .map(p -> p.clear(tableName))
-                    .toArray(CompletableFuture[]::new));
+                    .map(p -> p.clear(tableName)
+                            .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
     }
 
     @Override
@@ -183,59 +187,86 @@
     }
 
     @Override
-    public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+    public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
     }
 
     @Override
-    public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+    public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).remove(tableName, key, value);
     }
 
     @Override
-    public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+    public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).remove(tableName, key, version);
     }
 
     @Override
-    public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+    public CompletableFuture<Result<Boolean>> replace(
+            String tableName, String key, byte[] oldValue, byte[] newValue) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
     }
 
     @Override
-    public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+    public CompletableFuture<Result<Boolean>> replace(
+            String tableName, String key, long oldVersion, byte[] newValue) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
     }
 
     @Override
-    public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
-        checkState(isOpen.get(), DB_NOT_OPEN);
-        Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
-        for (UpdateOperation<String, byte[]> update : updates) {
-            Database partition = partitioner.getPartition(update.tableName(), update.key());
-            List<UpdateOperation<String, byte[]>> partitionUpdates = perPartitionUpdates.get(partition);
-            if (partitionUpdates == null) {
-                partitionUpdates = Lists.newArrayList();
-                perPartitionUpdates.put(partition, partitionUpdates);
-            }
-            partitionUpdates.add(update);
-        }
-        if (perPartitionUpdates.size() > 1) {
-            // TODO
-            throw new UnsupportedOperationException("Cross partition transactional updates are not supported.");
+    public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
+        Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+        if (subTransactions.isEmpty()) {
+            return CompletableFuture.completedFuture(true); // no updates, nothing to commit
+        } else if (subTransactions.size() == 1) {
+            Entry<Database, Transaction> entry = subTransactions.entrySet().iterator().next();
+            return entry.getKey().prepareAndCommit(entry.getValue()); // one partition handles it alone
         } else {
-            Entry<Database, List<UpdateOperation<String, byte[]>>> only =
-                    perPartitionUpdates.entrySet().iterator().next();
-            return only.getKey().atomicBatchUpdate(only.getValue());
+            return new TransactionManager(this).execute(transaction); // several partitions: two phase commit
         }
     }
 
     @Override
+    public CompletableFuture<Boolean> prepare(Transaction transaction) {
+        Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+        AtomicBoolean status = new AtomicBoolean(true);
+        return CompletableFuture.allOf(subTransactions.entrySet()
+                .stream()
+                .map(entry -> entry
+                        .getKey()
+                        .prepare(entry.getValue())
+                        .thenApply(v -> status.compareAndSet(true, v)))
+                .toArray(CompletableFuture[]::new))
+            .thenApply(v -> status.get());
+    }
+
+    @Override
+    public CompletableFuture<Boolean> commit(Transaction transaction) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
+        Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+        return CompletableFuture.allOf(subTransactions.entrySet().stream()
+                .map(entry -> entry.getKey().commit(entry.getValue()))
+                .toArray(CompletableFuture[]::new))
+            .thenApply(v -> true); // per-partition results are not inspected
+    }
+
+    @Override
+    public CompletableFuture<Boolean> rollback(Transaction transaction) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
+        Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+        return CompletableFuture.allOf(subTransactions.entrySet().stream()
+                .map(entry -> entry.getKey().rollback(entry.getValue()))
+                .toArray(CompletableFuture[]::new))
+            .thenApply(v -> true); // per-partition results are not inspected
+    }
+
+    @Override
     public CompletableFuture<Database> open() {
         return CompletableFuture.allOf(partitions
                     .stream()
@@ -243,7 +274,8 @@
                     .toArray(CompletableFuture[]::new))
                 .thenApply(v -> {
                     isOpen.set(true);
-                    return this; });
+                    return this;
+                });
     }
 
     @Override
@@ -279,4 +311,19 @@
     public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
         throw new UnsupportedOperationException();
     }
-}
\ No newline at end of file
+
+    /** Splits a transaction into per-partition sub-transactions that all carry the parent's id. */
+    private Map<Database, Transaction> createSubTransactions(Transaction transaction) {
+        Map<Database, List<DatabaseUpdate>> updatesByPartition = Maps.newHashMap();
+        for (DatabaseUpdate update : transaction.updates()) {
+            Database owner = partitioner.getPartition(update.tableName(), update.key());
+            updatesByPartition.computeIfAbsent(owner, p -> Lists.newLinkedList()).add(update);
+        }
+
+        Map<Database, Transaction> result = Maps.newHashMap();
+        for (Entry<Database, List<DatabaseUpdate>> grouped : updatesByPartition.entrySet()) {
+            result.put(grouped.getKey(), new DefaultTransaction(transaction.id(), grouped.getValue()));
+        }
+        return result;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Result.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Result.java
new file mode 100644
index 0000000..548174a
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Result.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+/**
+ * Outcome of a database update: a status plus the value the update produced.
+ *
+ * @param <V> return value type
+ */
+public final class Result<V> {
+
+    public enum Status {
+        /**
+         * The update was applied successfully.
+         */
+        OK,
+
+        /**
+         * The update failed because the underlying state is locked by another transaction.
+         */
+        LOCKED
+    }
+
+    private final V value;
+    private final Status status;
+
+    private Result(V value, Status status) {
+        this.status = status;
+        this.value = value;
+    }
+
+    /**
+     * Builds a Result that carries the specified value and has status Status.OK.
+     *
+     * @param <V> result value type
+     * @param value result value
+     * @return Result instance
+     */
+    public static <V> Result<V> ok(V value) {
+        return new Result<>(value, Status.OK);
+    }
+
+    /**
+     * Builds a Result that carries no value and has status Status.LOCKED.
+     *
+     * @param <V> result value type
+     * @return Result instance
+     */
+    public static <V> Result<V> locked() {
+        return new Result<>(null, Status.LOCKED);
+    }
+
+    /**
+     * Tells whether this result indicates a successful execution i.e. status is Status.OK.
+     *
+     * @return true if successful, false otherwise
+     */
+    public boolean success() {
+        return Status.OK == status;
+    }
+
+    /**
+     * Returns the status of the database update operation.
+     * @return database update status
+     */
+    public Status status() {
+        return status;
+    }
+
+    /**
+     * Returns the return value for the update.
+     * @return value returned by the database update. A result created through
+     * locked() always returns null here
+     */
+    public V value() {
+        return value;
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
index adc5477..e8daeff 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
@@ -36,4 +36,4 @@
     public Database getPartition(String tableName, String key) {
         return partitions.get(hash(tableName) % partitions.size());
     }
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
new file mode 100644
index 0000000..865934b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.Transaction.State;
+
+/**
+ * Agent that runs the two phase commit protocol.
+ */
+public class TransactionManager {
+
+    private final Database database;
+    private final AsyncConsistentMap<Long, Transaction> transactions;
+
+    private final Serializer serializer = new Serializer() {
+        private final KryoNamespace kryo = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.BASIC)
+                .nextId(KryoNamespace.FLOATING_ID)
+                .register(Versioned.class)
+                .register(DatabaseUpdate.class)
+                .register(DatabaseUpdate.Type.class)
+                .register(DefaultTransaction.class)
+                .register(Transaction.State.class)
+                .register(Pair.class)
+                .register(ImmutablePair.class)
+                .build();
+
+        @Override
+        public <T> byte[] encode(T object) {
+            return kryo.serialize(object);
+        }
+
+        @Override
+        public <T> T decode(byte[] bytes) {
+            return kryo.deserialize(bytes);
+        }
+    };
+
+    /**
+     * Constructs a new TransactionManager for the specified database instance.
+     *
+     * @param database database
+     */
+    public TransactionManager(Database database) {
+        this.database = checkNotNull(database, "database cannot be null");
+        this.transactions = new DefaultAsyncConsistentMap<>("onos-transactions", this.database, serializer);
+    }
+
+    /**
+     * Executes the specified transaction by employing a two phase commit protocol.
+     *
+     * @param transaction transaction to commit
+     * @return transaction result. Result value true indicates a successful commit (or the clean up
+     * of a transaction that had already finished), false indicates that it was rolled back
+     */
+    public CompletableFuture<Boolean> execute(Transaction transaction) {
+        // clean up if this transaction is already in a terminal state.
+        if (transaction.state() == State.COMMITTED || transaction.state() == State.ROLLEDBACK) {
+            return transactions.remove(transaction.id()).thenApply(v -> true);
+        } else if (transaction.state() == State.COMMITTING) {
+            return commit(transaction);
+        } else if (transaction.state() == State.ROLLINGBACK) {
+            return rollback(transaction);
+        } else {
+            return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction));
+        }
+    }
+
+    /**
+     * Returns all transactions in the system.
+     *
+     * @return future for a collection of transactions
+     */
+    public CompletableFuture<Collection<Transaction>> getTransactions() {
+        return transactions.values().thenApply(c -> {
+            Collection<Transaction> txns = c.stream().map(v -> v.value()).collect(Collectors.toList());
+            return txns;
+        });
+    }
+
+    // Records the transaction, runs the prepare phase, then stores it as COMMITTING or ROLLINGBACK.
+    private CompletableFuture<Boolean> prepare(Transaction transaction) {
+        return transactions.put(transaction.id(), transaction)
+                .thenCompose(v -> database.prepare(transaction))
+                .thenCompose(status -> transactions.put(
+                            transaction.id(),
+                            transaction.transition(status ? State.COMMITTING : State.ROLLINGBACK))
+                .thenApply(v -> status));
+    }
+
+    // Runs the commit phase, stores the transaction as COMMITTED and reports true (committed).
+    private CompletableFuture<Boolean> commit(Transaction transaction) {
+        return database.commit(transaction)
+                .thenCompose(v -> transactions.put(
+                            transaction.id(),
+                            transaction.transition(State.COMMITTED)))
+                .thenApply(v -> true);
+    }
+
+    // Runs the rollback phase, stores the transaction as ROLLEDBACK and reports false (aborted).
+    private CompletableFuture<Boolean> rollback(Transaction transaction) {
+        return database.rollback(transaction)
+                .thenCompose(v -> transactions.put(
+                            transaction.id(),
+                            transaction.transition(State.ROLLEDBACK)))
+                .thenApply(v -> false);
+    }
+}
\ No newline at end of file