Performance improvements
 - Fast path of transactions updating a single key.
 - Bug fix in StoragePartitionClient where we were always creating a CachingMap

Change-Id: Ide117fba34fd12a9ff4aabd5fb7a21952bae672b
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index aad7382..983e27e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -52,6 +52,7 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
@@ -111,6 +112,7 @@
         executor.register(TransactionPrepare.class, this::prepare);
         executor.register(TransactionCommit.class, this::commit);
         executor.register(TransactionRollback.class, this::rollback);
+        executor.register(TransactionPrepareAndCommit.class, this::prepareAndCommit);
     }
 
     @Override
@@ -352,6 +354,20 @@
     }
 
     /**
+     * Handles an prepare and commit commit.
+     *
+     * @param commit transaction prepare and commit commit
+     * @return prepare result
+     */
+    protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
+        PrepareResult prepareResult = prepare(commit);
+        if (prepareResult == PrepareResult.OK) {
+            commitInternal(commit.operation().transaction().transactionId());
+        }
+        return prepareResult;
+    }
+
+    /**
      * Handles an prepare commit.
      *
      * @param commit transaction prepare commit
@@ -399,44 +415,48 @@
     protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
         TransactionId transactionId = commit.operation().transactionId();
         try {
-            Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
-                    .remove(transactionId);
-            if (prepareCommit == null) {
-                return CommitResult.UNKNOWN_TRANSACTION_ID;
-            }
-            MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
-            long totalReferencesToCommit = transaction
-                    .updates()
-                    .stream()
-                    .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
-                    .count();
-            CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
-                    new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
-            List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
-            for (MapUpdate<String, byte[]> update : transaction.updates()) {
-                String key = update.key();
-                MapEntryValue previousValue = mapEntries.remove(key);
-                MapEntryValue newValue = null;
-                checkState(preparedKeys.remove(key), "key is not prepared");
-                if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
-                    newValue = new TransactionalCommit(key,
-                            versionCounter.incrementAndGet(), completer);
-                }
-                eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
-                if (newValue != null) {
-                    mapEntries.put(key, newValue);
-                }
-                if (previousValue != null) {
-                    previousValue.discard();
-                }
-            }
-            publish(eventsToPublish);
-            return CommitResult.OK;
+            return commitInternal(transactionId);
         } finally {
             commit.close();
         }
     }
 
+    private CommitResult commitInternal(TransactionId transactionId) {
+        Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
+                .remove(transactionId);
+        if (prepareCommit == null) {
+            return CommitResult.UNKNOWN_TRANSACTION_ID;
+        }
+        MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
+        long totalReferencesToCommit = transaction
+                .updates()
+                .stream()
+                .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+                .count();
+        CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
+                new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
+        List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
+        for (MapUpdate<String, byte[]> update : transaction.updates()) {
+            String key = update.key();
+            MapEntryValue previousValue = mapEntries.remove(key);
+            MapEntryValue newValue = null;
+            checkState(preparedKeys.remove(key), "key is not prepared");
+            if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
+                newValue = new TransactionalCommit(key,
+                        versionCounter.incrementAndGet(), completer);
+            }
+            eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
+            if (newValue != null) {
+                mapEntries.put(key, newValue);
+            }
+            if (previousValue != null) {
+                previousValue.discard();
+            }
+        }
+        publish(eventsToPublish);
+        return CommitResult.OK;
+    }
+
     /**
      * Handles an rollback commit (ha!).
      *