Performance improvements
- Fast path of transactions updating a single key.
- Bug fix in StoragePartitionClient where we were always creating a CachingMap
Change-Id: Ide117fba34fd12a9ff4aabd5fb7a21952bae672b
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
index be9ead4..cd61eb9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
@@ -76,7 +76,7 @@
public CompletableFuture<CommitStatus> commit() {
final MeteringAgent.Context timer = monitor.startTimer("commit");
return transactionCoordinator.commit(transactionId, txParticipants)
- .whenComplete((r, e) -> timer.stop(e));
+ .whenComplete((r, e) -> timer.stop(e));
}
@Override
@@ -89,7 +89,7 @@
Serializer serializer) {
// FIXME: Do not create duplicates.
DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<K, V>(mapName,
- creator.<K, V>newAsyncConsistentMap(mapName, serializer),
+ DistributedPrimitives.newMeteredMap(creator.<K, V>newAsyncConsistentMap(mapName, serializer)),
this,
serializer);
txParticipants.add(txMap);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
index 1aaf297..2d80970 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
@@ -180,8 +180,13 @@
}
@Override
- public boolean hasPendingUpdates() {
- return updates().size() > 0;
+ public CompletableFuture<Boolean> prepareAndCommit() {
+ return backingMap.prepareAndCommit(new MapTransaction<>(txContext.transactionId(), updates()));
+ }
+
+ @Override
+ public int totalUpdates() {
+ return updates().size();
}
protected List<MapUpdate<K, V>> updates() {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index f1095d9..8b6b9aa 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -178,6 +178,11 @@
}
@Override
+ public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
+ return delegateMap.prepareAndCommit(transaction);
+ }
+
+ @Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("delegateMap", delegateMap)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
index c397f30..0a34189 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
@@ -25,13 +25,16 @@
import java.util.function.Function;
import java.util.function.Predicate;
+import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
+
import org.onosproject.utils.MeteringAgent;
/**
@@ -59,6 +62,10 @@
private static final String ENTRY_SET = "entrySet";
private static final String REPLACE = "replace";
private static final String COMPUTE_IF_ABSENT = "computeIfAbsent";
+ private static final String PREPARE = "prepare";
+ private static final String COMMIT = "commit";
+ private static final String ROLLBACK = "rollback";
+ private static final String PREPARE_AND_COMMIT = "prepareAndCommit";
private static final String ADD_LISTENER = "addListener";
private static final String REMOVE_LISTENER = "removeListener";
private static final String NOTIFY_LISTENER = "notifyListener";
@@ -240,6 +247,34 @@
}
}
+ @Override
+ public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
+ final MeteringAgent.Context timer = monitor.startTimer(PREPARE);
+ return super.prepare(transaction)
+ .whenComplete((r, e) -> timer.stop(e));
+ }
+
+ @Override
+ public CompletableFuture<Void> commit(TransactionId transactionId) {
+ final MeteringAgent.Context timer = monitor.startTimer(COMMIT);
+ return super.commit(transactionId)
+ .whenComplete((r, e) -> timer.stop(e));
+ }
+
+ @Override
+ public CompletableFuture<Void> rollback(TransactionId transactionId) {
+ final MeteringAgent.Context timer = monitor.startTimer(ROLLBACK);
+ return super.rollback(transactionId)
+ .whenComplete((r, e) -> timer.stop(e));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
+ final MeteringAgent.Context timer = monitor.startTimer(PREPARE_AND_COMMIT);
+ return super.prepareAndCommit(transaction)
+ .whenComplete((r, e) -> timer.stop(e));
+ }
+
private class InternalMeteredMapEventListener implements MapEventListener<K, V> {
private final MapEventListener<K, V> listener;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index 6715ba7..0bc90e8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -236,6 +236,24 @@
.toArray(CompletableFuture[]::new));
}
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
+ Map<AsyncConsistentMap<K, V>, List<MapUpdate<K, V>>> updatesGroupedByMap = Maps.newIdentityHashMap();
+ transaction.updates().forEach(update -> {
+ AsyncConsistentMap<K, V> map = getMap(update.key());
+ updatesGroupedByMap.computeIfAbsent(map, k -> Lists.newLinkedList()).add(update);
+ });
+ Map<AsyncConsistentMap<K, V>, MapTransaction<K, V>> transactionsByMap =
+ Maps.transformValues(updatesGroupedByMap,
+ list -> new MapTransaction<>(transaction.transactionId(), list));
+
+ return Tools.allOf(transactionsByMap.entrySet()
+ .stream()
+ .map(e -> e.getKey().prepareAndCommit(e.getValue()))
+ .collect(Collectors.toList()))
+ .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
+ }
+
/**
* Returns the map (partition) to which the specified key maps.
* @param key key
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index c70f4e4..8fd0a06 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -108,7 +108,7 @@
value -> value == null ? null : serializer.encode(value),
bytes -> serializer.decode(bytes));
- return DistributedPrimitives.newCachingMap(transcodedMap);
+ return transcodedMap;
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
index 1904894..3670aac 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
@@ -44,20 +44,32 @@
*/
CompletableFuture<CommitStatus> commit(TransactionId transactionId,
Set<TransactionParticipant> transactionParticipants) {
- if (!transactionParticipants.stream().anyMatch(t -> t.hasPendingUpdates())) {
- return CompletableFuture.completedFuture(CommitStatus.SUCCESS);
- }
+ int totalUpdates = transactionParticipants.stream()
+ .map(TransactionParticipant::totalUpdates)
+ .reduce(Math::addExact)
+ .orElse(0);
- CompletableFuture<CommitStatus> status = transactions.put(transactionId, Transaction.State.PREPARING)
+ if (totalUpdates == 0) {
+ return CompletableFuture.completedFuture(CommitStatus.SUCCESS);
+ } else if (totalUpdates == 1) {
+ return transactionParticipants.stream()
+ .filter(p -> p.totalUpdates() == 1)
+ .findFirst()
+ .get()
+ .prepareAndCommit()
+ .thenApply(v -> v ? CommitStatus.SUCCESS : CommitStatus.FAILURE);
+ } else {
+ CompletableFuture<CommitStatus> status = transactions.put(transactionId, Transaction.State.PREPARING)
.thenCompose(v -> this.doPrepare(transactionParticipants))
.thenCompose(result -> result
- ? transactions.put(transactionId, Transaction.State.COMMITTING)
- .thenCompose(v -> doCommit(transactionParticipants))
- .thenApply(v -> CommitStatus.SUCCESS)
- : transactions.put(transactionId, Transaction.State.ROLLINGBACK)
- .thenCompose(v -> doRollback(transactionParticipants))
- .thenApply(v -> CommitStatus.FAILURE));
- return status.thenCompose(v -> transactions.remove(transactionId).thenApply(u -> v));
+ ? transactions.put(transactionId, Transaction.State.COMMITTING)
+ .thenCompose(v -> doCommit(transactionParticipants))
+ .thenApply(v -> CommitStatus.SUCCESS)
+ : transactions.put(transactionId, Transaction.State.ROLLINGBACK)
+ .thenCompose(v -> doRollback(transactionParticipants))
+ .thenApply(v -> CommitStatus.FAILURE));
+ return status.thenCompose(v -> transactions.remove(transactionId).thenApply(u -> v));
+ }
}
private CompletableFuture<Boolean> doPrepare(Set<TransactionParticipant> transactionParticipants) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
index 66e1f3a..c8308af 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
@@ -26,7 +26,21 @@
* Returns if this participant has updates that need to be committed.
* @return {@code true} if yes; {@code false} otherwise
*/
- boolean hasPendingUpdates();
+ default boolean hasPendingUpdates() {
+ return totalUpdates() > 0;
+ }
+
+ /**
+ * Returns the number of updates that need to committed for this participant.
+ * @return update count.
+ */
+ int totalUpdates();
+
+ /**
+ * Executes the prepare and commit steps in a single go.
+ * @return {@code true} is successful i.e updates are committed; {@code false} otherwise
+ */
+ CompletableFuture<Boolean> prepareAndCommit();
/**
* Executes the prepare phase.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index a2989ac..b679925 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -213,6 +213,11 @@
return backingMap.rollback(transactionId);
}
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K1, V1> transaction) {
+ return backingMap.prepareAndCommit(transaction.map(keyEncoder, valueEncoder));
+ }
+
private class InternalBackingMapEventListener implements MapEventListener<K2, V2> {
private final MapEventListener<K1, V1> listener;