AsyncConsistentMap methods for supporting transactional updates

Change-Id: Iaeb0aa0abf9f52d514a2c040598599a5b8a55ee8
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapUpdate.java b/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
similarity index 85%
rename from core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapUpdate.java
rename to core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
index dc80c56..c8151e3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapUpdate.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
@@ -14,12 +14,13 @@
  * limitations under the License.
  */
 
-package org.onosproject.store.primitives.resources.impl;
+package org.onosproject.store.primitives;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import java.util.function.Function;
+
 import com.google.common.base.MoreObjects;
 
 /**
@@ -126,6 +127,26 @@
         return currentVersion;
     }
 
+    /**
+     * Transforms this instance into an instance of different paramterized types.
+     *
+     * @param keyMapper transcoder for key type
+     * @param valueMapper transcoder to value type
+     * @return new instance
+     * @param <S> key type of returned instance
+     * @param <T> value type of returned instance
+     */
+    public <S, T> MapUpdate<S, T> map(Function<K, S> keyMapper, Function<V, T> valueMapper) {
+        return MapUpdate.<S, T>newBuilder()
+                .withMapName(mapName)
+                .withType(type)
+                .withKey(keyMapper.apply(key))
+                .withValue(value == null ? null : valueMapper.apply(value))
+                .withCurrentValue(currentValue == null ? null : valueMapper.apply(currentValue))
+                .withCurrentVersion(currentVersion)
+                .build();
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
@@ -180,17 +201,16 @@
         }
 
         public Builder<K, V> withCurrentValue(V value) {
-            update.currentValue = checkNotNull(value, "currentValue cannot be null");
+            update.currentValue = value;
             return this;
         }
 
         public Builder<K, V> withValue(V value) {
-            update.value = checkNotNull(value, "value cannot be null");
+            update.value = value;
             return this;
         }
 
         public Builder<K, V> withCurrentVersion(long version) {
-            checkArgument(version >= 0, "version cannot be negative");
             update.currentVersion = version;
             return this;
         }
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index 8e2bae0..a914d0c 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -26,6 +26,7 @@
 import java.util.function.Predicate;
 
 import org.onosproject.store.primitives.DefaultConsistentMap;
+import org.onosproject.store.primitives.TransactionId;
 
 /**
  * A distributed, strongly consistent map whose methods are all executed asynchronously.
@@ -319,6 +320,37 @@
     CompletableFuture<Void> removeListener(MapEventListener<K, V> listener);
 
     /**
+     * Prepares a transaction for commitment.
+     * @param transaction transaction
+     * @return {@code true} if prepare is successful and transaction is ready to be committed;
+     * {@code false} otherwise
+     */
+    CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction);
+
+    /**
+     * Commits a previously prepared transaction.
+     * @param transactionId transaction identifier
+     * @return future that will be completed when the operation finishes
+     */
+    CompletableFuture<Void> commit(TransactionId transactionId);
+
+    /**
+     * Aborts a previously prepared transaction.
+     * @param transactionId transaction identifier
+     * @return future that will be completed when the operation finishes
+     */
+    CompletableFuture<Void> rollback(TransactionId transactionId);
+
+    /**
+     * Returns a new {@link ConsistentMap} that is backed by this instance.
+     *
+     * @return new {@code ConsistentMap} instance
+     */
+    default ConsistentMap<K, V> asConsistentMap() {
+        return asConsistentMap(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS);
+    }
+
+    /**
      * Returns a new {@link ConsistentMap} that is backed by this instance.
      *
      * @param timeoutMillis timeout duration for the returned ConsistentMap operations
diff --git a/core/api/src/main/java/org/onosproject/store/service/MapTransaction.java b/core/api/src/main/java/org/onosproject/store/service/MapTransaction.java
new file mode 100644
index 0000000..3c4f743
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/MapTransaction.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.List;
+import java.util.function.Function;
+
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Collection of map updates to be committed atomically.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class MapTransaction<K, V> {
+
+    private final TransactionId transactionId;
+    private final List<MapUpdate<K, V>> updates;
+
+    public MapTransaction(TransactionId transactionId, List<MapUpdate<K, V>> updates) {
+        this.transactionId = transactionId;
+        this.updates = ImmutableList.copyOf(updates);
+    }
+
+    /**
+     * Returns the transaction identifier.
+     *
+     * @return transaction id
+     */
+    public TransactionId transactionId() {
+        return transactionId;
+    }
+
+    /**
+     * Returns the list of map updates.
+     *
+     * @return map updates
+     */
+    public List<MapUpdate<K, V>> updates() {
+        return updates;
+    }
+
+    /**
+     * Maps this instance to another {@code MapTransaction} with different key and value types.
+     *
+     * @param keyMapper function for mapping key types
+     * @param valueMapper function for mapping value types
+     * @return newly typed instance
+     *
+     * @param <S> key type of returned instance
+     * @param <T> value type of returned instance
+     */
+    public <S, T> MapTransaction<S, T> map(Function<K, S> keyMapper, Function<V, T> valueMapper) {
+        return new MapTransaction<>(transactionId, Lists.transform(updates, u -> u.map(keyMapper, valueMapper)));
+    }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/MapUpdateTest.java b/core/api/src/test/java/org/onosproject/store/primitives/MapUpdateTest.java
similarity index 98%
rename from core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/MapUpdateTest.java
rename to core/api/src/test/java/org/onosproject/store/primitives/MapUpdateTest.java
index 6a0db93..b8ce039 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/MapUpdateTest.java
+++ b/core/api/src/test/java/org/onosproject/store/primitives/MapUpdateTest.java
@@ -13,9 +13,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.store.primitives.resources.impl;
+package org.onosproject.store.primitives;
 
 import com.google.common.testing.EqualsTester;
+
 import org.junit.Test;
 
 import static org.hamcrest.MatcherAssert.assertThat;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index cb78e27..92f6870 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -32,17 +32,18 @@
 import org.onlab.util.Match;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.event.Change;
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapState;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
 import org.onosproject.store.primitives.resources.impl.CommitResult;
 import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
-import org.onosproject.store.primitives.resources.impl.MapUpdate;
 import org.onosproject.store.primitives.resources.impl.PrepareResult;
 import org.onosproject.store.primitives.resources.impl.RollbackResult;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.base.Throwables;
@@ -65,7 +66,7 @@
                                 MapEntryUpdateResult.Status.class,
                                 MapUpdate.class,
                                 MapUpdate.Type.class,
-                                Transaction.class,
+                                MapTransaction.class,
                                 Transaction.State.class,
                                 TransactionId.class,
                                 PrepareResult.class,
@@ -99,7 +100,7 @@
         serializer.register(Match.class, factory);
         serializer.register(MapEntryUpdateResult.class, factory);
         serializer.register(MapEntryUpdateResult.Status.class, factory);
-        serializer.register(Transaction.class, factory);
+        serializer.register(MapTransaction.class, factory);
         serializer.register(Transaction.State.class, factory);
         serializer.register(PrepareResult.class, factory);
         serializer.register(CommitResult.class, factory);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java
index d5cce9b2..b6d5cd2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java
@@ -65,8 +65,8 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.resources.impl.MapUpdate;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AtomicCounterBuilder;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseSerializer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseSerializer.java
index b91f4c9..ec9b926 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseSerializer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseSerializer.java
@@ -21,8 +21,8 @@
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Match;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.resources.impl.MapUpdate;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.service.Versioned;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMap.java
index 958734c..97cd23b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMap.java
@@ -40,11 +40,13 @@
 import org.onlab.util.SharedExecutors;
 import org.onlab.util.Tools;
 import org.onosproject.core.ApplicationId;
+import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.ConsistentMapException.ConcurrentModification;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.Versioned;
 import org.onosproject.utils.MeteringAgent;
@@ -491,6 +493,21 @@
         return CompletableFuture.completedFuture(null);
     }
 
+    @Override
+    public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
+        return Tools.exceptionalFuture(new UnsupportedOperationException());
+    }
+
+    @Override
+    public CompletableFuture<Void> commit(TransactionId transactionId) {
+        return Tools.exceptionalFuture(new UnsupportedOperationException());
+    }
+
+    @Override
+    public CompletableFuture<Void> rollback(TransactionId transactionId) {
+        return Tools.exceptionalFuture(new UnsupportedOperationException());
+    }
+
     protected void notifyListeners(MapEvent<K, V> event) {
         if (event == null) {
             return;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java
index d6d4ab4..027341e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java
@@ -26,8 +26,8 @@
 import net.kuujo.copycat.state.StateContext;
 
 import org.onlab.util.Match;
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.resources.impl.MapUpdate;
 import org.onosproject.store.service.Versioned;
 
 import java.util.Arrays;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
index d165bc1..b71902b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
@@ -24,9 +24,9 @@
 
 import static com.google.common.base.Preconditions.*;
 
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.primitives.resources.impl.CommitResult;
-import org.onosproject.store.primitives.resources.impl.MapUpdate;
 import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.TransactionContext;
@@ -84,7 +84,10 @@
         checkNotNull(serializer);
         return txMaps.computeIfAbsent(mapName, name -> new DefaultTransactionalMap<>(
                                 name,
-                                mapBuilderSupplier.get().withName(name).withSerializer(serializer).build(),
+                                mapBuilderSupplier.get()
+                                                  .withName(name)
+                                                  .withSerializer(serializer)
+                                                  .buildAsyncMap(),
                                 this,
                                 serializer));
     }
@@ -113,7 +116,7 @@
     public void abort() {
         if (isOpen) {
             try {
-                txMaps.values().forEach(m -> m.rollback());
+                txMaps.values().forEach(m -> m.abort());
             } finally {
                 isOpen = false;
             }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
index 32b3057..1aaf297 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
@@ -19,10 +19,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import org.onlab.util.HexString;
-import org.onosproject.store.primitives.resources.impl.MapUpdate;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.TransactionContext;
 import org.onosproject.store.service.TransactionalMap;
@@ -46,11 +49,12 @@
  * @param <K> key type
  * @param <V> value type.
  */
-public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
+public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V>, TransactionParticipant {
 
     private final TransactionContext txContext;
     private static final String TX_CLOSED_ERROR = "Transaction is closed";
-    private final ConsistentMap<K, V> backingMap;
+    private final AsyncConsistentMap<K, V> backingMap;
+    private final ConsistentMap<K, V> backingConsitentMap;
     private final String name;
     private final Serializer serializer;
     private final Map<K, Versioned<V>> readCache = Maps.newConcurrentMap();
@@ -76,11 +80,12 @@
 
     public DefaultTransactionalMap(
             String name,
-            ConsistentMap<K, V> backingMap,
+            AsyncConsistentMap<K, V> backingMap,
             TransactionContext txContext,
             Serializer serializer) {
         this.name = name;
         this.backingMap = backingMap;
+        this.backingConsitentMap = backingMap.asConsistentMap();
         this.txContext = txContext;
         this.serializer = serializer;
     }
@@ -96,7 +101,7 @@
         if (latest != null) {
             return latest;
         } else {
-            Versioned<V> v = readCache.computeIfAbsent(key, k -> backingMap.get(k));
+            Versioned<V> v = readCache.computeIfAbsent(key, k -> backingConsitentMap.get(k));
             return v != null ? v.value() : null;
         }
     }
@@ -159,6 +164,62 @@
         return latest;
     }
 
+    @Override
+    public CompletableFuture<Boolean> prepare() {
+        return backingMap.prepare(new MapTransaction<>(txContext.transactionId(), updates()));
+    }
+
+    @Override
+    public CompletableFuture<Void> commit() {
+        return backingMap.commit(txContext.transactionId());
+    }
+
+    @Override
+    public CompletableFuture<Void> rollback() {
+        return backingMap.rollback(txContext.transactionId());
+    }
+
+    @Override
+    public boolean hasPendingUpdates() {
+        return updates().size() > 0;
+    }
+
+    protected List<MapUpdate<K, V>> updates() {
+        List<MapUpdate<K, V>> updates = Lists.newLinkedList();
+        deleteSet.forEach(key -> {
+            Versioned<V> original = readCache.get(key);
+            if (original != null) {
+                updates.add(MapUpdate.<K, V>newBuilder()
+                        .withMapName(name)
+                        .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+                        .withKey(key)
+                        .withCurrentVersion(original.version())
+                        .build());
+            }
+        });
+        writeCache.forEach((key, value) -> {
+            Versioned<V> original = readCache.get(key);
+            if (original == null) {
+                updates.add(MapUpdate.<K, V>newBuilder()
+                        .withMapName(name)
+                        .withType(MapUpdate.Type.PUT_IF_ABSENT)
+                        .withKey(key)
+                        .withValue(value)
+                        .build());
+            } else {
+                updates.add(MapUpdate.<K, V>newBuilder()
+                        .withMapName(name)
+                        .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+                        .withKey(key)
+                        .withCurrentVersion(original.version())
+                        .withValue(value)
+                        .build());
+            }
+        });
+        return updates;
+    }
+
+
     protected List<MapUpdate<String, byte[]>> toMapUpdates() {
         List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
         deleteSet.forEach(key -> {
@@ -194,19 +255,18 @@
         return updates;
     }
 
-    // TODO: build expected result Map processing DB updates?
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
                 .add("backingMap", backingMap)
-                .add("updates", toMapUpdates())
+                .add("updates", updates())
                 .toString();
     }
 
     /**
      * Discards all changes made to this transactional map.
      */
-    protected void rollback() {
+    protected void abort() {
         readCache.clear();
         writeCache.clear();
         deleteSet.clear();
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index 76d7488..f1095d9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -27,8 +27,10 @@
 import java.util.function.Predicate;
 
 import org.onosproject.core.ApplicationId;
+import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.base.MoreObjects;
@@ -161,6 +163,21 @@
     }
 
     @Override
+    public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
+        return delegateMap.prepare(transaction);
+    }
+
+    @Override
+    public CompletableFuture<Void> commit(TransactionId transactionId) {
+        return delegateMap.commit(transactionId);
+    }
+
+    @Override
+    public CompletableFuture<Void> rollback(TransactionId transactionId) {
+        return delegateMap.rollback(transactionId);
+    }
+
+    @Override
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
                           .add("delegateMap", delegateMap)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index 3090436..6715ba7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -28,10 +28,15 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
+import org.onlab.util.Tools;
 import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.collect.Lists;
@@ -198,6 +203,39 @@
                                                 .toArray(CompletableFuture[]::new));
     }
 
+    @Override
+    public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
+
+        Map<AsyncConsistentMap<K, V>, List<MapUpdate<K, V>>> updatesGroupedByMap = Maps.newIdentityHashMap();
+        transaction.updates().forEach(update -> {
+            AsyncConsistentMap<K, V> map = getMap(update.key());
+            updatesGroupedByMap.computeIfAbsent(map, k -> Lists.newLinkedList()).add(update);
+        });
+        Map<AsyncConsistentMap<K, V>, MapTransaction<K, V>> transactionsByMap =
+                Maps.transformValues(updatesGroupedByMap,
+                                     list -> new MapTransaction<>(transaction.transactionId(), list));
+
+        return Tools.allOf(transactionsByMap.entrySet()
+                         .stream()
+                         .map(e -> e.getKey().prepare(e.getValue()))
+                         .collect(Collectors.toList()))
+                    .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
+    }
+
+    @Override
+    public CompletableFuture<Void> commit(TransactionId transactionId) {
+        return CompletableFuture.allOf(getMaps().stream()
+                                                .map(e -> e.commit(transactionId))
+                                                .toArray(CompletableFuture[]::new));
+    }
+
+    @Override
+    public CompletableFuture<Void> rollback(TransactionId transactionId) {
+        return CompletableFuture.allOf(getMaps().stream()
+                .map(e -> e.rollback(transactionId))
+                .toArray(CompletableFuture[]::new));
+    }
+
     /**
      * Returns the map (partition) to which the specified key maps.
      * @param key key
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedDatabase.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedDatabase.java
index 24674f8..31e22a8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedDatabase.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedDatabase.java
@@ -26,8 +26,8 @@
 import net.kuujo.copycat.resource.ResourceState;
 
 import org.onlab.util.Match;
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.resources.impl.CommitResult;
-import org.onosproject.store.primitives.resources.impl.MapUpdate;
 import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
index ac5238a..4c8b240 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
@@ -17,8 +17,8 @@
 
 import java.util.List;
 
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.resources.impl.MapUpdate;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
index 58957a2..66e1f3a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
@@ -17,34 +17,32 @@
 
 import java.util.concurrent.CompletableFuture;
 
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.resources.impl.CommitResult;
-import org.onosproject.store.primitives.resources.impl.PrepareResult;
-import org.onosproject.store.primitives.resources.impl.RollbackResult;
-
 /**
  * Participant in a two-phase commit protocol.
  */
 public interface TransactionParticipant {
 
     /**
-     * Attempts to execute the prepare phase for the specified {@link Transaction transaction}.
-     * @param transaction transaction
-     * @return future for prepare result
+     * Returns if this participant has updates that need to be committed.
+     * @return {@code true} if yes; {@code false} otherwise
      */
-    CompletableFuture<PrepareResult> prepare(Transaction transaction);
+    boolean hasPendingUpdates();
+
+    /**
+     * Executes the prepare phase.
+     * @return {@code true} is successful; {@code false} otherwise
+     */
+    CompletableFuture<Boolean> prepare();
 
     /**
      * Attempts to execute the commit phase for previously prepared transaction.
-     * @param transactionId transaction identifier
-     * @return future for commit result
+     * @return future that is completed when the operation completes
      */
-    CompletableFuture<CommitResult> commit(TransactionId transactionId);
+    CompletableFuture<Void> commit();
 
     /**
      * Attempts to execute the rollback phase for previously prepared transaction.
-     * @param transactionId transaction identifier
-     * @return future for rollback result
+     * @return future that is completed when the operation completes
      */
-    CompletableFuture<RollbackResult> rollback(TransactionId transactionId);
+    CompletableFuture<Void> rollback();
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index 438d9b3..a2989ac 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -26,9 +26,11 @@
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.collect.Maps;
@@ -196,6 +198,21 @@
         }
     }
 
+    @Override
+    public CompletableFuture<Boolean> prepare(MapTransaction<K1, V1> transaction) {
+        return backingMap.prepare(transaction.map(keyEncoder, valueEncoder));
+    }
+
+    @Override
+    public CompletableFuture<Void> commit(TransactionId transactionId) {
+        return backingMap.commit(transactionId);
+    }
+
+    @Override
+    public CompletableFuture<Void> rollback(TransactionId transactionId) {
+        return backingMap.rollback(transactionId);
+    }
+
     private class InternalBackingMapEventListener implements MapEventListener<K2, V2> {
 
         private final MapEventListener<K1, V1> listener;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 5342d74..bd9690c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -32,11 +32,10 @@
 
 import org.onlab.util.Match;
 import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.impl.Transaction;
-import org.onosproject.store.primitives.impl.TransactionParticipant;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.collect.Sets;
@@ -46,7 +45,7 @@
  */
 @ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
 public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
-    implements AsyncConsistentMap<String, byte[]>, TransactionParticipant {
+    implements AsyncConsistentMap<String, byte[]> {
 
     private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
 
@@ -266,18 +265,21 @@
     }
 
     @Override
-    public CompletableFuture<PrepareResult> prepare(Transaction transaction) {
-        return submit(new AtomixConsistentMapCommands.TransactionPrepare(transaction));
+    public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
+        return submit(new AtomixConsistentMapCommands.TransactionPrepare(transaction))
+                .thenApply(v -> v == PrepareResult.OK);
     }
 
     @Override
-    public CompletableFuture<CommitResult> commit(TransactionId transactionId) {
-        return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId));
+    public CompletableFuture<Void> commit(TransactionId transactionId) {
+        return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId))
+                .thenApply(v -> null);
     }
 
     @Override
-    public CompletableFuture<RollbackResult> rollback(TransactionId transactionId) {
-        return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId));
+    public CompletableFuture<Void> rollback(TransactionId transactionId) {
+        return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId))
+                .thenApply(v -> null);
     }
 
     /**
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index a5dd232..458e5fb 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -29,7 +29,7 @@
 
 import org.onlab.util.Match;
 import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.impl.Transaction;
+import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.base.MoreObjects;
@@ -209,35 +209,35 @@
      */
     @SuppressWarnings("serial")
     public static class TransactionPrepare extends MapCommand<PrepareResult> {
-        private Transaction transaction;
+        private MapTransaction<String, byte[]> mapTransaction;
 
         public TransactionPrepare() {
         }
 
-        public TransactionPrepare(Transaction transaction) {
-            this.transaction = transaction;
+        public TransactionPrepare(MapTransaction<String, byte[]> mapTransaction) {
+            this.mapTransaction = mapTransaction;
         }
 
-        public Transaction transaction() {
-            return transaction;
+        public MapTransaction<String, byte[]> transaction() {
+            return mapTransaction;
         }
 
         @Override
         public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
             super.writeObject(buffer, serializer);
-            serializer.writeObject(transaction, buffer);
+            serializer.writeObject(mapTransaction, buffer);
         }
 
         @Override
         public void readObject(BufferInput<?> buffer, Serializer serializer) {
             super.readObject(buffer, serializer);
-            transaction = serializer.readObject(buffer);
+            mapTransaction = serializer.readObject(buffer);
         }
 
         @Override
         public String toString() {
             return MoreObjects.toStringHelper(getClass())
-                    .add("transaction", transaction)
+                    .add("mapTransaction", mapTransaction)
                     .toString();
         }
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index de22a75..e580fed 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -37,10 +37,11 @@
 
 import org.onlab.util.CountDownCompleter;
 import org.onlab.util.Match;
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.impl.Transaction;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
 import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.collect.Maps;
@@ -384,7 +385,7 @@
             Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> commit) {
         boolean ok = false;
         try {
-            Transaction transaction = commit.operation().transaction();
+            MapTransaction<String, byte[]> transaction = commit.operation().transaction();
             for (MapUpdate<String, byte[]> update : transaction.updates()) {
                 String key = update.key();
                 if (preparedKeys.contains(key)) {
@@ -404,7 +405,7 @@
             // No violations detected. Add to pendingTranctions and mark
             // modified keys as
             // currently locked to updates.
-            pendingTransactions.put(transaction.id(), commit);
+            pendingTransactions.put(transaction.transactionId(), commit);
             transaction.updates().forEach(u -> preparedKeys.add(u.key()));
             ok = true;
             return PrepareResult.OK;
@@ -430,7 +431,7 @@
             if (prepareCommit == null) {
                 return CommitResult.UNKNOWN_TRANSACTION_ID;
             }
-            Transaction transaction = prepareCommit.operation().transaction();
+            MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
             long totalReferencesToCommit = transaction
                     .updates()
                     .stream()
@@ -610,7 +611,7 @@
 
         @Override
         public byte[] value() {
-            Transaction transaction = completer.object().operation().transaction();
+            MapTransaction<String, byte[]> transaction = completer.object().operation().transaction();
             return valueForKey(key, transaction);
         }
 
@@ -624,7 +625,7 @@
             completer.countDown();
         }
 
-        private byte[] valueForKey(String key, Transaction transaction) {
+        private byte[] valueForKey(String key, MapTransaction<String, byte[]> transaction) {
             MapUpdate<String, byte[]>  update = transaction.updates()
                                                            .stream()
                                                            .filter(u -> u.key().equals(key))
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index 21d8edc..26aa9a8 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -27,10 +27,11 @@
 import org.junit.Ignore;
 import org.junit.Test;
 import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.impl.Transaction;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.collect.Sets;
@@ -353,10 +354,10 @@
                 .withValue(value1)
                 .build();
 
-        Transaction tx = new Transaction(TransactionId.from("tx1"), Arrays.asList(update1));
+        MapTransaction<String, byte[]> tx = new MapTransaction<>(TransactionId.from("tx1"), Arrays.asList(update1));
 
         map.prepare(tx).thenAccept(result -> {
-            assertEquals(PrepareResult.OK, result);
+            assertEquals(true, result);
         }).join();
         assertNull(listener.event());
 
@@ -377,7 +378,7 @@
 
         assertNull(listener.event());
 
-        map.commit(tx.id()).join();
+        map.commit(tx.transactionId()).join();
         assertNotNull(listener.event());
         assertEquals(MapEvent.Type.INSERT, listener.event().type());
         assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
@@ -407,13 +408,13 @@
                 .withKey("foo")
                 .withValue(value1)
                 .build();
-        Transaction tx = new Transaction(TransactionId.from("tx1"), Arrays.asList(update1));
+        MapTransaction<String, byte[]> tx = new MapTransaction<>(TransactionId.from("tx1"), Arrays.asList(update1));
         map.prepare(tx).thenAccept(result -> {
-            assertEquals(PrepareResult.OK, result);
+            assertEquals(true, result);
         }).join();
         assertNull(listener.event());
 
-        map.rollback(tx.id()).join();
+        map.rollback(tx.transactionId()).join();
         assertNull(listener.event());
 
         map.get("foo").thenAccept(result -> {