Performance improvements
- Fast path for transactions updating a single key.
- Bug fix in StoragePartitionClient where we were always creating a CachingMap.
Change-Id: Ide117fba34fd12a9ff4aabd5fb7a21952bae672b
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index aad7382..983e27e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -52,6 +52,7 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
@@ -111,6 +112,7 @@
executor.register(TransactionPrepare.class, this::prepare);
executor.register(TransactionCommit.class, this::commit);
executor.register(TransactionRollback.class, this::rollback);
+ executor.register(TransactionPrepareAndCommit.class, this::prepareAndCommit);
}
@Override
@@ -352,6 +354,20 @@
}
/**
+ * Handles a prepare and commit commit: prepares the transaction and commits it only if prepare returns OK.
+ *
+ * @param commit transaction prepare and commit commit
+ * @return prepare result; the result of the commit step is not returned
+ */
+ protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
+ PrepareResult prepareResult = prepare(commit);
+ if (prepareResult == PrepareResult.OK) {
+ commitInternal(commit.operation().transaction().transactionId());
+ }
+ return prepareResult;
+ }
+
+ /**
* Handles an prepare commit.
*
* @param commit transaction prepare commit
@@ -399,44 +415,48 @@
protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
TransactionId transactionId = commit.operation().transactionId();
try {
- Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
- .remove(transactionId);
- if (prepareCommit == null) {
- return CommitResult.UNKNOWN_TRANSACTION_ID;
- }
- MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
- long totalReferencesToCommit = transaction
- .updates()
- .stream()
- .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
- .count();
- CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
- new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
- List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
- for (MapUpdate<String, byte[]> update : transaction.updates()) {
- String key = update.key();
- MapEntryValue previousValue = mapEntries.remove(key);
- MapEntryValue newValue = null;
- checkState(preparedKeys.remove(key), "key is not prepared");
- if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
- newValue = new TransactionalCommit(key,
- versionCounter.incrementAndGet(), completer);
- }
- eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
- if (newValue != null) {
- mapEntries.put(key, newValue);
- }
- if (previousValue != null) {
- previousValue.discard();
- }
- }
- publish(eventsToPublish);
- return CommitResult.OK;
+ return commitInternal(transactionId); // commit logic shared with prepareAndCommit
} finally {
commit.close();
}
}
+ private CommitResult commitInternal(TransactionId transactionId) { // applies a prepared transaction to the map
+ Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
+ .remove(transactionId);
+ if (prepareCommit == null) { // no prepare is pending for this transaction id
+ return CommitResult.UNKNOWN_TRANSACTION_ID;
+ }
+ MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
+ long totalReferencesToCommit = transaction // number of updates that will store a new value
+ .updates()
+ .stream()
+ .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+ .count(); // NOTE(review): 0 when every update is a removal - confirm the completer handles that
+ CountDownCompleter<Commit<? extends TransactionPrepare>> completer = // shared by every new entry created below
+ new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
+ List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
+ for (MapUpdate<String, byte[]> update : transaction.updates()) {
+ String key = update.key();
+ MapEntryValue previousValue = mapEntries.remove(key);
+ MapEntryValue newValue = null; // stays null for REMOVE_IF_VERSION_MATCH updates
+ checkState(preparedKeys.remove(key), "key is not prepared"); // key must be present in preparedKeys
+ if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
+ newValue = new TransactionalCommit(key,
+ versionCounter.incrementAndGet(), completer);
+ }
+ eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
+ if (newValue != null) {
+ mapEntries.put(key, newValue);
+ }
+ if (previousValue != null) {
+ previousValue.discard();
+ }
+ }
+ publish(eventsToPublish); // events are published together after all updates are applied
+ return CommitResult.OK;
+ }
+
/**
* Handles an rollback commit (ha!).
*