Step toward transactional API support on DocTree

for ONOS-7237

Change-Id: I021b20c8aac97f96c2ab0e8bc763aa821dc09643
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/NodeUpdate.java b/core/api/src/main/java/org/onosproject/store/primitives/NodeUpdate.java
index 5c21338..8562401 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/NodeUpdate.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/NodeUpdate.java
@@ -28,7 +28,7 @@
 import org.onosproject.store.service.DocumentPath;
 
 /**
- * Map update operation.
+ * DocumentTree node update operation.
  *
  * @param <V> map value type
  */
@@ -38,6 +38,8 @@
      * Type of database update operation.
      */
     public enum Type {
+        // FIXME revisit these, mismatch in description and actual implementation
+        // Also, type of update operations probably insufficient.
         /**
          * Creates an entry if the current version matches specified version.
          */
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncDocumentTree.java
index 71758a0..2d42d4f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncDocumentTree.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncDocumentTree.java
@@ -16,12 +16,14 @@
 package org.onosproject.store.primitives.impl;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import org.onlab.util.Tools;
 import org.onosproject.cluster.PartitionId;
@@ -158,26 +160,58 @@
 
     @Override
     public CompletableFuture<Version> begin(TransactionId transactionId) {
-        throw new UnsupportedOperationException();
+        return partitions().stream()
+            .map(p -> p.begin(transactionId))
+            // returning lowest Version
+            .reduce((f1, f2) -> f1.thenCombine(f2, Tools::min))
+            .orElse(Tools.exceptionalFuture(new IllegalStateException("Empty partitions?")));
     }
 
     @Override
     public CompletableFuture<Boolean> prepare(TransactionLog<NodeUpdate<V>> transactionLog) {
-        throw new UnsupportedOperationException();
+        Map<AsyncDocumentTree<V>, List<NodeUpdate<V>>> perPart =
+                transactionLog.records().stream()
+                    .collect(Collectors.groupingBy(nu -> partition(nu.path())));
+
+        // must walk all partitions to ensure empty TransactionLog will
+        // be issued against no-op partitions in order for commit to succeed
+        return partitions().stream()
+                .map(p -> p.prepare(new TransactionLog<>(transactionLog.transactionId(),
+                                    transactionLog.version(),
+                                    perPart.getOrDefault(p, ImmutableList.of()))))
+                .reduce((f1, f2) -> f1.thenCombine(f2, Boolean::logicalAnd))
+                .orElse(CompletableFuture.completedFuture(true));
     }
 
     @Override
     public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<NodeUpdate<V>> transactionLog) {
-        throw new UnsupportedOperationException();
+        // Note: cannot call prepareAndCommit on each partition,
+        // must check all partitions are prepare()-ed first.
+        return prepare(transactionLog)
+                .thenApply(prepOk -> {
+                    if (prepOk) {
+                        commit(transactionLog.transactionId());
+                        return true;
+                    } else {
+                        return false;
+                    }
+                });
+
     }
 
     @Override
     public CompletableFuture<Void> commit(TransactionId transactionId) {
-        throw new UnsupportedOperationException();
+        return CompletableFuture.allOf(partitions().stream()
+                                           .map(p -> p.commit(transactionId))
+                                           .toArray(CompletableFuture[]::new)
+                                       );
     }
 
     @Override
     public CompletableFuture<Void> rollback(TransactionId transactionId) {
-        throw new UnsupportedOperationException();
+        return CompletableFuture.allOf(partitions().stream()
+                                       .map(p -> p.rollback(transactionId))
+                                       .toArray(CompletableFuture[]::new)
+                                   );
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
index 717f362..2633bf1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
@@ -405,6 +405,8 @@
             DocumentPath path = record.path();
             checkState(preparedKeys.remove(path), "path is not prepared");
 
+            // FIXME revisit this block, it never respects NodeUpdate type
+
             Versioned<byte[]> previousValue = null;
             try {
                 previousValue = docTree.removeNode(path);
@@ -428,7 +430,7 @@
             }
         }
 
-        DocumentTreeEvent<byte[]> end = new DocumentTreeEvent<byte[]>(
+        DocumentTreeEvent<byte[]> end = new DocumentTreeEvent<>(
                 DocumentPath.from(transactionScope.transactionLog().transactionId().toString()),
                 Type.TRANSACTION_END,
                 Optional.empty(),
@@ -522,7 +524,7 @@
          * @return the transaction commit log
          */
         TransactionLog<NodeUpdate<byte[]>> transactionLog() {
-            checkState(isPrepared());
+            checkState(isPrepared(), "Transaction not prepared");
             return transactionLog;
         }
 
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 057441e..a8c8a8b 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -56,6 +56,7 @@
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static java.nio.file.Files.delete;
 import static java.nio.file.Files.walkFileTree;
 import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
@@ -842,4 +843,33 @@
                                         ZoneId.systemDefault());
     }
 
+    /**
+     * Returns smaller of the two Comparable values.
+     *
+     * @param l an argument
+     * @param r another argument
+     * @return the smaller of {@code l} or {@code r}
+     * @param <C> Comparable type
+     * @throws NullPointerException if any of the arguments were null.
+     */
+    public static <C extends Comparable<? super C>> C min(C l, C r) {
+        checkNotNull(l, "l cannot be null");
+        checkNotNull(r, "r cannot be null");
+        return l.compareTo(r) <= 0 ? l : r;
+    }
+
+    /**
+     * Returns larger of the two Comparable values.
+     *
+     * @param l an argument
+     * @param r another argument
+     * @return the larger of {@code l} or {@code r}
+     * @param <C> Comparable type
+     * @throws NullPointerException if any of the arguments were null.
+     */
+    public static <C extends Comparable<? super C>> C max(C l, C r) {
+        checkNotNull(l, "l cannot be null");
+        checkNotNull(r, "r cannot be null");
+        return l.compareTo(r) >= 0 ? l : r;
+    }
 }