Step toward transactional API support on DocTree
for ONOS-7237
Change-Id: I021b20c8aac97f96c2ab0e8bc763aa821dc09643
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/NodeUpdate.java b/core/api/src/main/java/org/onosproject/store/primitives/NodeUpdate.java
index 5c21338..8562401 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/NodeUpdate.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/NodeUpdate.java
@@ -28,7 +28,7 @@
import org.onosproject.store.service.DocumentPath;
/**
- * Map update operation.
+ * DocumentTree node update operation.
*
* @param <V> map value type
*/
@@ -38,6 +38,8 @@
* Type of database update operation.
*/
public enum Type {
+ // FIXME revisit these, mismatch in description and actual implementation
+ // Also, type of update operations probably insufficient.
/**
* Creates an entry if the current version matches specified version.
*/
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncDocumentTree.java
index 71758a0..2d42d4f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncDocumentTree.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncDocumentTree.java
@@ -16,12 +16,14 @@
package org.onosproject.store.primitives.impl;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
@@ -158,26 +160,58 @@
@Override
public CompletableFuture<Version> begin(TransactionId transactionId) {
- throw new UnsupportedOperationException();
+ return partitions().stream()
+ .map(p -> p.begin(transactionId))
+ // returning lowest Version
+ .reduce((f1, f2) -> f1.thenCombine(f2, Tools::min))
+ .orElse(Tools.exceptionalFuture(new IllegalStateException("Empty partitions?")));
}
@Override
public CompletableFuture<Boolean> prepare(TransactionLog<NodeUpdate<V>> transactionLog) {
- throw new UnsupportedOperationException();
+ Map<AsyncDocumentTree<V>, List<NodeUpdate<V>>> perPart =
+ transactionLog.records().stream()
+ .collect(Collectors.groupingBy(nu -> partition(nu.path())));
+
+ // must walk all partitions to ensure empty TransactionLog will
+ // be issued against no-op partitions in order for commit to succeed
+ return partitions().stream()
+ .map(p -> p.prepare(new TransactionLog<>(transactionLog.transactionId(),
+ transactionLog.version(),
+ perPart.getOrDefault(p, ImmutableList.of()))))
+ .reduce((f1, f2) -> f1.thenCombine(f2, Boolean::logicalAnd))
+ .orElse(CompletableFuture.completedFuture(true));
}
@Override
public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<NodeUpdate<V>> transactionLog) {
- throw new UnsupportedOperationException();
+ // Note: cannot call prepareAndCommit on each partition,
+ // must check all partitions are prepare()-ed first.
+ return prepare(transactionLog)
+ .thenApply(prepOk -> {
+ if (prepOk) {
+ commit(transactionLog.transactionId());
+ return true;
+ } else {
+ return false;
+ }
+ });
+
}
@Override
public CompletableFuture<Void> commit(TransactionId transactionId) {
- throw new UnsupportedOperationException();
+ return CompletableFuture.allOf(partitions().stream()
+ .map(p -> p.commit(transactionId))
+ .toArray(CompletableFuture[]::new)
+ );
}
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
- throw new UnsupportedOperationException();
+ return CompletableFuture.allOf(partitions().stream()
+ .map(p -> p.rollback(transactionId))
+ .toArray(CompletableFuture[]::new)
+ );
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
index 717f362..2633bf1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
@@ -405,6 +405,8 @@
DocumentPath path = record.path();
checkState(preparedKeys.remove(path), "path is not prepared");
+ // FIXME revisit this block, it never respects NodeUpdate type
+
Versioned<byte[]> previousValue = null;
try {
previousValue = docTree.removeNode(path);
@@ -428,7 +430,7 @@
}
}
- DocumentTreeEvent<byte[]> end = new DocumentTreeEvent<byte[]>(
+ DocumentTreeEvent<byte[]> end = new DocumentTreeEvent<>(
DocumentPath.from(transactionScope.transactionLog().transactionId().toString()),
Type.TRANSACTION_END,
Optional.empty(),
@@ -522,7 +524,7 @@
* @return the transaction commit log
*/
TransactionLog<NodeUpdate<byte[]>> transactionLog() {
- checkState(isPrepared());
+ checkState(isPrepared(), "Transaction not prepared");
return transactionLog;
}
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 057441e..a8c8a8b 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -56,6 +56,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
+import static com.google.common.base.Preconditions.checkNotNull;
import static java.nio.file.Files.delete;
import static java.nio.file.Files.walkFileTree;
import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
@@ -842,4 +843,33 @@
ZoneId.systemDefault());
}
+ /**
+ * Returns smaller of the two Comparable values.
+ *
+ * @param l an argument
+ * @param r another argument
+ * @return the smaller of {@code l} or {@code r}
+ * @param <C> Comparable type
+ * @throws NullPointerException if any of the arguments were null.
+ */
+ public static <C extends Comparable<? super C>> C min(C l, C r) {
+ checkNotNull(l, "l cannot be null");
+ checkNotNull(r, "r cannot be null");
+ return l.compareTo(r) <= 0 ? l : r;
+ }
+
+ /**
+ * Returns larger of the two Comparable values.
+ *
+ * @param l an argument
+ * @param r another argument
+ * @return the larger of {@code l} or {@code r}
+ * @param <C> Comparable type
+ * @throws NullPointerException if any of the arguments were null.
+ */
+ public static <C extends Comparable<? super C>> C max(C l, C r) {
+ checkNotNull(l, "l cannot be null");
+ checkNotNull(r, "r cannot be null");
+ return l.compareTo(r) >= 0 ? l : r;
+ }
}