Refactor transaction support in preparation for migration to latest APIs
- Added a explicit transaction id type
- cli command now just returns the identifiers of in-progress transactions
- Removed redriveTransactions until a better alternative is provided
- Removed DatabaseUpdate and replaced its usage with MapUpdate
Change-Id: Ic4a14967072068834510cd8459fd2a6790e456ef
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 100941f..de22a75 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -37,6 +37,8 @@
import org.onlab.util.CountDownCompleter;
import org.onlab.util.Match;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.impl.Transaction;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Versioned;
@@ -382,9 +384,8 @@
Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> commit) {
boolean ok = false;
try {
- TransactionalMapUpdate<String, byte[]> transactionUpdate = commit
- .operation().transactionUpdate();
- for (MapUpdate<String, byte[]> update : transactionUpdate.batch()) {
+ Transaction transaction = commit.operation().transaction();
+ for (MapUpdate<String, byte[]> update : transaction.updates()) {
String key = update.key();
if (preparedKeys.contains(key)) {
return PrepareResult.CONCURRENT_TRANSACTION;
@@ -403,8 +404,8 @@
// No violations detected. Add to pendingTranctions and mark
// modified keys as
// currently locked to updates.
- pendingTransactions.put(transactionUpdate.transactionId(), commit);
- transactionUpdate.batch().forEach(u -> preparedKeys.add(u.key()));
+ pendingTransactions.put(transaction.id(), commit);
+ transaction.updates().forEach(u -> preparedKeys.add(u.key()));
ok = true;
return PrepareResult.OK;
} finally {
@@ -429,16 +430,15 @@
if (prepareCommit == null) {
return CommitResult.UNKNOWN_TRANSACTION_ID;
}
- TransactionalMapUpdate<String, byte[]> transactionalUpdate = prepareCommit
- .operation().transactionUpdate();
- long totalReferencesToCommit = transactionalUpdate
- .batch()
+ Transaction transaction = prepareCommit.operation().transaction();
+ long totalReferencesToCommit = transaction
+ .updates()
.stream()
.filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.count();
CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer =
new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
- for (MapUpdate<String, byte[]> update : transactionalUpdate.batch()) {
+ for (MapUpdate<String, byte[]> update : transaction.updates()) {
String key = update.key();
MapEntryValue previousValue = mapEntries.remove(key);
MapEntryValue newValue = null;
@@ -473,8 +473,10 @@
if (prepareCommit == null) {
return RollbackResult.UNKNOWN_TRANSACTION_ID;
} else {
- prepareCommit.operation().transactionUpdate().batch()
- .forEach(u -> preparedKeys.remove(u.key()));
+ prepareCommit.operation()
+ .transaction()
+ .updates()
+ .forEach(u -> preparedKeys.remove(u.key()));
prepareCommit.close();
return RollbackResult.OK;
}
@@ -608,9 +610,8 @@
@Override
public byte[] value() {
- TransactionalMapUpdate<String, byte[]> update = completer.object()
- .operation().transactionUpdate();
- return update.valueForKey(key);
+ Transaction transaction = completer.object().operation().transaction();
+ return valueForKey(key, transaction);
}
@Override
@@ -622,5 +623,14 @@
public void discard() {
completer.countDown();
}
+
+ private byte[] valueForKey(String key, Transaction transaction) {
+ MapUpdate<String, byte[]> update = transaction.updates()
+ .stream()
+ .filter(u -> u.key().equals(key))
+ .findFirst()
+ .orElse(null);
+ return update == null ? null : update.value();
+ }
}
}