[ONOS-6345] Track tombstones within transactions for optimistic locking on null values

Change-Id: Ib4764721e512462ec1552124ff696b8f89687d8f
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java b/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
index e04f10e..f720ded 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
@@ -31,7 +31,6 @@
  *
  * @param <K> map key type
  * @param <V> map value type
- *
  */
 public final class MapUpdate<K, V> {
 
@@ -39,47 +38,38 @@
      * Type of database update operation.
      */
     public enum Type {
-        /**
-         * Insert/Update entry without any checks.
-         */
-        PUT,
 
         /**
-         * Insert an entry iff there is no existing entry for that key.
+         * Acquires a read lock on a key.
+         * <p>
+         * This record type will check to ensure that the lock version matches the current version for the key
+         * in the map and acquire a read lock on the key for the duration of the transaction.
          */
-        PUT_IF_ABSENT,
+        LOCK,
 
         /**
-         * Update entry if the current version matches specified version.
+         * Checks the version of a key without locking the key.
+         * <p>
+         * This record type will perform a simple version check during the prepare phase of the two-phase commit
+         * protocol to ensure that the key has not changed during a transaction.
+         */
+        VERSION_MATCH,
+
+        /**
+         * Updates an entry if the current version matches specified version.
          */
         PUT_IF_VERSION_MATCH,
 
         /**
-         * Update entry if the current value matches specified value.
-         */
-        PUT_IF_VALUE_MATCH,
-
-        /**
-         * Remove entry without any checks.
-         */
-        REMOVE,
-
-        /**
-         * Remove entry if the current version matches specified version.
+         * Removes an entry if the current version matches specified version.
          */
         REMOVE_IF_VERSION_MATCH,
-
-        /**
-         * Remove entry if the current value matches specified value.
-         */
-        REMOVE_IF_VALUE_MATCH,
     }
 
     private Type type;
     private K key;
     private V value;
-    private V currentValue;
-    private long currentVersion = -1;
+    private long version = -1;
 
     /**
      * Returns the type of update operation.
@@ -106,19 +96,11 @@
     }
 
     /**
-     * Returns the expected current value for the key.
-     * @return current value in database.
-     */
-    public V currentValue() {
-        return currentValue;
-    }
-
-    /**
      * Returns the expected current version in the database for the key.
      * @return expected version.
      */
-    public long currentVersion() {
-        return currentVersion;
+    public long version() {
+        return version;
     }
 
     /**
@@ -135,14 +117,13 @@
                 .withType(type)
                 .withKey(keyMapper.apply(key))
                 .withValue(value == null ? null : valueMapper.apply(value))
-                .withCurrentValue(currentValue == null ? null : valueMapper.apply(currentValue))
-                .withCurrentVersion(currentVersion)
+                .withVersion(version)
                 .build();
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(type, key, value, currentValue, currentVersion);
+        return Objects.hash(type, key, value, version);
     }
 
     @Override
@@ -152,8 +133,7 @@
             return this.type == that.type
                     && Objects.equals(this.key, that.key)
                     && Objects.equals(this.value, that.value)
-                    && Objects.equals(this.currentValue, that.currentValue)
-                    && Objects.equals(this.currentVersion, that.currentVersion);
+                    && Objects.equals(this.version, that.version);
         }
         return false;
     }
@@ -164,8 +144,7 @@
             .add("type", type)
             .add("key", key)
             .add("value", value instanceof byte[] ? new ByteArraySizeHashPrinter((byte[]) value) : value)
-            .add("currentValue", currentValue)
-            .add("currentVersion", currentVersion)
+            .add("version", version)
             .toString();
     }
 
@@ -205,48 +184,38 @@
             return this;
         }
 
-        public Builder<K, V> withCurrentValue(V value) {
-            update.currentValue = value;
-            return this;
-        }
-
         public Builder<K, V> withValue(V value) {
             update.value = value;
             return this;
         }
 
-        public Builder<K, V> withCurrentVersion(long version) {
-            update.currentVersion = version;
+        public Builder<K, V> withVersion(long version) {
+            update.version = version;
             return this;
         }
 
         private void validateInputs() {
             checkNotNull(update.type, "type must be specified");
-            checkNotNull(update.key, "key must be specified");
             switch (update.type) {
-            case PUT:
-            case PUT_IF_ABSENT:
-                checkNotNull(update.value, "value must be specified.");
-                break;
-            case PUT_IF_VERSION_MATCH:
-                checkNotNull(update.value, "value must be specified.");
-                checkState(update.currentVersion >= 0, "current version must be specified");
-                break;
-            case PUT_IF_VALUE_MATCH:
-                checkNotNull(update.value, "value must be specified.");
-                checkNotNull(update.currentValue, "currentValue must be specified.");
-                break;
-            case REMOVE:
-                break;
-            case REMOVE_IF_VERSION_MATCH:
-                checkState(update.currentVersion >= 0, "current version must be specified");
-                break;
-            case REMOVE_IF_VALUE_MATCH:
-                checkNotNull(update.currentValue, "currentValue must be specified.");
-                break;
-            default:
-                throw new IllegalStateException("Unknown operation type");
+                case VERSION_MATCH:
+                    break;
+                case LOCK:
+                    checkNotNull(update.key, "key must be specified");
+                    checkState(update.version >= 0, "version must be specified");
+                    break;
+                case PUT_IF_VERSION_MATCH:
+                    checkNotNull(update.key, "key must be specified");
+                    checkNotNull(update.value, "value must be specified.");
+                    checkState(update.version >= 0, "version must be specified");
+                    break;
+                case REMOVE_IF_VERSION_MATCH:
+                    checkNotNull(update.key, "key must be specified");
+                    checkState(update.version >= 0, "version must be specified");
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown operation type");
             }
+
         }
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionLog.java b/core/api/src/main/java/org/onosproject/store/service/TransactionLog.java
index c367f5d..ab84c3a 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionLog.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionLog.java
@@ -31,10 +31,12 @@
  */
 public class TransactionLog<T> {
     private final TransactionId transactionId;
+    private final long version;
     private final List<T> records;
 
-    public TransactionLog(TransactionId transactionId, List<T> records) {
+    public TransactionLog(TransactionId transactionId, long version, List<T> records) {
         this.transactionId = transactionId;
+        this.version = version;
         this.records = ImmutableList.copyOf(records);
     }
 
@@ -48,6 +50,15 @@
     }
 
     /**
+     * Returns the transaction lock version.
+     *
+     * @return the transaction lock version
+     */
+    public long version() {
+        return version;
+    }
+
+    /**
      * Returns the list of transaction log records.
      *
      * @return a list of transaction log records
@@ -75,6 +86,7 @@
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
                 .add("transactionId", transactionId)
+                .add("version", version)
                 .add("records", records)
                 .toString();
     }
@@ -88,6 +100,6 @@
      * @param <U> record type of returned instance
      */
     public <U> TransactionLog<U> map(Function<T, U> mapper) {
-        return new TransactionLog<>(transactionId, Lists.transform(records, mapper::apply));
+        return new TransactionLog<>(transactionId, version, Lists.transform(records, mapper::apply));
     }
 }
\ No newline at end of file
diff --git a/core/api/src/test/java/org/onosproject/store/primitives/MapUpdateTest.java b/core/api/src/test/java/org/onosproject/store/primitives/MapUpdateTest.java
index 53cc160..6f16e97 100644
--- a/core/api/src/test/java/org/onosproject/store/primitives/MapUpdateTest.java
+++ b/core/api/src/test/java/org/onosproject/store/primitives/MapUpdateTest.java
@@ -29,49 +29,27 @@
 public class MapUpdateTest {
 
     private final MapUpdate<String, byte[]> stats1 = MapUpdate.<String, byte[]>newBuilder()
-            .withCurrentValue("1".getBytes())
             .withValue("2".getBytes())
-            .withCurrentVersion(3)
+            .withVersion(3)
             .withKey("4")
-            .withType(MapUpdate.Type.PUT)
+            .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
             .build();
 
     private final MapUpdate<String, byte[]> stats2 = MapUpdate.<String, byte[]>newBuilder()
-            .withCurrentValue("1".getBytes())
-            .withValue("2".getBytes())
-            .withCurrentVersion(3)
-            .withKey("4")
-            .withType(MapUpdate.Type.REMOVE)
+            .withType(MapUpdate.Type.VERSION_MATCH)
+            .withVersion(10)
             .build();
 
     private final MapUpdate<String, byte[]> stats3 = MapUpdate.<String, byte[]>newBuilder()
-            .withCurrentValue("1".getBytes())
             .withValue("2".getBytes())
-            .withCurrentVersion(3)
-            .withKey("4")
-            .withType(MapUpdate.Type.REMOVE_IF_VALUE_MATCH)
-            .build();
-
-    private final MapUpdate<String, byte[]> stats4 = MapUpdate.<String, byte[]>newBuilder()
-            .withCurrentValue("1".getBytes())
-            .withValue("2".getBytes())
-            .withCurrentVersion(3)
+            .withVersion(3)
             .withKey("4")
             .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
             .build();
 
-    private final MapUpdate<String, byte[]> stats5 = MapUpdate.<String, byte[]>newBuilder()
-            .withCurrentValue("1".getBytes())
+    private final MapUpdate<String, byte[]> stats4 = MapUpdate.<String, byte[]>newBuilder()
             .withValue("2".getBytes())
-            .withCurrentVersion(3)
-            .withKey("4")
-            .withType(MapUpdate.Type.PUT_IF_VALUE_MATCH)
-            .build();
-
-    private final MapUpdate<String, byte[]> stats6 = MapUpdate.<String, byte[]>newBuilder()
-            .withCurrentValue("1".getBytes())
-            .withValue("2".getBytes())
-            .withCurrentVersion(3)
+            .withVersion(3)
             .withKey("4")
             .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
             .build();
@@ -81,11 +59,10 @@
      */
     @Test
     public void testConstruction() {
-        assertThat(stats1.currentValue(), is("1".getBytes()));
         assertThat(stats1.value(), is("2".getBytes()));
-        assertThat(stats1.currentVersion(), is(3L));
+        assertThat(stats1.version(), is(3L));
         assertThat(stats1.key(), is("4"));
-        assertThat(stats1.type(), is(MapUpdate.Type.PUT));
+        assertThat(stats1.type(), is(MapUpdate.Type.PUT_IF_VERSION_MATCH));
     }
 
     /**
@@ -102,11 +79,6 @@
                 .addEqualityGroup(stats3, stats3)
                 .addEqualityGroup(stats4)
                 .testEquals();
-
-        new EqualsTester()
-                .addEqualityGroup(stats5, stats5)
-                .addEqualityGroup(stats6)
-                .testEquals();
     }
 
     /**
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
index 9acc377..19f6e0c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
@@ -22,6 +22,7 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 /**
@@ -37,13 +38,19 @@
 
     @Override
     protected V read(K key) {
-        Versioned<V> value = readCache.computeIfAbsent(key, backingMap::get);
-        return value != null ? value.value() : null;
+        Versioned<V> value = backingMap.getOrDefault(key, null);
+        readCache.put(key, value);
+        return value.value();
     }
 
     @Override
-    protected Stream<MapUpdate<K, V>> records() {
-        return Stream.concat(deleteStream(), writeStream());
+    public boolean hasPendingUpdates() {
+        return !writeCache.isEmpty() || !deleteSet.isEmpty();
+    }
+
+    @Override
+    protected Stream<MapUpdate<K, V>> records(Version lockVersion) {
+        return Stream.concat(deleteStream(), writeStream(lockVersion));
     }
 
     /**
@@ -52,34 +59,26 @@
     private Stream<MapUpdate<K, V>> deleteStream() {
         return deleteSet.stream()
                 .map(key -> Pair.of(key, readCache.get(key)))
-                .filter(e -> e.getValue() != null)
                 .map(e -> MapUpdate.<K, V>newBuilder()
                         .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
                         .withKey(e.getKey())
-                        .withCurrentVersion(e.getValue().version())
+                        .withVersion(e.getValue().version())
                         .build());
     }
 
     /**
      * Returns a transaction record stream for updated keys.
      */
-    private Stream<MapUpdate<K, V>> writeStream() {
-        return writeCache.entrySet().stream().map(entry -> {
-            Versioned<V> original = readCache.get(entry.getKey());
-            if (original == null) {
-                return MapUpdate.<K, V>newBuilder()
-                        .withType(MapUpdate.Type.PUT_IF_ABSENT)
-                        .withKey(entry.getKey())
-                        .withValue(entry.getValue())
-                        .build();
-            } else {
-                return MapUpdate.<K, V>newBuilder()
-                        .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
-                        .withKey(entry.getKey())
-                        .withCurrentVersion(original.version())
-                        .withValue(entry.getValue())
-                        .build();
-            }
-        });
+    private Stream<MapUpdate<K, V>> writeStream(Version lockVersion) {
+        return writeCache.entrySet().stream()
+                .map(entry -> {
+                    Versioned<V> original = readCache.get(entry.getKey());
+                    return MapUpdate.<K, V>newBuilder()
+                            .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+                            .withKey(entry.getKey())
+                            .withValue(entry.getValue())
+                            .withVersion(Math.max(original.version(), lockVersion.value()))
+                            .build();
+                });
     }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
index 3ea2066..d74748b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
@@ -110,6 +110,7 @@
     protected final Transactional<T> transactionalObject;
     private final AtomicBoolean open = new AtomicBoolean();
     private volatile State state = State.ACTIVE;
+    private volatile Version lock;
 
     public Transaction(TransactionId transactionId, Transactional<T> transactionalObject) {
         this.transactionId = transactionId;
@@ -192,7 +193,10 @@
      */
     public CompletableFuture<Version> begin() {
         open();
-        return transactionalObject.begin(transactionId);
+        return transactionalObject.begin(transactionId).thenApply(lock -> {
+            this.lock = lock;
+            return lock;
+        });
     }
 
     /**
@@ -209,8 +213,10 @@
     public CompletableFuture<Boolean> prepare(List<T> updates) {
         checkOpen();
         checkActive();
+        Version lock = this.lock;
+        checkState(lock != null, TX_INACTIVE_ERROR);
         setState(State.PREPARING);
-        return transactionalObject.prepare(new TransactionLog<T>(transactionId, updates))
+        return transactionalObject.prepare(new TransactionLog<T>(transactionId, lock.value(), updates))
                 .thenApply(succeeded -> {
                     setState(State.PREPARED);
                     return succeeded;
@@ -229,8 +235,10 @@
     public CompletableFuture<Boolean> prepareAndCommit(List<T> updates) {
         checkOpen();
         checkActive();
+        Version lock = this.lock;
+        checkState(lock != null, TX_INACTIVE_ERROR);
         setState(State.PREPARING);
-        return transactionalObject.prepareAndCommit(new TransactionLog<T>(transactionId, updates))
+        return transactionalObject.prepareAndCommit(new TransactionLog<T>(transactionId, lock.value(), updates))
                 .thenApply(succeeded -> {
                     setState(State.COMMITTED);
                     return succeeded;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
index 4429a1b..977312a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
@@ -16,7 +16,6 @@
 
 package org.onosproject.store.primitives.impl;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -60,7 +59,6 @@
     protected final Transaction<MapUpdate<K, V>> transaction;
     protected final Map<K, V> writeCache = Maps.newConcurrentMap();
     protected final Set<K> deleteSet = Sets.newConcurrentHashSet();
-    protected final List<MapUpdate<K, V>> log = new ArrayList<>();
     protected volatile Version lock;
 
     protected TransactionalMapParticipant(
@@ -77,18 +75,22 @@
      * synchronized prior to a read.
      */
     private void beginTransaction() {
-        if (!transaction.isOpen()) {
-            try {
-                lock = transaction.begin()
-                        .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new TransactionException.Interrupted();
-            } catch (TimeoutException e) {
-                throw new TransactionException.Timeout();
-            } catch (ExecutionException e) {
-                Throwables.propagateIfPossible(e.getCause());
-                throw new TransactionException(e.getCause());
+        if (lock == null) {
+            synchronized (this) {
+                if (lock == null) {
+                    try {
+                        lock = transaction.begin()
+                                .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new TransactionException.Interrupted();
+                    } catch (TimeoutException e) {
+                        throw new TransactionException.Timeout();
+                    } catch (ExecutionException e) {
+                        Throwables.propagateIfPossible(e.getCause());
+                        throw new TransactionException(e.getCause());
+                    }
+                }
             }
         }
     }
@@ -183,13 +185,8 @@
     }
 
     @Override
-    public boolean hasPendingUpdates() {
-        return records().findAny().isPresent();
-    }
-
-    @Override
     public CompletableFuture<Boolean> prepare() {
-        return transaction.prepare(log());
+        return transaction.prepare(log(lock));
     }
 
     @Override
@@ -199,7 +196,7 @@
 
     @Override
     public CompletableFuture<Boolean> prepareAndCommit() {
-        return transaction.prepareAndCommit(log());
+        return transaction.prepareAndCommit(log(lock));
     }
 
     @Override
@@ -210,24 +207,25 @@
     /**
      * Returns a list of updates performed within this map partition.
      *
+     * @param lockVersion the global transaction lock version
      * @return a list of map updates
      */
-    protected List<MapUpdate<K, V>> log() {
-        return records().collect(Collectors.toList());
+    protected List<MapUpdate<K, V>> log(Version lockVersion) {
+        return records(lockVersion).collect(Collectors.toList());
     }
 
     /**
      * Returns a stream of updates performed within this map partition.
      *
+     * @param lockVersion the global transaction lock version
      * @return a stream of map updates
      */
-    protected abstract Stream<MapUpdate<K, V>> records();
+    protected abstract Stream<MapUpdate<K, V>> records(Version lockVersion);
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
                 .add("backingMap", backingMap)
-                .add("updates", log())
                 .toString();
     }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 8a13c01..7220c37 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -298,7 +298,7 @@
 
     @Override
     public CompletableFuture<Version> begin(TransactionId transactionId) {
-        return client.submit(new TransactionBegin()).thenApply(Version::new);
+        return client.submit(new TransactionBegin(transactionId)).thenApply(Version::new);
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index 0c5dd76..8dd4096 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -15,6 +15,11 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.MoreObjects;
 import io.atomix.catalyst.buffer.BufferInput;
 import io.atomix.catalyst.buffer.BufferOutput;
 import io.atomix.catalyst.serializer.CatalystSerializable;
@@ -24,19 +29,12 @@
 import io.atomix.catalyst.util.Assert;
 import io.atomix.copycat.Command;
 import io.atomix.copycat.Query;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
 import org.onlab.util.Match;
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Versioned;
 
-import com.google.common.base.MoreObjects;
-
 /**
  * {@link AtomixConsistentMap} resource state machine operations.
  */
@@ -203,12 +201,32 @@
     }
 
     /**
-     * Transaction begin query.
+     * Transaction begin command.
      */
-    public static class TransactionBegin extends MapQuery<Long> {
+    public static class TransactionBegin extends MapCommand<Long> {
+        private TransactionId transactionId;
+
+        public TransactionBegin() {
+        }
+
+        public TransactionBegin(TransactionId transactionId) {
+            this.transactionId = transactionId;
+        }
+
+        public TransactionId transactionId() {
+            return transactionId;
+        }
+
         @Override
-        public ConsistencyLevel consistency() {
-            return ConsistencyLevel.LINEARIZABLE;
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            super.writeObject(buffer, serializer);
+            serializer.writeObject(transactionId, buffer);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            super.readObject(buffer, serializer);
+            transactionId = serializer.readObject(buffer);
         }
     }
 
@@ -264,7 +282,7 @@
 
         @Override
         public CompactionMode compaction() {
-            return CompactionMode.QUORUM;
+            return CompactionMode.TOMBSTONE;
         }
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 433b3b3..2fc91c8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -15,11 +15,6 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
-import static com.google.common.base.Preconditions.checkState;
-import static org.onosproject.store.service.MapEvent.Type.INSERT;
-import static org.onosproject.store.service.MapEvent.Type.REMOVE;
-import static org.onosproject.store.service.MapEvent.Type.UPDATE;
-import static org.slf4j.LoggerFactory.getLogger;
 import io.atomix.copycat.server.Commit;
 import io.atomix.copycat.server.Snapshottable;
 import io.atomix.copycat.server.StateMachineExecutor;
@@ -36,7 +31,6 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 import org.onlab.util.CountDownCompleter;
@@ -71,6 +65,12 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import static com.google.common.base.Preconditions.checkState;
+import static org.onosproject.store.service.MapEvent.Type.INSERT;
+import static org.onosproject.store.service.MapEvent.Type.REMOVE;
+import static org.onosproject.store.service.MapEvent.Type.UPDATE;
+import static org.slf4j.LoggerFactory.getLogger;
+
 /**
  * State Machine for {@link AtomixConsistentMap} resource.
  */
@@ -80,8 +80,8 @@
     private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
     private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
     private final Set<String> preparedKeys = Sets.newHashSet();
-    private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
-    private AtomicLong versionCounter = new AtomicLong(0);
+    private final Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
+    private long currentVersion;
 
     public AtomixConsistentMapState(Properties properties) {
         super(properties);
@@ -89,12 +89,10 @@
 
     @Override
     public void snapshot(SnapshotWriter writer) {
-        writer.writeLong(versionCounter.get());
     }
 
     @Override
     public void install(SnapshotReader reader) {
-        versionCounter = new AtomicLong(reader.readLong());
     }
 
     @Override
@@ -141,7 +139,8 @@
      */
     protected boolean containsKey(Commit<? extends ContainsKey> commit) {
         try {
-            return toVersioned(mapEntries.get(commit.operation().key())) != null;
+            MapEntryValue value = mapEntries.get(commit.operation().key());
+            return value != null && value.type() != MapEntryValue.Type.TOMBSTONE;
         } finally {
             commit.close();
         }
@@ -155,9 +154,9 @@
      */
     protected boolean containsValue(Commit<? extends ContainsValue> commit) {
         try {
-            Match<byte[]> valueMatch = Match
-                    .ifValue(commit.operation().value());
+            Match<byte[]> valueMatch = Match.ifValue(commit.operation().value());
             return mapEntries.values().stream()
+                    .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
                     .anyMatch(value -> valueMatch.matches(value.value()));
         } finally {
             commit.close();
@@ -186,8 +185,14 @@
      */
     protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
         try {
-            Versioned<byte[]> value = toVersioned(mapEntries.get(commit.operation().key()));
-            return value != null ? value : new Versioned<>(commit.operation().defaultValue(), 0);
+            MapEntryValue value = mapEntries.get(commit.operation().key());
+            if (value == null) {
+                return new Versioned<>(commit.operation().defaultValue(), 0);
+            } else if (value.type() == MapEntryValue.Type.TOMBSTONE) {
+                return new Versioned<>(commit.operation().defaultValue(), value.version);
+            } else {
+                return new Versioned<>(value.value(), value.version);
+            }
         } finally {
             commit.close();
         }
@@ -201,7 +206,9 @@
      */
     protected int size(Commit<? extends Size> commit) {
         try {
-            return mapEntries.size();
+            return (int) mapEntries.values().stream()
+                    .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
+                    .count();
         } finally {
             commit.close();
         }
@@ -215,7 +222,8 @@
      */
     protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
         try {
-            return mapEntries.isEmpty();
+            return mapEntries.values().stream()
+                    .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
         } finally {
             commit.close();
         }
@@ -229,7 +237,10 @@
      */
     protected Set<String> keySet(Commit<? extends KeySet> commit) {
         try {
-            return mapEntries.keySet().stream().collect(Collectors.toSet());
+            return mapEntries.entrySet().stream()
+                    .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+                    .map(Map.Entry::getKey)
+                    .collect(Collectors.toSet());
         } finally {
             commit.close();
         }
@@ -243,7 +254,10 @@
      */
     protected Collection<Versioned<byte[]>> values(Commit<? extends Values> commit) {
         try {
-            return mapEntries.values().stream().map(this::toVersioned).collect(Collectors.toList());
+            return mapEntries.entrySet().stream()
+                    .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+                    .map(entry -> toVersioned(entry.getValue()))
+                    .collect(Collectors.toList());
         } finally {
             commit.close();
         }
@@ -258,11 +272,9 @@
      */
     protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<? extends EntrySet> commit) {
         try {
-            return mapEntries
-                    .entrySet()
-                    .stream()
-                    .map(e -> Maps.immutableEntry(e.getKey(),
-                            toVersioned(e.getValue())))
+            return mapEntries.entrySet().stream()
+                    .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+                    .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
                     .collect(Collectors.toSet());
         } finally {
             commit.close();
@@ -289,21 +301,34 @@
             }
 
             byte[] newValue = commit.operation().value();
-            long newVersion = versionCounter.incrementAndGet();
+            currentVersion = commit.index();
             Versioned<byte[]> newMapValue = newValue == null ? null
-                    : new Versioned<>(newValue, newVersion);
+                    : new Versioned<>(newValue, currentVersion);
 
             MapEvent.Type updateType = newValue == null ? REMOVE
                     : oldCommitValue == null ? INSERT : UPDATE;
+
+            // If a value existed in the map, remove and discard the value to ensure disk can be freed.
             if (updateType == REMOVE || updateType == UPDATE) {
                 mapEntries.remove(key);
                 oldCommitValue.discard();
             }
+
+            // If this is an insert/update commit, add the commit to the map entries.
             if (updateType == INSERT || updateType == UPDATE) {
-                mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
+                mapEntries.put(key, new NonTransactionalCommit(commit));
+            } else if (!activeTransactions.isEmpty()) {
+                // If this is a delete but transactions are currently running, ensure tombstones are retained
+                // for version checks.
+                TombstoneCommit tombstone = new TombstoneCommit(
+                        commit.index(),
+                        new CountDownCompleter<>(commit, 1, Commit::close));
+                mapEntries.put(key, tombstone);
             } else {
+                // If no transactions are in progress, we can safely delete the key from memory.
                 commit.close();
             }
+
             publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
             return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
                     newMapValue);
@@ -387,7 +412,9 @@
      */
     protected long begin(Commit<? extends TransactionBegin> commit) {
         try {
-            return commit.index();
+            long version = commit.index();
+            activeTransactions.put(commit.operation().transactionId(), new TransactionScope(version));
+            return version;
         } finally {
             commit.close();
         }
@@ -401,11 +428,18 @@
      */
     protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
         PrepareResult prepareResult = prepare(commit);
+        TransactionScope transactionScope =
+                activeTransactions.remove(commit.operation().transactionLog().transactionId());
+        this.currentVersion = commit.index();
         if (prepareResult == PrepareResult.OK) {
-
-            commitInternal(commit.operation().transactionLog().transactionId()/*, commit.index()*/);
+            transactionScope = transactionScope.prepared(commit);
+            commit(transactionScope);
+        } else {
+            transactionScope.close();
         }
+        discardTombstones();
         return prepareResult;
+
     }
 
     /**
@@ -416,30 +450,72 @@
      */
     protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
         boolean ok = false;
+
         try {
-            TransactionLog<MapUpdate<String, byte[]>> transaction = commit.operation().transactionLog();
-            for (MapUpdate<String, byte[]> update : transaction.records()) {
-                String key = update.key();
+            TransactionLog<MapUpdate<String, byte[]>> transactionLog = commit.operation().transactionLog();
+
+            // Iterate through records in the transaction log and perform isolation checks.
+            for (MapUpdate<String, byte[]> record : transactionLog.records()) {
+                String key = record.key();
+
+                // If the record is a VERSION_MATCH then check that the record's version matches the current
+                // version of the state machine.
+                if (record.type() == MapUpdate.Type.VERSION_MATCH && key == null) {
+                    if (record.version() > currentVersion) {
+                        return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
+                    } else {
+                        continue;
+                    }
+                }
+
+                // If the prepared keys already contains the key contained within the record, that indicates a
+                // conflict with a concurrent transaction.
                 if (preparedKeys.contains(key)) {
                     return PrepareResult.CONCURRENT_TRANSACTION;
                 }
+
+                // Read the existing value from the map.
                 MapEntryValue existingValue = mapEntries.get(key);
+
+                // Note: if the existing value is null, that means the key has not changed during the transaction,
+                // otherwise a tombstone would have been retained.
                 if (existingValue == null) {
-                    if (update.type() != MapUpdate.Type.PUT_IF_ABSENT) {
+                    // If the value is null, ensure the version is equal to the transaction version.
+                    if (record.version() != transactionLog.version()) {
                         return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
                     }
                 } else {
-                    if (existingValue.version() != update.currentVersion()) {
+                    // If the value is non-null, compare the current version with the record version.
+                    if (existingValue.version() > record.version()) {
                         return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
                     }
                 }
             }
-            // No violations detected. Add to pendingTransactions and mark
-            // modified keys as locked for updates.
-            pendingTransactions.put(transaction.transactionId(), commit);
-            transaction.records().forEach(u -> preparedKeys.add(u.key()));
+
+            // No violations detected. Mark modified keys locked for transactions.
+            transactionLog.records().forEach(record -> {
+                if (record.type() != MapUpdate.Type.VERSION_MATCH) {
+                    preparedKeys.add(record.key());
+                }
+            });
+
             ok = true;
-            return PrepareResult.OK;
+
+            // Update the transaction scope. If the transaction scope is not set on this node, that indicates the
+            // coordinator is communicating with another node. Transactions assume that the client is communicating
+            // with a single leader in order to limit the overhead of retaining tombstones.
+            TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
+            if (transactionScope == null) {
+                activeTransactions.put(
+                        transactionLog.transactionId(),
+                        new TransactionScope(transactionLog.version(), commit));
+                return PrepareResult.PARTIAL_FAILURE;
+            } else {
+                activeTransactions.put(
+                        transactionLog.transactionId(),
+                        transactionScope.prepared(commit));
+                return PrepareResult.OK;
+            }
         } catch (Exception e) {
             log.warn("Failure applying {}", commit, e);
             throw Throwables.propagate(e);
@@ -458,43 +534,72 @@
      */
     protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
         TransactionId transactionId = commit.operation().transactionId();
+        TransactionScope transactionScope = activeTransactions.remove(transactionId);
+        if (transactionScope == null) {
+            return CommitResult.UNKNOWN_TRANSACTION_ID;
+        }
+
         try {
-            return commitInternal(transactionId);
+            this.currentVersion = commit.index();
+            return commit(transactionScope.committed(commit));
         } catch (Exception e) {
             log.warn("Failure applying {}", commit, e);
             throw Throwables.propagate(e);
         } finally {
-            commit.close();
+            discardTombstones();
         }
     }
 
-    private CommitResult commitInternal(TransactionId transactionId) {
-        Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
-                .remove(transactionId);
-        if (prepareCommit == null) {
-            return CommitResult.UNKNOWN_TRANSACTION_ID;
-        }
-        TransactionLog<MapUpdate<String, byte[]>> transaction = prepareCommit.operation().transactionLog();
-        long totalReferencesToCommit = transaction
-                .records()
-                .stream()
-                .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+    /**
+     * Applies committed operations to the state machine.
+     */
+    private CommitResult commit(TransactionScope transactionScope) {
+        TransactionLog<MapUpdate<String, byte[]>> transactionLog = transactionScope.transactionLog();
+        boolean retainTombstones = !activeTransactions.isEmpty();
+
+        // Count the total number of keys that will be set by this transaction. This is necessary to do reference
+        // counting for garbage collection.
+        long totalReferencesToCommit = transactionLog.records().stream()
+                // No keys are set for version checks. For deletes, references are only retained of tombstones
+                // need to be retained for concurrent transactions.
+                .filter(record -> record.type() != MapUpdate.Type.VERSION_MATCH && record.type() != MapUpdate.Type.LOCK
+                        && (record.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH || retainTombstones))
                 .count();
-        CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
-                new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
+
+        // Create a count down completer that counts references to the transaction commit for garbage collection.
+        CountDownCompleter<TransactionScope> completer = new CountDownCompleter<>(
+                transactionScope, totalReferencesToCommit, TransactionScope::close);
+
         List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
-        for (MapUpdate<String, byte[]> update : transaction.records()) {
-            String key = update.key();
+        for (MapUpdate<String, byte[]> record : transactionLog.records()) {
+            if (record.type() == MapUpdate.Type.VERSION_MATCH) {
+                continue;
+            }
+
+            String key = record.key();
             checkState(preparedKeys.remove(key), "key is not prepared");
+
+            if (record.type() == MapUpdate.Type.LOCK) {
+                continue;
+            }
+
             MapEntryValue previousValue = mapEntries.remove(key);
             MapEntryValue newValue = null;
-            if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
-                newValue = new TransactionalCommit(key, versionCounter.incrementAndGet(), completer);
+
+            // If the record is not a delete, create a transactional commit.
+            if (record.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
+                newValue = new TransactionalCommit(currentVersion, record.value(), completer);
+            } else if (retainTombstones) {
+                // For deletes, if tombstones need to be retained then create and store a tombstone commit.
+                newValue = new TombstoneCommit(currentVersion, completer);
             }
+
             eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
+
             if (newValue != null) {
                 mapEntries.put(key, newValue);
             }
+
             if (previousValue != null) {
                 previousValue.discard();
             }
@@ -511,20 +616,57 @@
      */
     protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
         TransactionId transactionId = commit.operation().transactionId();
-        try {
-            Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions.remove(transactionId);
-            if (prepareCommit == null) {
-                return RollbackResult.UNKNOWN_TRANSACTION_ID;
-            } else {
-                prepareCommit.operation()
-                             .transactionLog()
-                             .records()
-                             .forEach(u -> preparedKeys.remove(u.key()));
-                prepareCommit.close();
-                return RollbackResult.OK;
-            }
-        } finally {
+        TransactionScope transactionScope = activeTransactions.remove(transactionId);
+        if (transactionScope == null) {
+            return RollbackResult.UNKNOWN_TRANSACTION_ID;
+        } else if (!transactionScope.isPrepared()) {
+            discardTombstones();
+            transactionScope.close();
             commit.close();
+            return RollbackResult.OK;
+        } else {
+            try {
+                transactionScope.transactionLog().records()
+                        .forEach(record -> {
+                            if (record.type() != MapUpdate.Type.VERSION_MATCH) {
+                                preparedKeys.remove(record.key());
+                            }
+                        });
+                return RollbackResult.OK;
+            } finally {
+                discardTombstones();
+                transactionScope.close();
+                commit.close();
+            }
+        }
+
+    }
+
+    /**
+     * Discards tombstones no longer needed by active transactions.
+     */
+    private void discardTombstones() {
+        if (activeTransactions.isEmpty()) {
+            Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
+            while (iterator.hasNext()) {
+                MapEntryValue value = iterator.next().getValue();
+                if (value.type() == MapEntryValue.Type.TOMBSTONE) {
+                    iterator.remove();
+                    value.discard();
+                }
+            }
+        } else {
+            long lowWaterMark = activeTransactions.values().stream()
+                    .mapToLong(TransactionScope::version)
+                    .min().getAsLong();
+            Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
+            while (iterator.hasNext()) {
+                MapEntryValue value = iterator.next().getValue();
+                if (value.type() == MapEntryValue.Type.TOMBSTONE && value.version < lowWaterMark) {
+                    iterator.remove();
+                    value.discard();
+                }
+            }
         }
     }
 
@@ -558,7 +700,8 @@
      * @return versioned instance
      */
     private Versioned<byte[]> toVersioned(MapEntryValue value) {
-        return value == null ? null : new Versioned<>(value.value(), value.version());
+        return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
+                ? new Versioned<>(value.value(), value.version()) : null;
     }
 
     /**
@@ -599,52 +742,73 @@
     /**
      * Interface implemented by map values.
      */
-    private interface MapEntryValue {
+    private abstract static class MapEntryValue {
+        protected final Type type;
+        protected final long version;
+
+        MapEntryValue(Type type, long version) {
+            this.type = type;
+            this.version = version;
+        }
+
         /**
-         * Returns the raw {@code byte[]}.
+         * Returns the value type.
          *
-         * @return raw value
+         * @return the value type
          */
-        byte[] value();
+        Type type() {
+            return type;
+        }
 
         /**
          * Returns the version of the value.
          *
          * @return version
          */
-        long version();
+        long version() {
+            return version;
+        }
+
+        /**
+         * Returns the raw {@code byte[]}.
+         *
+         * @return raw value
+         */
+        abstract byte[] value();
 
         /**
          * Discards the value by invoke appropriate clean up actions.
          */
-        void discard();
+        abstract void discard();
+
+        /**
+         * Value type.
+         */
+        enum Type {
+            VALUE,
+            TOMBSTONE,
+        }
     }
 
     /**
      * A {@code MapEntryValue} that is derived from a non-transactional update
      * i.e. via any standard map update operation.
      */
-    private class NonTransactionalCommit implements MapEntryValue {
-        private final long version;
+    private static class NonTransactionalCommit extends MapEntryValue {
         private final Commit<? extends UpdateAndGet> commit;
 
-        public NonTransactionalCommit(long version, Commit<? extends UpdateAndGet> commit) {
-            this.version = version;
+        NonTransactionalCommit(Commit<? extends UpdateAndGet> commit) {
+            super(Type.VALUE, commit.index());
             this.commit = commit;
         }
 
         @Override
-        public byte[] value() {
+        byte[] value() {
             return commit.operation().value();
         }
 
         @Override
-        public long version() {
-            return version;
-        }
-
-        @Override
-        public void discard() {
+        void discard() {
             commit.close();
         }
     }
@@ -653,43 +817,135 @@
      * A {@code MapEntryValue} that is derived from updates submitted via a
      * transaction.
      */
-    private class TransactionalCommit implements MapEntryValue {
-        private final String key;
+    private static class TransactionalCommit extends MapEntryValue {
+        private final byte[] value;
+        private final CountDownCompleter<?> completer;
+
+        TransactionalCommit(long version, byte[] value, CountDownCompleter<?> completer) {
+            super(Type.VALUE, version);
+            this.value = value;
+            this.completer = completer;
+        }
+
+        @Override
+        byte[] value() {
+            return value;
+        }
+
+        @Override
+        void discard() {
+            completer.countDown();
+        }
+    }
+
+    /**
+     * A {@code MapEntryValue} that represents a deleted entry.
+     */
+    private static class TombstoneCommit extends MapEntryValue {
+        private final CountDownCompleter<?> completer;
+
+        public TombstoneCommit(long version, CountDownCompleter<?> completer) {
+            super(Type.TOMBSTONE, version);
+            this.completer = completer;
+        }
+
+        @Override
+        byte[] value() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        void discard() {
+            completer.countDown();
+        }
+    }
+
+    /**
+     * Map transaction scope.
+     */
+    private static final class TransactionScope {
         private final long version;
-        private final CountDownCompleter<Commit<? extends TransactionPrepare>> completer;
+        private final Commit<? extends TransactionPrepare> prepareCommit;
+        private final Commit<? extends TransactionCommit> commitCommit;
 
-        public TransactionalCommit(
-                String key,
+        private TransactionScope(long version) {
+            this(version, null, null);
+        }
+
+        private TransactionScope(
                 long version,
-                CountDownCompleter<Commit<? extends TransactionPrepare>> commit) {
-            this.key = key;
+                Commit<? extends TransactionPrepare> prepareCommit) {
+            this(version, prepareCommit, null);
+        }
+
+        private TransactionScope(
+                long version,
+                Commit<? extends TransactionPrepare> prepareCommit,
+                Commit<? extends TransactionCommit> commitCommit) {
             this.version = version;
-            this.completer = commit;
+            this.prepareCommit = prepareCommit;
+            this.commitCommit = commitCommit;
         }
 
-        @Override
-        public byte[] value() {
-            TransactionLog<MapUpdate<String, byte[]>> transaction = completer.object().operation().transactionLog();
-            return valueForKey(key, transaction);
-        }
-
-        @Override
-        public long version() {
+        /**
+         * Returns the transaction version.
+         *
+         * @return the transaction version
+         */
+        long version() {
             return version;
         }
 
-        @Override
-        public void discard() {
-            completer.countDown();
+        /**
+         * Returns whether this is a prepared transaction scope.
+         *
+         * @return whether this is a prepared transaction scope
+         */
+        boolean isPrepared() {
+            return prepareCommit != null;
         }
 
-        private byte[] valueForKey(String key, TransactionLog<MapUpdate<String, byte[]>> transaction) {
-            MapUpdate<String, byte[]>  update = transaction.records()
-                                                           .stream()
-                                                           .filter(u -> u.key().equals(key))
-                                                           .findFirst()
-                                                           .orElse(null);
-            return update == null ? null : update.value();
+        /**
+         * Returns the transaction commit log.
+         *
+         * @return the transaction commit log
+         */
+        TransactionLog<MapUpdate<String, byte[]>> transactionLog() {
+            checkState(isPrepared());
+            return prepareCommit.operation().transactionLog();
+        }
+
+        /**
+         * Returns a new transaction scope with a prepare commit.
+         *
+         * @param commit the prepare commit
+         * @return new transaction scope updated with the prepare commit
+         */
+        TransactionScope prepared(Commit<? extends TransactionPrepare> commit) {
+            return new TransactionScope(version, commit);
+        }
+
+        /**
+         * Returns a new transaction scope with a commit commit.
+         *
+         * @param commit the commit commit ;-)
+         * @return new transaction scope updated with the commit commit
+         */
+        TransactionScope committed(Commit<? extends TransactionCommit> commit) {
+            checkState(isPrepared());
+            return new TransactionScope(version, prepareCommit, commit);
+        }
+
+        /**
+         * Closes the transaction and all associated commits.
+         */
+        void close() {
+            if (prepareCommit != null) {
+                prepareCommit.close();
+            }
+            if (commitCommit != null) {
+                commitCommit.close();
+            }
         }
     }
 }
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
index 533ea97..ef03935 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
@@ -74,7 +74,7 @@
 
         expect(asyncMap.begin(transactionId))
                 .andReturn(CompletableFuture.completedFuture(new Version(1)));
-        expect(asyncMap.prepare(new TransactionLog<>(transactionId, updates)))
+        expect(asyncMap.prepare(new TransactionLog<>(transactionId, 1, updates)))
                 .andReturn(CompletableFuture.completedFuture(true));
         expect(asyncMap.commit(transactionId))
                 .andReturn(CompletableFuture.completedFuture(null));
@@ -117,7 +117,7 @@
 
         expect(asyncMap.begin(transactionId))
                 .andReturn(CompletableFuture.completedFuture(new Version(1)));
-        expect(asyncMap.prepare(new TransactionLog<>(transactionId, updates)))
+        expect(asyncMap.prepare(new TransactionLog<>(transactionId, 1, updates)))
                 .andReturn(CompletableFuture.completedFuture(true));
         replay(asyncMap);
 
@@ -160,16 +160,16 @@
                 .andReturn(CompletableFuture.completedFuture(new Version(1)));
 
         expect(participants.get(PartitionId.from(1)).transaction.transactionalObject.prepare(
-                new TransactionLog<>(transactionId, Arrays.asList(
+                new TransactionLog<>(transactionId, 1, Arrays.asList(
                         MapUpdate.<String, String>newBuilder()
                                 .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
                                 .withKey("foo")
-                                .withCurrentVersion(1)
+                                .withVersion(1)
                                 .build(),
                         MapUpdate.<String, String>newBuilder()
                                 .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
                                 .withKey("baz")
-                                .withCurrentVersion(2)
+                                .withVersion(2)
                                 .build()
                 )))).andReturn(CompletableFuture.completedFuture(true));
 
@@ -181,11 +181,12 @@
                 .andReturn(CompletableFuture.completedFuture(new Version(1)));
 
         expect(participants.get(PartitionId.from(3)).transaction.transactionalObject.prepare(
-                new TransactionLog<>(transactionId, Arrays.asList(
+                new TransactionLog<>(transactionId, 1, Arrays.asList(
                         MapUpdate.<String, String>newBuilder()
-                                .withType(MapUpdate.Type.PUT_IF_ABSENT)
+                                .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
                                 .withKey("bar")
                                 .withValue("baz")
+                                .withVersion(1)
                                 .build()
                 )))).andReturn(CompletableFuture.completedFuture(true));
 
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index ac4c60d..284b57b 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -27,6 +27,7 @@
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import java.util.Arrays;
@@ -41,7 +42,6 @@
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -92,6 +92,14 @@
     }
 
     /**
+     * Tests map transaction prepare.
+     */
+    @Test
+    public void testTransactionPrepare() throws Throwable {
+        transactionPrepareTests();
+    }
+
+    /**
      * Tests map transaction commit.
      */
     @Test
@@ -112,22 +120,12 @@
         final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
 
         AtomixConsistentMap map = createAtomixClient().getResource("testBasicMapOperationMap",
-                                                                   AtomixConsistentMap.class).join();
+                AtomixConsistentMap.class).join();
 
         map.isEmpty().thenAccept(result -> {
             assertTrue(result);
         }).join();
 
-        map.getOrDefault("nothing", null).thenAccept(result -> {
-            assertEquals(0, result.version());
-            assertNull(result.value());
-        }).join();
-
-        map.getOrDefault("foo", "bar".getBytes()).thenAccept(result -> {
-            assertEquals(0, result.version());
-            assertArrayEquals("bar".getBytes(), result.value());
-        }).join();
-
         map.put("foo", rawFooValue).thenAccept(result -> {
             assertNull(result);
         }).join();
@@ -175,11 +173,6 @@
             assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
         }).join();
 
-        map.getOrDefault("foo", "bar".getBytes()).thenAccept(result -> {
-            assertNotEquals(0, result.version());
-            assertArrayEquals(rawFooValue, result.value());
-        }).join();
-
         map.remove("foo").thenAccept(result -> {
             assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
         }).join();
@@ -257,7 +250,7 @@
         final byte[] value3 = Tools.getBytesUtf8("value3");
 
         AtomixConsistentMap map = createAtomixClient().getResource("testMapComputeOperationsMap",
-                                                                   AtomixConsistentMap.class).join();
+                AtomixConsistentMap.class).join();
 
         map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
             assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
@@ -295,7 +288,7 @@
         final byte[] value3 = Tools.getBytesUtf8("value3");
 
         AtomixConsistentMap map = createAtomixClient().getResource("testMapListenerMap",
-                                                                   AtomixConsistentMap.class).join();
+                AtomixConsistentMap.class).join();
         TestMapEventListener listener = new TestMapEventListener();
 
         // add listener; insert new value into map and verify an INSERT event is received.
@@ -349,27 +342,104 @@
         map.removeListener(listener).join();
     }
 
+    protected void transactionPrepareTests() throws Throwable {
+        AtomixConsistentMap map = createAtomixClient().getResource("testPrepareTestsMap",
+                AtomixConsistentMap.class).join();
+
+        TransactionId transactionId1 = TransactionId.from("tx1");
+        TransactionId transactionId2 = TransactionId.from("tx2");
+        TransactionId transactionId3 = TransactionId.from("tx3");
+        TransactionId transactionId4 = TransactionId.from("tx4");
+
+        Version lock1 = map.begin(transactionId1).join();
+
+        MapUpdate<String, byte[]> update1 =
+                MapUpdate.<String, byte[]>newBuilder()
+                        .withType(MapUpdate.Type.LOCK)
+                        .withKey("foo")
+                        .withVersion(lock1.value())
+                        .build();
+        MapUpdate<String, byte[]> update2 =
+                MapUpdate.<String, byte[]>newBuilder()
+                        .withType(MapUpdate.Type.LOCK)
+                        .withKey("bar")
+                        .withVersion(lock1.value())
+                        .build();
+
+        map.prepare(new TransactionLog<>(transactionId1, lock1.value(), Arrays.asList(update1, update2)))
+                .thenAccept(result -> {
+                    assertTrue(result);
+                }).join();
+
+        Version lock2 = map.begin(transactionId2).join();
+
+        MapUpdate<String, byte[]> update3 =
+                MapUpdate.<String, byte[]>newBuilder()
+                        .withType(MapUpdate.Type.LOCK)
+                        .withKey("foo")
+                        .withVersion(lock2.value())
+                        .build();
+
+        map.prepare(new TransactionLog<>(transactionId2, lock2.value(), Arrays.asList(update3)))
+                .thenAccept(result -> {
+                    assertFalse(result);
+                }).join();
+        map.rollback(transactionId2).join();
+
+        Version lock3 = map.begin(transactionId3).join();
+
+        MapUpdate<String, byte[]> update4 =
+                MapUpdate.<String, byte[]>newBuilder()
+                        .withType(MapUpdate.Type.LOCK)
+                        .withKey("baz")
+                        .withVersion(0)
+                        .build();
+
+        map.prepare(new TransactionLog<>(transactionId3, lock3.value(), Arrays.asList(update4)))
+                .thenAccept(result -> {
+                    assertFalse(result);
+                }).join();
+        map.rollback(transactionId3).join();
+
+        Version lock4 = map.begin(transactionId4).join();
+
+        MapUpdate<String, byte[]> update5 =
+                MapUpdate.<String, byte[]>newBuilder()
+                        .withType(MapUpdate.Type.LOCK)
+                        .withKey("baz")
+                        .withVersion(lock4.value())
+                        .build();
+
+        map.prepare(new TransactionLog<>(transactionId4, lock4.value(), Arrays.asList(update5)))
+                .thenAccept(result -> {
+                    assertTrue(result);
+                }).join();
+    }
+
     protected void transactionCommitTests() throws Throwable {
         final byte[] value1 = Tools.getBytesUtf8("value1");
         final byte[] value2 = Tools.getBytesUtf8("value2");
 
         AtomixConsistentMap map = createAtomixClient().getResource("testCommitTestsMap",
-                                                                   AtomixConsistentMap.class).join();
+                AtomixConsistentMap.class).join();
         TestMapEventListener listener = new TestMapEventListener();
 
         map.addListener(listener).join();
 
-        // PUT_IF_ABSENT
+        TransactionId transactionId = TransactionId.from("tx1");
+
+        // Begin the transaction.
+        Version lock = map.begin(transactionId).join();
+
+        // PUT_IF_VERSION_MATCH
         MapUpdate<String, byte[]> update1 =
-                MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
-                .withKey("foo")
-                .withValue(value1)
-                .build();
+                MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+                        .withKey("foo")
+                        .withValue(value1)
+                        .withVersion(lock.value())
+                        .build();
 
-        TransactionLog<MapUpdate<String, byte[]>> tx =
-                new TransactionLog<>(TransactionId.from("tx1"), Arrays.asList(update1));
-
-        map.prepare(tx).thenAccept(result -> {
+        map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(update1))).thenAccept(result -> {
             assertEquals(true, result);
         }).join();
         // verify changes in Tx is not visible yet until commit
@@ -392,7 +462,7 @@
 
         assertFalse(listener.eventReceived());
 
-        map.commit(tx.transactionId()).join();
+        map.commit(transactionId).join();
         MapEvent<String, byte[]> event = listener.event();
         assertNotNull(event);
         assertEquals(MapEvent.Type.INSERT, event.type());
@@ -407,19 +477,21 @@
         assertEquals(MapEvent.Type.UPDATE, event.type());
         assertTrue(Arrays.equals(value2, event.newValue().value()));
 
-
         // REMOVE_IF_VERSION_MATCH
         byte[] currFoo = map.get("foo").get().value();
         long currFooVersion = map.get("foo").get().version();
         MapUpdate<String, byte[]> remove1 =
                 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
-                .withKey("foo")
-                .withCurrentVersion(currFooVersion)
-                .build();
+                        .withKey("foo")
+                        .withVersion(currFooVersion)
+                        .build();
 
-        tx = new TransactionLog<>(TransactionId.from("tx2"), Arrays.asList(remove1));
+        transactionId = TransactionId.from("tx2");
 
-        map.prepare(tx).thenAccept(result -> {
+        // Begin the transaction.
+        map.begin(transactionId).join();
+
+        map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(remove1))).thenAccept(result -> {
             assertTrue("prepare should succeed", result);
         }).join();
         // verify changes in Tx is not visible yet until commit
@@ -433,7 +505,7 @@
             assertThat(result.value(), is(currFoo));
         }).join();
 
-        map.commit(tx.transactionId()).join();
+        map.commit(transactionId).join();
         event = listener.event();
         assertNotNull(event);
         assertEquals(MapEvent.Type.REMOVE, event.type());
@@ -450,24 +522,28 @@
         final byte[] value2 = Tools.getBytesUtf8("value2");
 
         AtomixConsistentMap map = createAtomixClient().getResource("testTransactionRollbackTestsMap",
-                                                                   AtomixConsistentMap.class).join();
+                AtomixConsistentMap.class).join();
         TestMapEventListener listener = new TestMapEventListener();
 
         map.addListener(listener).join();
 
+        TransactionId transactionId = TransactionId.from("tx1");
+
+        Version lock = map.begin(transactionId).join();
+
         MapUpdate<String, byte[]> update1 =
-                MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
-                .withKey("foo")
-                .withValue(value1)
-                .build();
-        TransactionLog<MapUpdate<String, byte[]>> tx =
-                new TransactionLog<>(TransactionId.from("tx1"), Arrays.asList(update1));
-        map.prepare(tx).thenAccept(result -> {
+                MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+                        .withKey("foo")
+                        .withValue(value1)
+                        .withVersion(lock.value())
+                        .build();
+
+        map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(update1))).thenAccept(result -> {
             assertEquals(true, result);
         }).join();
         assertFalse(listener.eventReceived());
 
-        map.rollback(tx.transactionId()).join();
+        map.rollback(transactionId).join();
         assertFalse(listener.eventReceived());
 
         map.get("foo").thenAccept(result -> {