ONOS-6381 Transactional event listeners

Change-Id: I8f279d78323dea467796e8d37e3117a407af9f76
diff --git a/apps/config/src/main/java/org/onosproject/config/impl/DistributedDynamicConfigStore.java b/apps/config/src/main/java/org/onosproject/config/impl/DistributedDynamicConfigStore.java
index c005aa8..8a84f06 100644
--- a/apps/config/src/main/java/org/onosproject/config/impl/DistributedDynamicConfigStore.java
+++ b/apps/config/src/main/java/org/onosproject/config/impl/DistributedDynamicConfigStore.java
@@ -5,7 +5,7 @@
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/NodeUpdate.java b/core/api/src/main/java/org/onosproject/store/primitives/NodeUpdate.java
new file mode 100644
index 0000000..5c21338
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/primitives/NodeUpdate.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+import org.onlab.util.ByteArraySizeHashPrinter;
+
+import com.google.common.base.MoreObjects;
+import org.onosproject.store.service.DocumentPath;
+
+/**
+ * Map update operation.
+ *
+ * @param <V> map value type
+ */
+public final class NodeUpdate<V> {
+
+    /**
+     * Type of database update operation.
+     */
+    public enum Type {
+        /**
+         * Creates an entry if the current version matches specified version.
+         */
+        CREATE_NODE,
+        /**
+         * Updates an entry if the current version matches specified version.
+         */
+        UPDATE_NODE,
+        /**
+         * Deletes an entry if the current version matches specified version.
+         */
+        DELETE_NODE
+    }
+
+    private Type type;
+    private DocumentPath path;
+    private V value;
+    private long version = -1;
+
+    /**
+     * Returns the type of update operation.
+     * @return type of update.
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Returns the item path being updated.
+     * @return item path
+     */
+    public DocumentPath path() {
+        return path;
+    }
+
+    /**
+     * Returns the new value.
+     * @return item's target value.
+     */
+    public V value() {
+        return value;
+    }
+
+    /**
+     * Returns the expected current version in the database for the key.
+     * @return expected version.
+     */
+    public long version() {
+        return version;
+    }
+
+    /**
+     * Transforms this instance into an instance of different paramterized types.
+     *
+     * @param valueMapper transcoder to value type
+     * @return new instance
+     * @param <T> value type of returned instance
+     */
+    public <T> NodeUpdate<T> map(Function<V, T> valueMapper) {
+        return NodeUpdate.<T>newBuilder()
+                .withType(type)
+                //.withKey(keyMapper.apply(key))
+                .withValue(value == null ? null : valueMapper.apply(value))
+                .withVersion(version)
+                .build();
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(type, path, value, version);
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (object instanceof NodeUpdate) {
+            NodeUpdate that = (NodeUpdate) object;
+            return this.type == that.type
+                    && Objects.equals(this.path, that.path)
+                    && Objects.equals(this.value, that.value)
+                    && Objects.equals(this.version, that.version);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("type", type)
+                .add("path", path)
+                .add("value", value instanceof byte[] ?
+                        new ByteArraySizeHashPrinter((byte[]) value) : value)
+                .add("version", version)
+                .toString();
+    }
+
+    /**
+     * Creates a new builder instance.
+     *
+     * @param <V> value type
+     * @return builder.
+     */
+    public static <V> Builder<V> newBuilder() {
+        return new Builder<>();
+    }
+
+    /**
+     * NodeUpdate builder.
+     *
+     * @param <V> value type
+     */
+    public static final class Builder<V> {
+
+        private NodeUpdate<V> update = new NodeUpdate<>();
+
+        public NodeUpdate<V> build() {
+            validateInputs();
+            return update;
+        }
+
+        public Builder<V> withType(Type type) {
+            update.type = checkNotNull(type, "type cannot be null");
+            return this;
+        }
+
+        public Builder<V> withPath(DocumentPath key) {
+            update.path = checkNotNull(key, "key cannot be null");
+            return this;
+        }
+
+        public Builder<V> withValue(V value) {
+            update.value = value;
+            return this;
+        }
+
+        public Builder<V> withVersion(long version) {
+            update.version = version;
+            return this;
+        }
+
+        private void validateInputs() {
+            checkNotNull(update.type, "type must be specified");
+            switch (update.type) {
+                case CREATE_NODE:
+                    checkNotNull(update.path, "key must be specified");
+                    checkNotNull(update.value, "value must be specified.");
+                    break;
+                case UPDATE_NODE:
+                    checkNotNull(update.path, "key must be specified");
+                    checkNotNull(update.value, "value must be specified.");
+                    checkState(update.version >= 0, "version must be specified");
+                    break;
+                case DELETE_NODE:
+                    checkNotNull(update.path, "key must be specified");
+                    checkState(update.version >= 0, "version must be specified");
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown operation type");
+            }
+
+        }
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncDocumentTree.java b/core/api/src/main/java/org/onosproject/store/service/AsyncDocumentTree.java
index 1f0902c..46dcdd5 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncDocumentTree.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncDocumentTree.java
@@ -16,6 +16,8 @@
 
 package org.onosproject.store.service;
 
+import org.onosproject.store.primitives.NodeUpdate;
+
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -29,7 +31,7 @@
  * @param <V> document tree value type
  */
 @NotThreadSafe
-public interface AsyncDocumentTree<V> extends DistributedPrimitive {
+public interface AsyncDocumentTree<V> extends DistributedPrimitive, Transactional<NodeUpdate<V>> {
 
     @Override
     default Type primitiveType() {
diff --git a/core/api/src/main/java/org/onosproject/store/service/DocumentTreeEvent.java b/core/api/src/main/java/org/onosproject/store/service/DocumentTreeEvent.java
index f640da8..81b28ac 100644
--- a/core/api/src/main/java/org/onosproject/store/service/DocumentTreeEvent.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DocumentTreeEvent.java
@@ -44,7 +44,9 @@
         /**
          * Signifies an existing node being deleted.
          */
-        DELETED
+        DELETED,
+        TRANSACTION_START,
+        TRANSACTION_END
     }
 
     private final DocumentPath path;
@@ -77,6 +79,22 @@
         this.newValue = newValue;
         this.oldValue = oldValue;
     }
+    /**
+     * Constructs a new {@code DocumentTreeEvent}.
+     *
+     * @param path path to the node
+     * @param newValue optional new value; will be empty if node was deleted
+     * @param oldValue optional old value; will be empty if node was created
+     */
+    public DocumentTreeEvent(DocumentPath path,
+                             Optional<Versioned<V>> newValue,
+                             Optional<Versioned<V>> oldValue) {
+        this.path = path;
+        this.newValue = newValue;
+        this.oldValue = oldValue;
+        this.type = newValue != null ?
+                oldValue != null ? Type.UPDATED : Type.CREATED : Type.DELETED;
+    }
 
     /**
      * Returns the path to the changed node.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedDocumentTree.java
index c9a9f31..8d4f9c3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedDocumentTree.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedDocumentTree.java
@@ -19,11 +19,15 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
+import org.onosproject.store.primitives.NodeUpdate;
+import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.DocumentPath;
 import org.onosproject.store.service.DocumentTreeEvent;
 import org.onosproject.store.service.DocumentTreeListener;
 import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.collect.Maps;
@@ -108,6 +112,33 @@
                           .thenApply(v -> v == null ? null : v.map(serializer::decode));
     }
 
+
+    @Override
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        return backingTree.begin(transactionId);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<NodeUpdate<V>> transactionLog) {
+        return backingTree.prepare(transactionLog.map(record -> record.map(serializer::encode)));
+    }
+
+    @Override
+    public CompletableFuture<Void> commit(TransactionId transactionId) {
+        return backingTree.commit(transactionId);
+    }
+
+    @Override
+    public CompletableFuture<Void> rollback(TransactionId transactionId) {
+        return backingTree.rollback(transactionId);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<NodeUpdate<V>> transactionLog) {
+        return backingTree.prepareAndCommit(transactionLog.map(record -> record.map(serializer::encode)));
+    }
+
+
     @Override
     public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<V> listener) {
         synchronized (listeners) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
index 99f8df6..b8c1248 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
@@ -28,11 +28,18 @@
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Match;
 import org.onlab.util.Tools;
+import org.onosproject.store.primitives.NodeUpdate;
+import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionBegin;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepare;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepareAndCommit;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionCommit;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionRollback;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.DocumentPath;
@@ -41,16 +48,23 @@
 import org.onosproject.store.service.IllegalDocumentModificationException;
 import org.onosproject.store.service.NoSuchDocumentPathException;
 import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.BEGIN;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE_AND_COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ROLLBACK;
 import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
 import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
-import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.ILLEGAL_MODIFICATION;
 import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.INVALID_PATH;
 import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.OK;
@@ -246,6 +260,56 @@
         events.forEach(event -> eventListeners.values().forEach(listener -> listener.event(event)));
     }
 
+    @Override
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        return proxy.<TransactionBegin, Long>invoke(
+                BEGIN,
+                SERIALIZER::encode,
+                new TransactionBegin(transactionId),
+                SERIALIZER::decode)
+                .thenApply(Version::new);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<NodeUpdate<byte[]>> transactionLog) {
+        return proxy.<TransactionPrepare, PrepareResult>invoke(
+                PREPARE,
+                SERIALIZER::encode,
+                new TransactionPrepare(transactionLog),
+                SERIALIZER::decode)
+                .thenApply(v -> v == PrepareResult.OK);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<NodeUpdate<byte[]>> transactionLog) {
+        return proxy.<TransactionPrepareAndCommit, PrepareResult>invoke(
+                PREPARE_AND_COMMIT,
+                SERIALIZER::encode,
+                new TransactionPrepareAndCommit(transactionLog),
+                SERIALIZER::decode)
+                .thenApply(v -> v == PrepareResult.OK);
+    }
+
+    @Override
+    public CompletableFuture<Void> commit(TransactionId transactionId) {
+        return proxy.<TransactionCommit, CommitResult>invoke(
+                COMMIT,
+                SERIALIZER::encode,
+                new TransactionCommit(transactionId),
+                SERIALIZER::decode)
+                .thenApply(v -> null);
+    }
+
+    @Override
+    public CompletableFuture<Void> rollback(TransactionId transactionId) {
+        return proxy.invoke(
+                ROLLBACK,
+                SERIALIZER::encode,
+                new TransactionRollback(transactionId),
+                SERIALIZER::decode)
+                .thenApply(v -> null);
+    }
+
     private class InternalListener implements DocumentTreeListener<byte[]> {
 
         private final DocumentPath path;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeOperations.java
index 718223e..d05215c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeOperations.java
@@ -24,8 +24,11 @@
 import io.atomix.protocols.raft.operation.OperationType;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Match;
+import org.onosproject.store.primitives.NodeUpdate;
+import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.DocumentPath;
+import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Versioned;
 
 /**
@@ -37,7 +40,12 @@
     GET("incrementAndGet", OperationType.QUERY),
     GET_CHILDREN("getAndIncrement", OperationType.QUERY),
     UPDATE("addAndGet", OperationType.COMMAND),
-    CLEAR("getAndAdd", OperationType.COMMAND);
+    CLEAR("getAndAdd", OperationType.COMMAND),
+    BEGIN("begin", OperationType.COMMAND),
+    PREPARE("prepare", OperationType.COMMAND),
+    PREPARE_AND_COMMIT("prepareAndCommit", OperationType.COMMAND),
+    COMMIT("commit", OperationType.COMMAND),
+    ROLLBACK("rollback", OperationType.COMMAND);
 
     private final String id;
     private final OperationType type;
@@ -66,6 +74,18 @@
             .register(Get.class)
             .register(GetChildren.class)
             .register(Update.class)
+            .register(TransactionBegin.class)
+            .register(TransactionPrepare.class)
+            .register(TransactionPrepareAndCommit.class)
+            .register(TransactionCommit.class)
+            .register(TransactionRollback.class)
+            .register(TransactionId.class)
+            .register(TransactionLog.class)
+            .register(PrepareResult.class)
+            .register(CommitResult.class)
+            .register(RollbackResult.class)
+            .register(NodeUpdate.class)
+            .register(NodeUpdate.Type.class)
             .register(DocumentPath.class)
             .register(Match.class)
             .register(Versioned.class)
@@ -74,13 +94,19 @@
             .build("AtomixDocumentTreeOperations");
 
     /**
-     * Abstract DocumentTree command.
+     * Base class for document tree operations.
+     */
+    public abstract static class DocumentTreeOperation {
+    }
+
+    /**
+     * Base class for document tree operations that serialize a {@link DocumentPath}.
      */
     @SuppressWarnings("serial")
-    public abstract static class DocumentTreeOperation {
+    public abstract static class PathOperation extends DocumentTreeOperation {
         private DocumentPath path;
 
-        DocumentTreeOperation(DocumentPath path) {
+        PathOperation(DocumentPath path) {
             this.path = path;
         }
 
@@ -93,7 +119,7 @@
      * DocumentTree#get query.
      */
     @SuppressWarnings("serial")
-    public static class Get extends DocumentTreeOperation {
+    public static class Get extends PathOperation {
         public Get() {
             super(null);
         }
@@ -114,7 +140,7 @@
      * DocumentTree#getChildren query.
      */
     @SuppressWarnings("serial")
-    public static class GetChildren extends DocumentTreeOperation {
+    public static class GetChildren extends PathOperation {
         public GetChildren() {
             super(null);
         }
@@ -135,7 +161,7 @@
      * DocumentTree update command.
      */
     @SuppressWarnings("serial")
-    public static class Update extends DocumentTreeOperation {
+    public static class Update extends PathOperation {
         private Optional<byte[]> value;
         private Match<byte[]> valueMatch;
         private Match<Long> versionMatch;
@@ -181,7 +207,7 @@
      * Change listen.
      */
     @SuppressWarnings("serial")
-    public static class Listen extends DocumentTreeOperation {
+    public static class Listen extends PathOperation {
         public Listen() {
             this(DocumentPath.from("root"));
         }
@@ -202,7 +228,7 @@
      * Change unlisten.
      */
     @SuppressWarnings("serial")
-    public static class Unlisten extends DocumentTreeOperation {
+    public static class Unlisten extends PathOperation {
         public Unlisten() {
             this(DocumentPath.from("root"));
         }
@@ -218,4 +244,129 @@
                     .toString();
         }
     }
+
+    /**
+     * Transaction begin command.
+     */
+    public static class TransactionBegin extends PathOperation {
+        private TransactionId transactionId;
+
+        public TransactionBegin() {
+            super(null);
+        }
+
+        public TransactionBegin(TransactionId transactionId) {
+            super(DocumentPath.from(transactionId.toString()));
+            this.transactionId = transactionId;
+        }
+
+        public TransactionId transactionId() {
+            return transactionId;
+        }
+    }
+
+    /**
+     * Transaction prepare command.
+     */
+    @SuppressWarnings("serial")
+    public static class TransactionPrepare extends PathOperation {
+        private TransactionLog<NodeUpdate<byte[]>> transactionLog;
+
+        public TransactionPrepare() {
+            super(null);
+        }
+
+        public TransactionPrepare(TransactionLog<NodeUpdate<byte[]>> transactionLog) {
+            super(DocumentPath.from(transactionLog.transactionId().toString()));
+            this.transactionLog = transactionLog;
+        }
+
+        public TransactionLog<NodeUpdate<byte[]>> transactionLog() {
+            return transactionLog;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("transactionLog", transactionLog)
+                    .toString();
+        }
+    }
+
+    /**
+     * Transaction prepareAndCommit command.
+     */
+    @SuppressWarnings("serial")
+    public static class TransactionPrepareAndCommit extends TransactionPrepare {
+        public TransactionPrepareAndCommit() {
+        }
+
+        public TransactionPrepareAndCommit(TransactionLog<NodeUpdate<byte[]>> transactionLog) {
+            super(transactionLog);
+        }
+    }
+
+    /**
+     * Transaction commit command.
+     */
+    @SuppressWarnings("serial")
+    public static class TransactionCommit extends PathOperation {
+        private TransactionId transactionId;
+
+        public TransactionCommit() {
+            super(null);
+        }
+
+        public TransactionCommit(TransactionId transactionId) {
+            super(DocumentPath.from(transactionId.toString()));
+            this.transactionId = transactionId;
+        }
+
+        /**
+         * Returns the transaction identifier.
+         * @return transaction id
+         */
+        public TransactionId transactionId() {
+            return transactionId;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("transactionId", transactionId)
+                    .toString();
+        }
+    }
+
+    /**
+     * Transaction rollback command.
+     */
+    @SuppressWarnings("serial")
+    public static class TransactionRollback extends PathOperation {
+        private TransactionId transactionId;
+
+        public TransactionRollback() {
+            super(null);
+        }
+
+        public TransactionRollback(TransactionId transactionId) {
+            super(DocumentPath.from(transactionId.toString()));
+            this.transactionId = transactionId;
+        }
+
+        /**
+         * Returns the transaction identifier.
+         * @return transaction id
+         */
+        public TransactionId transactionId() {
+            return transactionId;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("transactionId", transactionId)
+                    .toString();
+        }
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
index 9e87325..b87f784 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
@@ -19,10 +19,12 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -32,7 +34,9 @@
 import com.esotericsoftware.kryo.io.Output;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
 import io.atomix.protocols.raft.event.EventType;
 import io.atomix.protocols.raft.service.AbstractRaftService;
 import io.atomix.protocols.raft.service.Commit;
@@ -42,11 +46,19 @@
 import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Match;
+import org.onosproject.store.primitives.NodeUpdate;
+import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionBegin;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepare;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepareAndCommit;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionCommit;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionRollback;
+
 import org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.DocumentPath;
@@ -57,15 +69,22 @@
 import org.onosproject.store.service.NoSuchDocumentPathException;
 import org.onosproject.store.service.Ordering;
 import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Versioned;
 
+import static com.google.common.base.Preconditions.checkState;
 import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.BEGIN;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE_AND_COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ROLLBACK;
 import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
 import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
-import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
 
 /**
  * State Machine for {@link AtomixDocumentTree} resource.
@@ -90,9 +109,12 @@
             }, Listener.class)
             .register(Versioned.class)
             .register(DocumentPath.class)
-            .register(new HashMap().keySet().getClass())
+            .register(new LinkedHashMap().keySet().getClass())
             .register(TreeMap.class)
             .register(Ordering.class)
+            .register(TransactionScope.class)
+            .register(TransactionLog.class)
+            .register(TransactionId.class)
             .register(SessionListenCommits.class)
             .register(new com.esotericsoftware.kryo.Serializer<DefaultDocumentTree>() {
                 @Override
@@ -113,6 +135,8 @@
     private Map<Long, SessionListenCommits> listeners = new HashMap<>();
     private AtomicLong versionCounter = new AtomicLong(0);
     private DocumentTree<byte[]> docTree;
+    private Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
+    private Set<DocumentPath> preparedKeys = Sets.newHashSet();
 
     public AtomixDocumentTreeService(Ordering ordering) {
         this.docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet, ordering);
@@ -123,6 +147,8 @@
         writer.writeLong(versionCounter.get());
         writer.writeObject(listeners, serializer::encode);
         writer.writeObject(docTree, serializer::encode);
+        writer.writeObject(preparedKeys, serializer::encode);
+        writer.writeObject(activeTransactions, serializer::encode);
     }
 
     @Override
@@ -130,6 +156,8 @@
         versionCounter = new AtomicLong(reader.readLong());
         listeners = reader.readObject(serializer::decode);
         docTree = reader.readObject(serializer::decode);
+        preparedKeys = reader.readObject(serializer::decode);
+        activeTransactions = reader.readObject(serializer::decode);
     }
 
     @Override
@@ -143,6 +171,21 @@
         // commands
         executor.register(UPDATE, serializer::decode, this::update, serializer::encode);
         executor.register(CLEAR, this::clear);
+        executor.register(BEGIN, serializer::decode, this::begin, serializer::encode);
+        executor.register(PREPARE, serializer::decode, this::prepare, serializer::encode);
+        executor.register(PREPARE_AND_COMMIT, serializer::decode, this::prepareAndCommit, serializer::encode);
+        executor.register(COMMIT, serializer::decode, this::commit, serializer::encode);
+        executor.register(ROLLBACK, serializer::decode, this::rollback, serializer::encode);
+    }
+
+    /**
+     * Returns a boolean indicating whether the given path is currently locked by a transaction.
+     *
+     * @param path the path to check
+     * @return whether the given path is locked by a running transaction
+     */
+    private boolean isLocked(DocumentPath path) {
+        return preparedKeys.contains(path);
     }
 
     protected void listen(Commit<? extends Listen> commit) {
@@ -179,7 +222,12 @@
     protected DocumentTreeResult<Versioned<byte[]>> update(Commit<? extends Update> commit) {
         DocumentTreeResult<Versioned<byte[]>> result = null;
         DocumentPath path = commit.value().path();
-        boolean updated = false;
+
+        // If the path is locked by a transaction, return a WRITE_LOCK error.
+        if (isLocked(path)) {
+            return DocumentTreeResult.writeLock();
+        }
+
         Versioned<byte[]> currentValue = docTree.get(path);
         try {
             Match<Long> versionMatch = commit.value().versionMatch();
@@ -250,6 +298,251 @@
         }
     }
 
+    /**
+     * Handles a begin commit.
+     *
+     * @param commit transaction begin commit
+     * @return transaction state version
+     */
+    protected long begin(Commit<? extends TransactionBegin> commit) {
+        long version = commit.index();
+        activeTransactions.put(commit.value().transactionId(), new TransactionScope(version));
+        return version;
+    }
+
+    /**
+     * Handles an prepare commit.
+     *
+     * @param commit transaction prepare commit
+     * @return prepare result
+     */
+    protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
+        try {
+            TransactionLog<NodeUpdate<byte[]>> transactionLog = commit.value().transactionLog();
+            // Iterate through records in the transaction log and perform isolation checks.
+            for (NodeUpdate<byte[]> record : transactionLog.records()) {
+                DocumentPath path = record.path();
+
+                // If the prepared keys already contains the key contained within the record, that indicates a
+                // conflict with a concurrent transaction.
+                if (preparedKeys.contains(path)) {
+                    return PrepareResult.CONCURRENT_TRANSACTION;
+                }
+
+                // Read the existing value from the map.
+                Versioned<byte[]> existingValue = docTree.get(path);
+
+                // If the update is an UPDATE_NODE or DELETE_NODE, verify that versions match.
+                switch (record.type()) {
+                    case UPDATE_NODE:
+                    case DELETE_NODE:
+                        if (existingValue == null || existingValue.version() != record.version()) {
+                            return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
+                        }
+                    default:
+                        break;
+                }
+            }
+
+            // No violations detected. Mark modified keys locked for transactions.
+            transactionLog.records().forEach(record -> {
+                preparedKeys.add(record.path());
+            });
+
+            // Update the transaction scope. If the transaction scope is not set on this node, that indicates the
+            // coordinator is communicating with another node. Transactions assume that the client is communicating
+            // with a single leader in order to limit the overhead of retaining tombstones.
+            TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
+            if (transactionScope == null) {
+                activeTransactions.put(
+                        transactionLog.transactionId(),
+                        new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
+                return PrepareResult.PARTIAL_FAILURE;
+            } else {
+                activeTransactions.put(
+                        transactionLog.transactionId(),
+                        transactionScope.prepared(commit));
+                return PrepareResult.OK;
+            }
+        } catch (Exception e) {
+            getLogger().warn("Failure applying {}", commit, e);
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Handles an prepare and commit commit.
+     *
+     * @param commit transaction prepare and commit commit
+     * @return prepare result
+     */
+    protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
+        TransactionId transactionId = commit.value().transactionLog().transactionId();
+        PrepareResult prepareResult = prepare(commit);
+        TransactionScope transactionScope = activeTransactions.remove(transactionId);
+        if (prepareResult == PrepareResult.OK) {
+            transactionScope = transactionScope.prepared(commit);
+            commitTransaction(transactionScope);
+        }
+        return prepareResult;
+    }
+
+    /**
+     * Applies committed operations to the state machine.
+     */
+    private CommitResult commitTransaction(TransactionScope transactionScope) {
+        TransactionLog<NodeUpdate<byte[]>> transactionLog = transactionScope.transactionLog();
+
+        List<DocumentTreeEvent<byte[]>> eventsToPublish = Lists.newArrayList();
+        DocumentTreeEvent<byte[]> start = new DocumentTreeEvent<>(
+                DocumentPath.from(transactionScope.transactionLog().transactionId().toString()),
+                Type.TRANSACTION_START,
+                Optional.empty(),
+                Optional.empty());
+        eventsToPublish.add(start);
+
+        for (NodeUpdate<byte[]> record : transactionLog.records()) {
+            DocumentPath path = record.path();
+            checkState(preparedKeys.remove(path), "path is not prepared");
+
+            Versioned<byte[]> previousValue = null;
+            try {
+                previousValue = docTree.removeNode(path);
+            } catch (NoSuchDocumentPathException e) {
+                getLogger().info("Value is being inserted first time");
+            }
+
+            if (record.value() != null) {
+                if (docTree.create(path, record.value())) {
+                    Versioned<byte[]> newValue = docTree.get(path);
+                    eventsToPublish.add(new DocumentTreeEvent<>(
+                            path,
+                            Optional.ofNullable(newValue),
+                            Optional.ofNullable(previousValue)));
+                }
+            } else if (previousValue != null) {
+                eventsToPublish.add(new DocumentTreeEvent<>(
+                        path,
+                        Optional.empty(),
+                        Optional.of(previousValue)));
+            }
+        }
+
+        DocumentTreeEvent<byte[]> end = new DocumentTreeEvent<byte[]>(
+                DocumentPath.from(transactionScope.transactionLog().transactionId().toString()),
+                Type.TRANSACTION_END,
+                Optional.empty(),
+                Optional.empty());
+        eventsToPublish.add(end);
+        publish(eventsToPublish);
+
+        return CommitResult.OK;
+    }
+
+    /**
+     * Handles an commit commit (ha!).
+     *
+     * @param commit transaction commit commit
+     * @return commit result
+     */
+    protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
+        TransactionId transactionId = commit.value().transactionId();
+        TransactionScope transactionScope = activeTransactions.remove(transactionId);
+        if (transactionScope == null) {
+            return CommitResult.UNKNOWN_TRANSACTION_ID;
+        }
+        try {
+            return commitTransaction(transactionScope);
+        } catch (Exception e) {
+            getLogger().warn("Failure applying {}", commit, e);
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Handles an rollback commit (ha!).
+     *
+     * @param commit transaction rollback commit
+     * @return rollback result
+     */
+    protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
+        TransactionId transactionId = commit.value().transactionId();
+        TransactionScope transactionScope = activeTransactions.remove(transactionId);
+        if (transactionScope == null) {
+            return RollbackResult.UNKNOWN_TRANSACTION_ID;
+        } else if (!transactionScope.isPrepared()) {
+            return RollbackResult.OK;
+        } else {
+            transactionScope.transactionLog().records()
+                    .forEach(record -> {
+                        preparedKeys.remove(record.path());
+                    });
+            return RollbackResult.OK;
+        }
+
+    }
+
+    /**
+     * Map transaction scope.
+     */
+    private static final class TransactionScope {
+        private final long version;
+        private final TransactionLog<NodeUpdate<byte[]>> transactionLog;
+
+        private TransactionScope(long version) {
+            this(version, null);
+        }
+
+        private TransactionScope(long version, TransactionLog<NodeUpdate<byte[]>> transactionLog) {
+            this.version = version;
+            this.transactionLog = transactionLog;
+        }
+
+        /**
+         * Returns the transaction version.
+         *
+         * @return the transaction version
+         */
+        long version() {
+            return version;
+        }
+
+        /**
+         * Returns whether this is a prepared transaction scope.
+         *
+         * @return whether this is a prepared transaction scope
+         */
+        boolean isPrepared() {
+            return transactionLog != null;
+        }
+
+        /**
+         * Returns the transaction commit log.
+         *
+         * @return the transaction commit log
+         */
+        TransactionLog<NodeUpdate<byte[]>> transactionLog() {
+            checkState(isPrepared());
+            return transactionLog;
+        }
+
+        /**
+         * Returns a new transaction scope with a prepare commit.
+         *
+         * @param commit the prepare commit
+         * @return new transaction scope updated with the prepare commit
+         */
+        TransactionScope prepared(Commit<? extends TransactionPrepare> commit) {
+            return new TransactionScope(version, commit.value().transactionLog());
+        }
+    }
+
+    private void publish(List<DocumentTreeEvent<byte[]>> events) {
+        listeners.values().forEach(session -> {
+            session.publish(CHANGE, events);
+        });
+    }
+
     private void notifyListeners(DocumentTreeEvent<byte[]> event) {
         listeners.values()
                 .stream()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/DocumentTreeResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/DocumentTreeResult.java
index 3e30a00..bf544dd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/DocumentTreeResult.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/DocumentTreeResult.java
@@ -53,6 +53,10 @@
     }
 
     @SuppressWarnings("unchecked")
+    public static final DocumentTreeResult WRITE_LOCK =
+            new DocumentTreeResult(Status.WRITE_LOCK, null);
+
+    @SuppressWarnings("unchecked")
     public static final DocumentTreeResult INVALID_PATH =
             new DocumentTreeResult(Status.INVALID_PATH, null);
 
@@ -72,6 +76,17 @@
     }
 
     /**
+     * Returns a {@code WRITE_LOCK} error result.
+     *
+     * @param <V> the result value type
+     * @return write lock result
+     */
+    @SuppressWarnings("unchecked")
+    public static <V> DocumentTreeResult<V> writeLock() {
+        return WRITE_LOCK;
+    }
+
+    /**
      * Returns an {@code INVALID_PATH} result.
      *
      * @param <V> the result value type
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java
index 5117934..18746a7 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java
@@ -16,7 +16,9 @@
 
 package org.onosproject.store.primitives.resources.impl;
 
+import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -26,12 +28,16 @@
 import io.atomix.protocols.raft.proxy.RaftProxy;
 import io.atomix.protocols.raft.service.RaftService;
 import org.junit.Test;
+import org.onosproject.store.primitives.NodeUpdate;
+import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.DocumentPath;
 import org.onosproject.store.service.DocumentTreeEvent;
 import org.onosproject.store.service.DocumentTreeListener;
 import org.onosproject.store.service.IllegalDocumentModificationException;
 import org.onosproject.store.service.NoSuchDocumentPathException;
 import org.onosproject.store.service.Ordering;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -409,6 +415,51 @@
         assertEquals(path("root.a.b.c"), event.path());
     }
 
+    @Test
+    public void testTransaction() throws Throwable {
+        String treeName = UUID.randomUUID().toString();
+        AtomixDocumentTree tree = newPrimitive(treeName);
+
+        byte[] value1 = "abc".getBytes();
+        byte[] value2 = "def".getBytes();
+
+        assertTrue(tree.create(path("root.a"), value1).join());
+        assertTrue(tree.create(path("root.b"), value2).join());
+
+        long aVersion = tree.get(path("root.a")).join().version();
+        long bVersion = tree.get(path("root.b")).join().version();
+
+        TransactionId transactionId = TransactionId.from("1");
+        Version transactionVersion = tree.begin(transactionId).join();
+        List<NodeUpdate<byte[]>> records = Arrays.asList(
+                NodeUpdate.<byte[]>newBuilder()
+                        .withType(NodeUpdate.Type.CREATE_NODE)
+                        .withPath(path("root.c"))
+                        .withValue(value1)
+                        .build(),
+                NodeUpdate.<byte[]>newBuilder()
+                        .withType(NodeUpdate.Type.UPDATE_NODE)
+                        .withPath(path("root.a"))
+                        .withValue(value2)
+                        .withVersion(aVersion)
+                        .build(),
+                NodeUpdate.<byte[]>newBuilder()
+                        .withType(NodeUpdate.Type.DELETE_NODE)
+                        .withPath(path("root.b"))
+                        .withVersion(bVersion)
+                        .build());
+        TransactionLog<NodeUpdate<byte[]>> transactionLog = new TransactionLog<>(
+                transactionId,
+                transactionVersion.value(),
+                records);
+        assertTrue(tree.prepare(transactionLog).join());
+        tree.commit(transactionId).join();
+
+        assertArrayEquals(value2, tree.get(path("root.a")).join().value());
+        assertNull(tree.get(path("root.b")).join());
+        assertArrayEquals(value1, tree.get(path("root.c")).join().value());
+    }
+
     private static class TestEventListener implements DocumentTreeListener<byte[]> {
 
         private final BlockingQueue<DocumentTreeEvent<byte[]>> queue;