Performance improvements
 - Fast path of transactions updating a single key.
 - Bug fix in StoragePartitionClient where we were always creating a CachingMap

Change-Id: Ide117fba34fd12a9ff4aabd5fb7a21952bae672b
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index fe85e21..b8212b7 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -342,6 +342,14 @@
     CompletableFuture<Void> rollback(TransactionId transactionId);
 
     /**
+     * Prepares a transaction and commits it in one go.
+     * @param transaction transaction
+     * @return {@code true} if operation is successful and updates are committed
+     * {@code false} otherwise
+     */
+    CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction);
+
+    /**
      * Returns a new {@link ConsistentMap} that is backed by this instance.
      *
      * @return new {@code ConsistentMap} instance
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
index be9ead4..cd61eb9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
@@ -76,7 +76,7 @@
     public CompletableFuture<CommitStatus> commit() {
         final MeteringAgent.Context timer = monitor.startTimer("commit");
         return transactionCoordinator.commit(transactionId, txParticipants)
-                .whenComplete((r, e) -> timer.stop(e));
+                                     .whenComplete((r, e) -> timer.stop(e));
     }
 
     @Override
@@ -89,7 +89,7 @@
             Serializer serializer) {
         // FIXME: Do not create duplicates.
         DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<K, V>(mapName,
-                creator.<K, V>newAsyncConsistentMap(mapName, serializer),
+                DistributedPrimitives.newMeteredMap(creator.<K, V>newAsyncConsistentMap(mapName, serializer)),
                 this,
                 serializer);
         txParticipants.add(txMap);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
index 1aaf297..2d80970 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
@@ -180,8 +180,13 @@
     }
 
     @Override
-    public boolean hasPendingUpdates() {
-        return updates().size() > 0;
+    public CompletableFuture<Boolean> prepareAndCommit() {
+        return backingMap.prepareAndCommit(new MapTransaction<>(txContext.transactionId(), updates()));
+    }
+
+    @Override
+    public int totalUpdates() {
+        return updates().size();
     }
 
     protected List<MapUpdate<K, V>> updates() {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index f1095d9..8b6b9aa 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -178,6 +178,11 @@
     }
 
     @Override
+    public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
+        return delegateMap.prepareAndCommit(transaction);
+    }
+
+    @Override
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
                           .add("delegateMap", delegateMap)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
index c397f30..0a34189 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
@@ -25,13 +25,16 @@
 import java.util.function.Function;
 import java.util.function.Predicate;
 
+import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
+
 import org.onosproject.utils.MeteringAgent;
 
 /**
@@ -59,6 +62,10 @@
     private static final String ENTRY_SET = "entrySet";
     private static final String REPLACE = "replace";
     private static final String COMPUTE_IF_ABSENT = "computeIfAbsent";
+    private static final String PREPARE = "prepare";
+    private static final String COMMIT = "commit";
+    private static final String ROLLBACK = "rollback";
+    private static final String PREPARE_AND_COMMIT = "prepareAndCommit";
     private static final String ADD_LISTENER = "addListener";
     private static final String REMOVE_LISTENER = "removeListener";
     private static final String NOTIFY_LISTENER = "notifyListener";
@@ -240,6 +247,34 @@
         }
     }
 
+    @Override
+    public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
+        final MeteringAgent.Context timer = monitor.startTimer(PREPARE);
+        return super.prepare(transaction)
+                    .whenComplete((r, e) -> timer.stop(e));
+    }
+
+    @Override
+    public CompletableFuture<Void> commit(TransactionId transactionId) {
+        final MeteringAgent.Context timer = monitor.startTimer(COMMIT);
+        return super.commit(transactionId)
+                    .whenComplete((r, e) -> timer.stop(e));
+    }
+
+    @Override
+    public CompletableFuture<Void> rollback(TransactionId transactionId) {
+        final MeteringAgent.Context timer = monitor.startTimer(ROLLBACK);
+        return super.rollback(transactionId)
+                    .whenComplete((r, e) -> timer.stop(e));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
+        final MeteringAgent.Context timer = monitor.startTimer(PREPARE_AND_COMMIT);
+        return super.prepareAndCommit(transaction)
+                    .whenComplete((r, e) -> timer.stop(e));
+    }
+
     private class InternalMeteredMapEventListener implements MapEventListener<K, V> {
 
         private final MapEventListener<K, V> listener;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index 6715ba7..0bc90e8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -236,6 +236,24 @@
                 .toArray(CompletableFuture[]::new));
     }
 
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
+        Map<AsyncConsistentMap<K, V>, List<MapUpdate<K, V>>> updatesGroupedByMap = Maps.newIdentityHashMap();
+        transaction.updates().forEach(update -> {
+            AsyncConsistentMap<K, V> map = getMap(update.key());
+            updatesGroupedByMap.computeIfAbsent(map, k -> Lists.newLinkedList()).add(update);
+        });
+        Map<AsyncConsistentMap<K, V>, MapTransaction<K, V>> transactionsByMap =
+                Maps.transformValues(updatesGroupedByMap,
+                                     list -> new MapTransaction<>(transaction.transactionId(), list));
+
+        return Tools.allOf(transactionsByMap.entrySet()
+                                            .stream()
+                                            .map(e -> e.getKey().prepareAndCommit(e.getValue()))
+                                            .collect(Collectors.toList()))
+                    .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
+    }
+
     /**
      * Returns the map (partition) to which the specified key maps.
      * @param key key
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index c70f4e4..8fd0a06 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -108,7 +108,7 @@
                         value -> value == null ? null : serializer.encode(value),
                         bytes -> serializer.decode(bytes));
 
-        return DistributedPrimitives.newCachingMap(transcodedMap);
+        return transcodedMap;
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
index 1904894..3670aac 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
@@ -44,20 +44,32 @@
      */
     CompletableFuture<CommitStatus> commit(TransactionId transactionId,
                                            Set<TransactionParticipant> transactionParticipants) {
-        if (!transactionParticipants.stream().anyMatch(t -> t.hasPendingUpdates())) {
-            return CompletableFuture.completedFuture(CommitStatus.SUCCESS);
-        }
+        int totalUpdates = transactionParticipants.stream()
+                                                  .map(TransactionParticipant::totalUpdates)
+                                                  .reduce(Math::addExact)
+                                                  .orElse(0);
 
-        CompletableFuture<CommitStatus> status =  transactions.put(transactionId, Transaction.State.PREPARING)
+        if (totalUpdates == 0) {
+            return CompletableFuture.completedFuture(CommitStatus.SUCCESS);
+        } else if (totalUpdates == 1) {
+            return transactionParticipants.stream()
+                                          .filter(p -> p.totalUpdates() == 1)
+                                          .findFirst()
+                                          .get()
+                                          .prepareAndCommit()
+                                          .thenApply(v -> v ? CommitStatus.SUCCESS : CommitStatus.FAILURE);
+        } else {
+            CompletableFuture<CommitStatus> status =  transactions.put(transactionId, Transaction.State.PREPARING)
                     .thenCompose(v -> this.doPrepare(transactionParticipants))
                     .thenCompose(result -> result
-                           ? transactions.put(transactionId, Transaction.State.COMMITTING)
-                                         .thenCompose(v -> doCommit(transactionParticipants))
-                                         .thenApply(v -> CommitStatus.SUCCESS)
-                           : transactions.put(transactionId, Transaction.State.ROLLINGBACK)
-                                         .thenCompose(v -> doRollback(transactionParticipants))
-                                         .thenApply(v -> CommitStatus.FAILURE));
-        return status.thenCompose(v -> transactions.remove(transactionId).thenApply(u -> v));
+                            ? transactions.put(transactionId, Transaction.State.COMMITTING)
+                                          .thenCompose(v -> doCommit(transactionParticipants))
+                                          .thenApply(v -> CommitStatus.SUCCESS)
+                            : transactions.put(transactionId, Transaction.State.ROLLINGBACK)
+                                          .thenCompose(v -> doRollback(transactionParticipants))
+                                          .thenApply(v -> CommitStatus.FAILURE));
+            return status.thenCompose(v -> transactions.remove(transactionId).thenApply(u -> v));
+        }
     }
 
     private CompletableFuture<Boolean> doPrepare(Set<TransactionParticipant> transactionParticipants) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
index 66e1f3a..c8308af 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
@@ -26,7 +26,21 @@
      * Returns if this participant has updates that need to be committed.
      * @return {@code true} if yes; {@code false} otherwise
      */
-    boolean hasPendingUpdates();
+    default boolean hasPendingUpdates() {
+        return totalUpdates() > 0;
+    }
+
+    /**
+     * Returns the number of updates that need to committed for this participant.
+     * @return update count.
+     */
+    int totalUpdates();
+
+    /**
+     * Executes the prepare and commit steps in a single go.
+     * @return {@code true} is successful i.e updates are committed; {@code false} otherwise
+     */
+    CompletableFuture<Boolean> prepareAndCommit();
 
     /**
      * Executes the prepare phase.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index a2989ac..b679925 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -213,6 +213,11 @@
         return backingMap.rollback(transactionId);
     }
 
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K1, V1> transaction) {
+        return backingMap.prepareAndCommit(transaction.map(keyEncoder, valueEncoder));
+    }
+
     private class InternalBackingMapEventListener implements MapEventListener<K2, V2> {
 
         private final MapEventListener<K1, V1> listener;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index bb320e4..0847153 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
-import static org.slf4j.LoggerFactory.getLogger;
 import io.atomix.copycat.client.CopycatClient;
 import io.atomix.resource.AbstractResource;
 import io.atomix.resource.ResourceTypeInfo;
@@ -44,6 +43,7 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
@@ -53,7 +53,6 @@
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
-import org.slf4j.Logger;
 
 import com.google.common.collect.Sets;
 
@@ -64,7 +63,6 @@
 public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
     implements AsyncConsistentMap<String, byte[]> {
 
-    private final Logger log = getLogger(getClass());
     private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
 
     public static final String CHANGE_SUBJECT = "changeEvents";
@@ -288,4 +286,9 @@
         return submit(new TransactionRollback(transactionId))
                 .thenApply(v -> null);
     }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
+        return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
+    }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index dbc3157..73af19f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -245,6 +245,19 @@
     }
 
     /**
+     * Map prepareAndCommit command.
+     */
+    @SuppressWarnings("serial")
+    public static class TransactionPrepareAndCommit extends TransactionPrepare {
+        public TransactionPrepareAndCommit() {
+        }
+
+        public TransactionPrepareAndCommit(MapTransaction<String, byte[]> mapTransaction) {
+            super(mapTransaction);
+        }
+    }
+
+    /**
      * Map transaction commit command.
      */
     @SuppressWarnings("serial")
@@ -489,12 +502,6 @@
         @Override
         public void readObject(BufferInput<?> buffer, Serializer serializer) {
         }
-
-        @Override
-        public String toString() {
-            return MoreObjects.toStringHelper(getClass())
-                    .toString();
-        }
     }
 
     /**
@@ -509,12 +516,6 @@
         @Override
         public void readObject(BufferInput<?> buffer, Serializer serializer) {
         }
-
-        @Override
-        public String toString() {
-            return MoreObjects.toStringHelper(getClass())
-                    .toString();
-        }
     }
 
     /**
@@ -537,7 +538,8 @@
             registry.register(TransactionPrepare.class, -772);
             registry.register(TransactionCommit.class, -773);
             registry.register(TransactionRollback.class, -774);
-            registry.register(UpdateAndGet.class, -775);
+            registry.register(TransactionPrepareAndCommit.class, -775);
+            registry.register(UpdateAndGet.class, -776);
         }
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index aad7382..983e27e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -52,6 +52,7 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
@@ -111,6 +112,7 @@
         executor.register(TransactionPrepare.class, this::prepare);
         executor.register(TransactionCommit.class, this::commit);
         executor.register(TransactionRollback.class, this::rollback);
+        executor.register(TransactionPrepareAndCommit.class, this::prepareAndCommit);
     }
 
     @Override
@@ -352,6 +354,20 @@
     }
 
     /**
+     * Handles an prepare and commit commit.
+     *
+     * @param commit transaction prepare and commit commit
+     * @return prepare result
+     */
+    protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
+        PrepareResult prepareResult = prepare(commit);
+        if (prepareResult == PrepareResult.OK) {
+            commitInternal(commit.operation().transaction().transactionId());
+        }
+        return prepareResult;
+    }
+
+    /**
      * Handles an prepare commit.
      *
      * @param commit transaction prepare commit
@@ -399,44 +415,48 @@
     protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
         TransactionId transactionId = commit.operation().transactionId();
         try {
-            Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
-                    .remove(transactionId);
-            if (prepareCommit == null) {
-                return CommitResult.UNKNOWN_TRANSACTION_ID;
-            }
-            MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
-            long totalReferencesToCommit = transaction
-                    .updates()
-                    .stream()
-                    .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
-                    .count();
-            CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
-                    new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
-            List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
-            for (MapUpdate<String, byte[]> update : transaction.updates()) {
-                String key = update.key();
-                MapEntryValue previousValue = mapEntries.remove(key);
-                MapEntryValue newValue = null;
-                checkState(preparedKeys.remove(key), "key is not prepared");
-                if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
-                    newValue = new TransactionalCommit(key,
-                            versionCounter.incrementAndGet(), completer);
-                }
-                eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
-                if (newValue != null) {
-                    mapEntries.put(key, newValue);
-                }
-                if (previousValue != null) {
-                    previousValue.discard();
-                }
-            }
-            publish(eventsToPublish);
-            return CommitResult.OK;
+            return commitInternal(transactionId);
         } finally {
             commit.close();
         }
     }
 
+    private CommitResult commitInternal(TransactionId transactionId) {
+        Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
+                .remove(transactionId);
+        if (prepareCommit == null) {
+            return CommitResult.UNKNOWN_TRANSACTION_ID;
+        }
+        MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
+        long totalReferencesToCommit = transaction
+                .updates()
+                .stream()
+                .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+                .count();
+        CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
+                new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
+        List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
+        for (MapUpdate<String, byte[]> update : transaction.updates()) {
+            String key = update.key();
+            MapEntryValue previousValue = mapEntries.remove(key);
+            MapEntryValue newValue = null;
+            checkState(preparedKeys.remove(key), "key is not prepared");
+            if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
+                newValue = new TransactionalCommit(key,
+                        versionCounter.incrementAndGet(), completer);
+            }
+            eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
+            if (newValue != null) {
+                mapEntries.put(key, newValue);
+            }
+            if (previousValue != null) {
+                previousValue.discard();
+            }
+        }
+        publish(eventsToPublish);
+        return CommitResult.OK;
+    }
+
     /**
      * Handles an rollback commit (ha!).
      *