ONOS-6381 Transactional event listeners

Change-Id: I8f279d78323dea467796e8d37e3117a407af9f76
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
index 99f8df6..b8c1248 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
@@ -28,11 +28,18 @@
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Match;
 import org.onlab.util.Tools;
+import org.onosproject.store.primitives.NodeUpdate;
+import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionBegin;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepare;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepareAndCommit;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionCommit;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionRollback;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.DocumentPath;
@@ -41,16 +48,23 @@
 import org.onosproject.store.service.IllegalDocumentModificationException;
 import org.onosproject.store.service.NoSuchDocumentPathException;
 import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.BEGIN;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE_AND_COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ROLLBACK;
 import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
 import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
-import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.ILLEGAL_MODIFICATION;
 import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.INVALID_PATH;
 import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.OK;
@@ -246,6 +260,56 @@
         events.forEach(event -> eventListeners.values().forEach(listener -> listener.event(event)));
     }
 
+    /** Invokes BEGIN for the given transaction and wraps the decoded long in a {@link Version}. */
+    @Override
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        final TransactionBegin beginOp = new TransactionBegin(transactionId);
+        // Explicit type arguments pin the request type and the decoded response type.
+        return proxy.<TransactionBegin, Long>invoke(
+                BEGIN, SERIALIZER::encode, beginOp, SERIALIZER::decode)
+                .thenApply(Version::new);
+    }
+
+    /** Invokes PREPARE with the transaction log; the future yields true only for {@code PrepareResult.OK}. */
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<NodeUpdate<byte[]>> transactionLog) {
+        final TransactionPrepare prepareOp = new TransactionPrepare(transactionLog);
+        // Any decoded result other than OK (including null) maps to false.
+        return proxy.<TransactionPrepare, PrepareResult>invoke(
+                PREPARE, SERIALIZER::encode, prepareOp, SERIALIZER::decode)
+                .thenApply(outcome -> PrepareResult.OK == outcome);
+    }
+
+    /** Invokes PREPARE_AND_COMMIT with the log; the future yields true only for {@code PrepareResult.OK}. */
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<NodeUpdate<byte[]>> transactionLog) {
+        final TransactionPrepareAndCommit combinedOp = new TransactionPrepareAndCommit(transactionLog);
+        // Any decoded result other than OK (including null) maps to false.
+        return proxy.<TransactionPrepareAndCommit, PrepareResult>invoke(
+                PREPARE_AND_COMMIT, SERIALIZER::encode, combinedOp, SERIALIZER::decode)
+                .thenApply(outcome -> PrepareResult.OK == outcome);
+    }
+
+    /** Invokes COMMIT for the given transaction; the future completes with null when the call succeeds. */
+    @Override
+    public CompletableFuture<Void> commit(TransactionId transactionId) {
+        final TransactionCommit commitOp = new TransactionCommit(transactionId);
+        // NOTE(review): the decoded CommitResult is discarded, so its value never reaches the caller.
+        return proxy.<TransactionCommit, CommitResult>invoke(
+                COMMIT, SERIALIZER::encode, commitOp, SERIALIZER::decode)
+                .thenApply(ignoredResult -> null);
+    }
+
+    /** Invokes ROLLBACK for the given transaction; the future completes with null when the call succeeds. */
+    @Override
+    public CompletableFuture<Void> rollback(TransactionId transactionId) {
+        final TransactionRollback rollbackOp = new TransactionRollback(transactionId);
+        // Type arguments are left to inference here; the decoded response is discarded.
+        return proxy.invoke(
+                ROLLBACK, SERIALIZER::encode, rollbackOp, SERIALIZER::decode)
+                .thenApply(ignoredResult -> null);
+    }
+
     private class InternalListener implements DocumentTreeListener<byte[]> {
 
         private final DocumentPath path;