Added distributed transaction support through a two phase commit protocol
Change-Id: I85d64234a24823fee8b3c2ea830abbb6867dad38
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index ad049e6..178f49f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -27,7 +27,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Lists;
@@ -129,24 +130,27 @@
}
@Override
- public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+ public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).put(tableName, key, value);
}
@Override
- public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+ public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).remove(tableName, key);
}
@Override
- public CompletableFuture<Void> clear(String tableName) {
+ public CompletableFuture<Result<Void>> clear(String tableName) {
+ AtomicBoolean isLocked = new AtomicBoolean(false);
checkState(isOpen.get(), DB_NOT_OPEN);
return CompletableFuture.allOf(partitions
.stream()
- .map(p -> p.clear(tableName))
- .toArray(CompletableFuture[]::new));
+ .map(p -> p.clear(tableName)
+ .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
+ .toArray(CompletableFuture[]::new))
+ .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
}
@Override
@@ -183,59 +187,86 @@
}
@Override
- public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+ public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
}
@Override
- public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+ public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).remove(tableName, key, value);
}
@Override
- public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+ public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).remove(tableName, key, version);
}
@Override
- public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+ public CompletableFuture<Result<Boolean>> replace(
+ String tableName, String key, byte[] oldValue, byte[] newValue) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
}
@Override
- public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+ public CompletableFuture<Result<Boolean>> replace(
+ String tableName, String key, long oldVersion, byte[] newValue) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
}
@Override
- public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
- for (UpdateOperation<String, byte[]> update : updates) {
- Database partition = partitioner.getPartition(update.tableName(), update.key());
- List<UpdateOperation<String, byte[]>> partitionUpdates = perPartitionUpdates.get(partition);
- if (partitionUpdates == null) {
- partitionUpdates = Lists.newArrayList();
- perPartitionUpdates.put(partition, partitionUpdates);
- }
- partitionUpdates.add(update);
- }
- if (perPartitionUpdates.size() > 1) {
- // TODO
- throw new UnsupportedOperationException("Cross partition transactional updates are not supported.");
+ public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
+ Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+ if (subTransactions.isEmpty()) {
+ return CompletableFuture.completedFuture(true);
+ } else if (subTransactions.size() == 1) {
+ Entry<Database, Transaction> entry =
+ subTransactions.entrySet().iterator().next();
+ return entry.getKey().prepareAndCommit(entry.getValue());
} else {
- Entry<Database, List<UpdateOperation<String, byte[]>>> only =
- perPartitionUpdates.entrySet().iterator().next();
- return only.getKey().atomicBatchUpdate(only.getValue());
+ return new TransactionManager(this).execute(transaction);
}
}
@Override
+ public CompletableFuture<Boolean> prepare(Transaction transaction) {
+ Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+ AtomicBoolean status = new AtomicBoolean(true);
+ return CompletableFuture.allOf(subTransactions.entrySet()
+ .stream()
+ .map(entry -> entry
+ .getKey()
+ .prepare(entry.getValue())
+ .thenApply(v -> status.compareAndSet(true, v)))
+ .toArray(CompletableFuture[]::new))
+ .thenApply(v -> status.get());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> commit(Transaction transaction) {
+ Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+ return CompletableFuture.allOf(subTransactions.entrySet()
+ .stream()
+ .map(entry -> entry.getKey().commit(entry.getValue()))
+ .toArray(CompletableFuture[]::new))
+ .thenApply(v -> true);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> rollback(Transaction transaction) {
+ Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+ return CompletableFuture.allOf(subTransactions.entrySet()
+ .stream()
+ .map(entry -> entry.getKey().rollback(entry.getValue()))
+ .toArray(CompletableFuture[]::new))
+ .thenApply(v -> true);
+ }
+
+ @Override
public CompletableFuture<Database> open() {
return CompletableFuture.allOf(partitions
.stream()
@@ -243,7 +274,8 @@
.toArray(CompletableFuture[]::new))
.thenApply(v -> {
isOpen.set(true);
- return this; });
+ return this;
+ });
}
@Override
@@ -279,4 +311,19 @@
public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
throw new UnsupportedOperationException();
}
-}
\ No newline at end of file
+
+ private Map<Database, Transaction> createSubTransactions(
+ Transaction transaction) {
+ Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
+ for (DatabaseUpdate update : transaction.updates()) {
+ Database partition = partitioner.getPartition(update.tableName(), update.key());
+ List<DatabaseUpdate> partitionUpdates =
+ perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
+ partitionUpdates.add(update);
+ }
+ Map<Database, Transaction> subTransactions = Maps.newHashMap();
+ perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
+
+ return subTransactions;
+ }
+}