Performance improvements
- Fast path for transactions updating a single key.
- Bug fix in StoragePartitionClient, which was always creating a CachingMap.
Change-Id: Ide117fba34fd12a9ff4aabd5fb7a21952bae672b
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index bb320e4..0847153 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.store.primitives.resources.impl;
-import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.AbstractResource;
import io.atomix.resource.ResourceTypeInfo;
@@ -44,6 +43,7 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
@@ -53,7 +53,6 @@
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
-import org.slf4j.Logger;
import com.google.common.collect.Sets;
@@ -64,7 +63,6 @@
public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
implements AsyncConsistentMap<String, byte[]> {
- private final Logger log = getLogger(getClass());
private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
public static final String CHANGE_SUBJECT = "changeEvents";
@@ -288,4 +286,9 @@
return submit(new TransactionRollback(transactionId))
.thenApply(v -> null);
}
+
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
+ return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
+ }
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index dbc3157..73af19f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -245,6 +245,19 @@
}
/**
+ * Map transaction prepare-and-commit command.
+ */
+ @SuppressWarnings("serial")
+ public static class TransactionPrepareAndCommit extends TransactionPrepare {
+ public TransactionPrepareAndCommit() {
+ }
+
+ public TransactionPrepareAndCommit(MapTransaction<String, byte[]> mapTransaction) {
+ super(mapTransaction);
+ }
+ }
+
+ /**
* Map transaction commit command.
*/
@SuppressWarnings("serial")
@@ -489,12 +502,6 @@
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
}
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .toString();
- }
}
/**
@@ -509,12 +516,6 @@
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
}
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .toString();
- }
}
/**
@@ -537,7 +538,8 @@
registry.register(TransactionPrepare.class, -772);
registry.register(TransactionCommit.class, -773);
registry.register(TransactionRollback.class, -774);
- registry.register(UpdateAndGet.class, -775);
+ registry.register(TransactionPrepareAndCommit.class, -775);
+ registry.register(UpdateAndGet.class, -776);
}
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index aad7382..983e27e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -52,6 +52,7 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
@@ -111,6 +112,7 @@
executor.register(TransactionPrepare.class, this::prepare);
executor.register(TransactionCommit.class, this::commit);
executor.register(TransactionRollback.class, this::rollback);
+ executor.register(TransactionPrepareAndCommit.class, this::prepareAndCommit);
}
@Override
@@ -352,6 +354,20 @@
}
/**
+ * Handles a prepare-and-commit commit.
+ *
+ * @param commit transaction prepare-and-commit commit
+ * @return prepare result
+ */
+ protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
+ PrepareResult prepareResult = prepare(commit);
+ if (prepareResult == PrepareResult.OK) {
+ commitInternal(commit.operation().transaction().transactionId());
+ }
+ return prepareResult;
+ }
+
+ /**
* Handles an prepare commit.
*
* @param commit transaction prepare commit
@@ -399,44 +415,48 @@
protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
TransactionId transactionId = commit.operation().transactionId();
try {
- Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
- .remove(transactionId);
- if (prepareCommit == null) {
- return CommitResult.UNKNOWN_TRANSACTION_ID;
- }
- MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
- long totalReferencesToCommit = transaction
- .updates()
- .stream()
- .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
- .count();
- CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
- new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
- List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
- for (MapUpdate<String, byte[]> update : transaction.updates()) {
- String key = update.key();
- MapEntryValue previousValue = mapEntries.remove(key);
- MapEntryValue newValue = null;
- checkState(preparedKeys.remove(key), "key is not prepared");
- if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
- newValue = new TransactionalCommit(key,
- versionCounter.incrementAndGet(), completer);
- }
- eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
- if (newValue != null) {
- mapEntries.put(key, newValue);
- }
- if (previousValue != null) {
- previousValue.discard();
- }
- }
- publish(eventsToPublish);
- return CommitResult.OK;
+ return commitInternal(transactionId);
} finally {
commit.close();
}
}
+ private CommitResult commitInternal(TransactionId transactionId) {
+ Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
+ .remove(transactionId);
+ if (prepareCommit == null) {
+ return CommitResult.UNKNOWN_TRANSACTION_ID;
+ }
+ MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
+ long totalReferencesToCommit = transaction
+ .updates()
+ .stream()
+ .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+ .count();
+ CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
+ new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
+ List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
+ for (MapUpdate<String, byte[]> update : transaction.updates()) {
+ String key = update.key();
+ MapEntryValue previousValue = mapEntries.remove(key);
+ MapEntryValue newValue = null;
+ checkState(preparedKeys.remove(key), "key is not prepared");
+ if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
+ newValue = new TransactionalCommit(key,
+ versionCounter.incrementAndGet(), completer);
+ }
+ eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
+ if (newValue != null) {
+ mapEntries.put(key, newValue);
+ }
+ if (previousValue != null) {
+ previousValue.discard();
+ }
+ }
+ publish(eventsToPublish);
+ return CommitResult.OK;
+ }
+
/**
* Handles an rollback commit (ha!).
*