ONOS-6381 Transactional event listeners
Change-Id: I8f279d78323dea467796e8d37e3117a407af9f76
diff --git a/apps/config/src/main/java/org/onosproject/config/impl/DistributedDynamicConfigStore.java b/apps/config/src/main/java/org/onosproject/config/impl/DistributedDynamicConfigStore.java
index c005aa8..8a84f06 100644
--- a/apps/config/src/main/java/org/onosproject/config/impl/DistributedDynamicConfigStore.java
+++ b/apps/config/src/main/java/org/onosproject/config/impl/DistributedDynamicConfigStore.java
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/NodeUpdate.java b/core/api/src/main/java/org/onosproject/store/primitives/NodeUpdate.java
new file mode 100644
index 0000000..5c21338
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/primitives/NodeUpdate.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+import org.onlab.util.ByteArraySizeHashPrinter;
+
+import com.google.common.base.MoreObjects;
+import org.onosproject.store.service.DocumentPath;
+
+/**
+ * Map update operation.
+ *
+ * @param <V> map value type
+ */
+public final class NodeUpdate<V> {
+
+ /**
+ * Type of database update operation.
+ */
+ public enum Type {
+ /**
+ * Creates an entry if the current version matches specified version.
+ */
+ CREATE_NODE,
+ /**
+ * Updates an entry if the current version matches specified version.
+ */
+ UPDATE_NODE,
+ /**
+ * Deletes an entry if the current version matches specified version.
+ */
+ DELETE_NODE
+ }
+
+ private Type type;
+ private DocumentPath path;
+ private V value;
+ private long version = -1;
+
+ /**
+ * Returns the type of update operation.
+ * @return type of update.
+ */
+ public Type type() {
+ return type;
+ }
+
+ /**
+ * Returns the item path being updated.
+ * @return item path
+ */
+ public DocumentPath path() {
+ return path;
+ }
+
+ /**
+ * Returns the new value.
+ * @return item's target value.
+ */
+ public V value() {
+ return value;
+ }
+
+ /**
+ * Returns the expected current version in the database for the key.
+ * @return expected version.
+ */
+ public long version() {
+ return version;
+ }
+
+ /**
+ * Transforms this instance into an instance of different paramterized types.
+ *
+ * @param valueMapper transcoder to value type
+ * @return new instance
+ * @param <T> value type of returned instance
+ */
+ public <T> NodeUpdate<T> map(Function<V, T> valueMapper) {
+ return NodeUpdate.<T>newBuilder()
+ .withType(type)
+ //.withKey(keyMapper.apply(key))
+ .withValue(value == null ? null : valueMapper.apply(value))
+ .withVersion(version)
+ .build();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(type, path, value, version);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (object instanceof NodeUpdate) {
+ NodeUpdate that = (NodeUpdate) object;
+ return this.type == that.type
+ && Objects.equals(this.path, that.path)
+ && Objects.equals(this.value, that.value)
+ && Objects.equals(this.version, that.version);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("type", type)
+ .add("path", path)
+ .add("value", value instanceof byte[] ?
+ new ByteArraySizeHashPrinter((byte[]) value) : value)
+ .add("version", version)
+ .toString();
+ }
+
+ /**
+ * Creates a new builder instance.
+ *
+ * @param <V> value type
+ * @return builder.
+ */
+ public static <V> Builder<V> newBuilder() {
+ return new Builder<>();
+ }
+
+ /**
+ * NodeUpdate builder.
+ *
+ * @param <V> value type
+ */
+ public static final class Builder<V> {
+
+ private NodeUpdate<V> update = new NodeUpdate<>();
+
+ public NodeUpdate<V> build() {
+ validateInputs();
+ return update;
+ }
+
+ public Builder<V> withType(Type type) {
+ update.type = checkNotNull(type, "type cannot be null");
+ return this;
+ }
+
+ public Builder<V> withPath(DocumentPath key) {
+ update.path = checkNotNull(key, "key cannot be null");
+ return this;
+ }
+
+ public Builder<V> withValue(V value) {
+ update.value = value;
+ return this;
+ }
+
+ public Builder<V> withVersion(long version) {
+ update.version = version;
+ return this;
+ }
+
+ private void validateInputs() {
+ checkNotNull(update.type, "type must be specified");
+ switch (update.type) {
+ case CREATE_NODE:
+ checkNotNull(update.path, "key must be specified");
+ checkNotNull(update.value, "value must be specified.");
+ break;
+ case UPDATE_NODE:
+ checkNotNull(update.path, "key must be specified");
+ checkNotNull(update.value, "value must be specified.");
+ checkState(update.version >= 0, "version must be specified");
+ break;
+ case DELETE_NODE:
+ checkNotNull(update.path, "key must be specified");
+ checkState(update.version >= 0, "version must be specified");
+ break;
+ default:
+ throw new IllegalStateException("Unknown operation type");
+ }
+
+ }
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncDocumentTree.java b/core/api/src/main/java/org/onosproject/store/service/AsyncDocumentTree.java
index 1f0902c..46dcdd5 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncDocumentTree.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncDocumentTree.java
@@ -16,6 +16,8 @@
package org.onosproject.store.service;
+import org.onosproject.store.primitives.NodeUpdate;
+
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -29,7 +31,7 @@
* @param <V> document tree value type
*/
@NotThreadSafe
-public interface AsyncDocumentTree<V> extends DistributedPrimitive {
+public interface AsyncDocumentTree<V> extends DistributedPrimitive, Transactional<NodeUpdate<V>> {
@Override
default Type primitiveType() {
diff --git a/core/api/src/main/java/org/onosproject/store/service/DocumentTreeEvent.java b/core/api/src/main/java/org/onosproject/store/service/DocumentTreeEvent.java
index f640da8..81b28ac 100644
--- a/core/api/src/main/java/org/onosproject/store/service/DocumentTreeEvent.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DocumentTreeEvent.java
@@ -44,7 +44,9 @@
/**
* Signifies an existing node being deleted.
*/
- DELETED
+ DELETED,
+ TRANSACTION_START,
+ TRANSACTION_END
}
private final DocumentPath path;
@@ -77,6 +79,22 @@
this.newValue = newValue;
this.oldValue = oldValue;
}
+ /**
+ * Constructs a new {@code DocumentTreeEvent}.
+ *
+ * @param path path to the node
+ * @param newValue optional new value; will be empty if node was deleted
+ * @param oldValue optional old value; will be empty if node was created
+ */
+ public DocumentTreeEvent(DocumentPath path,
+ Optional<Versioned<V>> newValue,
+ Optional<Versioned<V>> oldValue) {
+ this.path = path;
+ this.newValue = newValue;
+ this.oldValue = oldValue;
+ this.type = newValue != null ?
+ oldValue != null ? Type.UPDATED : Type.CREATED : Type.DELETED;
+ }
/**
* Returns the path to the changed node.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedDocumentTree.java
index c9a9f31..8d4f9c3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedDocumentTree.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedDocumentTree.java
@@ -19,11 +19,15 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import org.onosproject.store.primitives.NodeUpdate;
+import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTreeEvent;
import org.onosproject.store.service.DocumentTreeListener;
import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Maps;
@@ -108,6 +112,33 @@
.thenApply(v -> v == null ? null : v.map(serializer::decode));
}
+
+ @Override
+ public CompletableFuture<Version> begin(TransactionId transactionId) {
+ return backingTree.begin(transactionId);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare(TransactionLog<NodeUpdate<V>> transactionLog) {
+ return backingTree.prepare(transactionLog.map(record -> record.map(serializer::encode)));
+ }
+
+ @Override
+ public CompletableFuture<Void> commit(TransactionId transactionId) {
+ return backingTree.commit(transactionId);
+ }
+
+ @Override
+ public CompletableFuture<Void> rollback(TransactionId transactionId) {
+ return backingTree.rollback(transactionId);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<NodeUpdate<V>> transactionLog) {
+ return backingTree.prepareAndCommit(transactionLog.map(record -> record.map(serializer::encode)));
+ }
+
+
@Override
public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<V> listener) {
synchronized (listeners) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
index 99f8df6..b8c1248 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
@@ -28,11 +28,18 @@
import org.onlab.util.KryoNamespace;
import org.onlab.util.Match;
import org.onlab.util.Tools;
+import org.onosproject.store.primitives.NodeUpdate;
+import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionBegin;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepare;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepareAndCommit;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionCommit;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionRollback;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.DocumentPath;
@@ -41,16 +48,23 @@
import org.onosproject.store.service.IllegalDocumentModificationException;
import org.onosproject.store.service.NoSuchDocumentPathException;
import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.BEGIN;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE_AND_COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ROLLBACK;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
-import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.ILLEGAL_MODIFICATION;
import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.INVALID_PATH;
import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.OK;
@@ -246,6 +260,56 @@
events.forEach(event -> eventListeners.values().forEach(listener -> listener.event(event)));
}
+ @Override
+ public CompletableFuture<Version> begin(TransactionId transactionId) {
+ return proxy.<TransactionBegin, Long>invoke(
+ BEGIN,
+ SERIALIZER::encode,
+ new TransactionBegin(transactionId),
+ SERIALIZER::decode)
+ .thenApply(Version::new);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare(TransactionLog<NodeUpdate<byte[]>> transactionLog) {
+ return proxy.<TransactionPrepare, PrepareResult>invoke(
+ PREPARE,
+ SERIALIZER::encode,
+ new TransactionPrepare(transactionLog),
+ SERIALIZER::decode)
+ .thenApply(v -> v == PrepareResult.OK);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<NodeUpdate<byte[]>> transactionLog) {
+ return proxy.<TransactionPrepareAndCommit, PrepareResult>invoke(
+ PREPARE_AND_COMMIT,
+ SERIALIZER::encode,
+ new TransactionPrepareAndCommit(transactionLog),
+ SERIALIZER::decode)
+ .thenApply(v -> v == PrepareResult.OK);
+ }
+
+ @Override
+ public CompletableFuture<Void> commit(TransactionId transactionId) {
+ return proxy.<TransactionCommit, CommitResult>invoke(
+ COMMIT,
+ SERIALIZER::encode,
+ new TransactionCommit(transactionId),
+ SERIALIZER::decode)
+ .thenApply(v -> null);
+ }
+
+ @Override
+ public CompletableFuture<Void> rollback(TransactionId transactionId) {
+ return proxy.invoke(
+ ROLLBACK,
+ SERIALIZER::encode,
+ new TransactionRollback(transactionId),
+ SERIALIZER::decode)
+ .thenApply(v -> null);
+ }
+
private class InternalListener implements DocumentTreeListener<byte[]> {
private final DocumentPath path;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeOperations.java
index 718223e..d05215c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeOperations.java
@@ -24,8 +24,11 @@
import io.atomix.protocols.raft.operation.OperationType;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Match;
+import org.onosproject.store.primitives.NodeUpdate;
+import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.DocumentPath;
+import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Versioned;
/**
@@ -37,7 +40,12 @@
GET("incrementAndGet", OperationType.QUERY),
GET_CHILDREN("getAndIncrement", OperationType.QUERY),
UPDATE("addAndGet", OperationType.COMMAND),
- CLEAR("getAndAdd", OperationType.COMMAND);
+ CLEAR("getAndAdd", OperationType.COMMAND),
+ BEGIN("begin", OperationType.COMMAND),
+ PREPARE("prepare", OperationType.COMMAND),
+ PREPARE_AND_COMMIT("prepareAndCommit", OperationType.COMMAND),
+ COMMIT("commit", OperationType.COMMAND),
+ ROLLBACK("rollback", OperationType.COMMAND);
private final String id;
private final OperationType type;
@@ -66,6 +74,18 @@
.register(Get.class)
.register(GetChildren.class)
.register(Update.class)
+ .register(TransactionBegin.class)
+ .register(TransactionPrepare.class)
+ .register(TransactionPrepareAndCommit.class)
+ .register(TransactionCommit.class)
+ .register(TransactionRollback.class)
+ .register(TransactionId.class)
+ .register(TransactionLog.class)
+ .register(PrepareResult.class)
+ .register(CommitResult.class)
+ .register(RollbackResult.class)
+ .register(NodeUpdate.class)
+ .register(NodeUpdate.Type.class)
.register(DocumentPath.class)
.register(Match.class)
.register(Versioned.class)
@@ -74,13 +94,19 @@
.build("AtomixDocumentTreeOperations");
/**
- * Abstract DocumentTree command.
+ * Base class for document tree operations.
+ */
+ public abstract static class DocumentTreeOperation {
+ }
+
+ /**
+ * Base class for document tree operations that serialize a {@link DocumentPath}.
*/
@SuppressWarnings("serial")
- public abstract static class DocumentTreeOperation {
+ public abstract static class PathOperation extends DocumentTreeOperation {
private DocumentPath path;
- DocumentTreeOperation(DocumentPath path) {
+ PathOperation(DocumentPath path) {
this.path = path;
}
@@ -93,7 +119,7 @@
* DocumentTree#get query.
*/
@SuppressWarnings("serial")
- public static class Get extends DocumentTreeOperation {
+ public static class Get extends PathOperation {
public Get() {
super(null);
}
@@ -114,7 +140,7 @@
* DocumentTree#getChildren query.
*/
@SuppressWarnings("serial")
- public static class GetChildren extends DocumentTreeOperation {
+ public static class GetChildren extends PathOperation {
public GetChildren() {
super(null);
}
@@ -135,7 +161,7 @@
* DocumentTree update command.
*/
@SuppressWarnings("serial")
- public static class Update extends DocumentTreeOperation {
+ public static class Update extends PathOperation {
private Optional<byte[]> value;
private Match<byte[]> valueMatch;
private Match<Long> versionMatch;
@@ -181,7 +207,7 @@
* Change listen.
*/
@SuppressWarnings("serial")
- public static class Listen extends DocumentTreeOperation {
+ public static class Listen extends PathOperation {
public Listen() {
this(DocumentPath.from("root"));
}
@@ -202,7 +228,7 @@
* Change unlisten.
*/
@SuppressWarnings("serial")
- public static class Unlisten extends DocumentTreeOperation {
+ public static class Unlisten extends PathOperation {
public Unlisten() {
this(DocumentPath.from("root"));
}
@@ -218,4 +244,129 @@
.toString();
}
}
+
+ /**
+ * Transaction begin command.
+ */
+ public static class TransactionBegin extends PathOperation {
+ private TransactionId transactionId;
+
+ public TransactionBegin() {
+ super(null);
+ }
+
+ public TransactionBegin(TransactionId transactionId) {
+ super(DocumentPath.from(transactionId.toString()));
+ this.transactionId = transactionId;
+ }
+
+ public TransactionId transactionId() {
+ return transactionId;
+ }
+ }
+
+ /**
+ * Transaction prepare command.
+ */
+ @SuppressWarnings("serial")
+ public static class TransactionPrepare extends PathOperation {
+ private TransactionLog<NodeUpdate<byte[]>> transactionLog;
+
+ public TransactionPrepare() {
+ super(null);
+ }
+
+ public TransactionPrepare(TransactionLog<NodeUpdate<byte[]>> transactionLog) {
+ super(DocumentPath.from(transactionLog.transactionId().toString()));
+ this.transactionLog = transactionLog;
+ }
+
+ public TransactionLog<NodeUpdate<byte[]>> transactionLog() {
+ return transactionLog;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("transactionLog", transactionLog)
+ .toString();
+ }
+ }
+
+ /**
+ * Transaction prepareAndCommit command.
+ */
+ @SuppressWarnings("serial")
+ public static class TransactionPrepareAndCommit extends TransactionPrepare {
+ public TransactionPrepareAndCommit() {
+ }
+
+ public TransactionPrepareAndCommit(TransactionLog<NodeUpdate<byte[]>> transactionLog) {
+ super(transactionLog);
+ }
+ }
+
+ /**
+ * Transaction commit command.
+ */
+ @SuppressWarnings("serial")
+ public static class TransactionCommit extends PathOperation {
+ private TransactionId transactionId;
+
+ public TransactionCommit() {
+ super(null);
+ }
+
+ public TransactionCommit(TransactionId transactionId) {
+ super(DocumentPath.from(transactionId.toString()));
+ this.transactionId = transactionId;
+ }
+
+ /**
+ * Returns the transaction identifier.
+ * @return transaction id
+ */
+ public TransactionId transactionId() {
+ return transactionId;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("transactionId", transactionId)
+ .toString();
+ }
+ }
+
+ /**
+ * Transaction rollback command.
+ */
+ @SuppressWarnings("serial")
+ public static class TransactionRollback extends PathOperation {
+ private TransactionId transactionId;
+
+ public TransactionRollback() {
+ super(null);
+ }
+
+ public TransactionRollback(TransactionId transactionId) {
+ super(DocumentPath.from(transactionId.toString()));
+ this.transactionId = transactionId;
+ }
+
+ /**
+ * Returns the transaction identifier.
+ * @return transaction id
+ */
+ public TransactionId transactionId() {
+ return transactionId;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("transactionId", transactionId)
+ .toString();
+ }
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
index 9e87325..b87f784 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
@@ -19,10 +19,12 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -32,7 +34,9 @@
import com.esotericsoftware.kryo.io.Output;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
import io.atomix.protocols.raft.event.EventType;
import io.atomix.protocols.raft.service.AbstractRaftService;
import io.atomix.protocols.raft.service.Commit;
@@ -42,11 +46,19 @@
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Match;
+import org.onosproject.store.primitives.NodeUpdate;
+import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionBegin;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepare;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepareAndCommit;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionCommit;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionRollback;
+
import org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.DocumentPath;
@@ -57,15 +69,22 @@
import org.onosproject.store.service.NoSuchDocumentPathException;
import org.onosproject.store.service.Ordering;
import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Versioned;
+import static com.google.common.base.Preconditions.checkState;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.BEGIN;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE_AND_COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ROLLBACK;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
-import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
/**
* State Machine for {@link AtomixDocumentTree} resource.
@@ -90,9 +109,12 @@
}, Listener.class)
.register(Versioned.class)
.register(DocumentPath.class)
- .register(new HashMap().keySet().getClass())
+ .register(new LinkedHashMap().keySet().getClass())
.register(TreeMap.class)
.register(Ordering.class)
+ .register(TransactionScope.class)
+ .register(TransactionLog.class)
+ .register(TransactionId.class)
.register(SessionListenCommits.class)
.register(new com.esotericsoftware.kryo.Serializer<DefaultDocumentTree>() {
@Override
@@ -113,6 +135,8 @@
private Map<Long, SessionListenCommits> listeners = new HashMap<>();
private AtomicLong versionCounter = new AtomicLong(0);
private DocumentTree<byte[]> docTree;
+ private Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
+ private Set<DocumentPath> preparedKeys = Sets.newHashSet();
public AtomixDocumentTreeService(Ordering ordering) {
this.docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet, ordering);
@@ -123,6 +147,8 @@
writer.writeLong(versionCounter.get());
writer.writeObject(listeners, serializer::encode);
writer.writeObject(docTree, serializer::encode);
+ writer.writeObject(preparedKeys, serializer::encode);
+ writer.writeObject(activeTransactions, serializer::encode);
}
@Override
@@ -130,6 +156,8 @@
versionCounter = new AtomicLong(reader.readLong());
listeners = reader.readObject(serializer::decode);
docTree = reader.readObject(serializer::decode);
+ preparedKeys = reader.readObject(serializer::decode);
+ activeTransactions = reader.readObject(serializer::decode);
}
@Override
@@ -143,6 +171,21 @@
// commands
executor.register(UPDATE, serializer::decode, this::update, serializer::encode);
executor.register(CLEAR, this::clear);
+ executor.register(BEGIN, serializer::decode, this::begin, serializer::encode);
+ executor.register(PREPARE, serializer::decode, this::prepare, serializer::encode);
+ executor.register(PREPARE_AND_COMMIT, serializer::decode, this::prepareAndCommit, serializer::encode);
+ executor.register(COMMIT, serializer::decode, this::commit, serializer::encode);
+ executor.register(ROLLBACK, serializer::decode, this::rollback, serializer::encode);
+ }
+
+ /**
+ * Returns a boolean indicating whether the given path is currently locked by a transaction.
+ *
+ * @param path the path to check
+ * @return whether the given path is locked by a running transaction
+ */
+ private boolean isLocked(DocumentPath path) {
+ return preparedKeys.contains(path);
}
protected void listen(Commit<? extends Listen> commit) {
@@ -179,7 +222,12 @@
protected DocumentTreeResult<Versioned<byte[]>> update(Commit<? extends Update> commit) {
DocumentTreeResult<Versioned<byte[]>> result = null;
DocumentPath path = commit.value().path();
- boolean updated = false;
+
+ // If the path is locked by a transaction, return a WRITE_LOCK error.
+ if (isLocked(path)) {
+ return DocumentTreeResult.writeLock();
+ }
+
Versioned<byte[]> currentValue = docTree.get(path);
try {
Match<Long> versionMatch = commit.value().versionMatch();
@@ -250,6 +298,251 @@
}
}
+ /**
+ * Handles a begin commit.
+ *
+ * @param commit transaction begin commit
+ * @return transaction state version
+ */
+ protected long begin(Commit<? extends TransactionBegin> commit) {
+ long version = commit.index();
+ activeTransactions.put(commit.value().transactionId(), new TransactionScope(version));
+ return version;
+ }
+
+ /**
+ * Handles an prepare commit.
+ *
+ * @param commit transaction prepare commit
+ * @return prepare result
+ */
+ protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
+ try {
+ TransactionLog<NodeUpdate<byte[]>> transactionLog = commit.value().transactionLog();
+ // Iterate through records in the transaction log and perform isolation checks.
+ for (NodeUpdate<byte[]> record : transactionLog.records()) {
+ DocumentPath path = record.path();
+
+ // If the prepared keys already contains the key contained within the record, that indicates a
+ // conflict with a concurrent transaction.
+ if (preparedKeys.contains(path)) {
+ return PrepareResult.CONCURRENT_TRANSACTION;
+ }
+
+ // Read the existing value from the map.
+ Versioned<byte[]> existingValue = docTree.get(path);
+
+ // If the update is an UPDATE_NODE or DELETE_NODE, verify that versions match.
+ switch (record.type()) {
+ case UPDATE_NODE:
+ case DELETE_NODE:
+ if (existingValue == null || existingValue.version() != record.version()) {
+ return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
+ }
+ default:
+ break;
+ }
+ }
+
+ // No violations detected. Mark modified keys locked for transactions.
+ transactionLog.records().forEach(record -> {
+ preparedKeys.add(record.path());
+ });
+
+ // Update the transaction scope. If the transaction scope is not set on this node, that indicates the
+ // coordinator is communicating with another node. Transactions assume that the client is communicating
+ // with a single leader in order to limit the overhead of retaining tombstones.
+ TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
+ if (transactionScope == null) {
+ activeTransactions.put(
+ transactionLog.transactionId(),
+ new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
+ return PrepareResult.PARTIAL_FAILURE;
+ } else {
+ activeTransactions.put(
+ transactionLog.transactionId(),
+ transactionScope.prepared(commit));
+ return PrepareResult.OK;
+ }
+ } catch (Exception e) {
+ getLogger().warn("Failure applying {}", commit, e);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ /**
+ * Handles an prepare and commit commit.
+ *
+ * @param commit transaction prepare and commit commit
+ * @return prepare result
+ */
+ protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
+ TransactionId transactionId = commit.value().transactionLog().transactionId();
+ PrepareResult prepareResult = prepare(commit);
+ TransactionScope transactionScope = activeTransactions.remove(transactionId);
+ if (prepareResult == PrepareResult.OK) {
+ transactionScope = transactionScope.prepared(commit);
+ commitTransaction(transactionScope);
+ }
+ return prepareResult;
+ }
+
+ /**
+ * Applies committed operations to the state machine.
+ */
+ private CommitResult commitTransaction(TransactionScope transactionScope) {
+ TransactionLog<NodeUpdate<byte[]>> transactionLog = transactionScope.transactionLog();
+
+ List<DocumentTreeEvent<byte[]>> eventsToPublish = Lists.newArrayList();
+ DocumentTreeEvent<byte[]> start = new DocumentTreeEvent<>(
+ DocumentPath.from(transactionScope.transactionLog().transactionId().toString()),
+ Type.TRANSACTION_START,
+ Optional.empty(),
+ Optional.empty());
+ eventsToPublish.add(start);
+
+ for (NodeUpdate<byte[]> record : transactionLog.records()) {
+ DocumentPath path = record.path();
+ checkState(preparedKeys.remove(path), "path is not prepared");
+
+ Versioned<byte[]> previousValue = null;
+ try {
+ previousValue = docTree.removeNode(path);
+ } catch (NoSuchDocumentPathException e) {
+ getLogger().info("Value is being inserted first time");
+ }
+
+ if (record.value() != null) {
+ if (docTree.create(path, record.value())) {
+ Versioned<byte[]> newValue = docTree.get(path);
+ eventsToPublish.add(new DocumentTreeEvent<>(
+ path,
+ Optional.ofNullable(newValue),
+ Optional.ofNullable(previousValue)));
+ }
+ } else if (previousValue != null) {
+ eventsToPublish.add(new DocumentTreeEvent<>(
+ path,
+ Optional.empty(),
+ Optional.of(previousValue)));
+ }
+ }
+
+ DocumentTreeEvent<byte[]> end = new DocumentTreeEvent<byte[]>(
+ DocumentPath.from(transactionScope.transactionLog().transactionId().toString()),
+ Type.TRANSACTION_END,
+ Optional.empty(),
+ Optional.empty());
+ eventsToPublish.add(end);
+ publish(eventsToPublish);
+
+ return CommitResult.OK;
+ }
+
+ /**
+ * Handles an commit commit (ha!).
+ *
+ * @param commit transaction commit commit
+ * @return commit result
+ */
+ protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
+ TransactionId transactionId = commit.value().transactionId();
+ TransactionScope transactionScope = activeTransactions.remove(transactionId);
+ if (transactionScope == null) {
+ return CommitResult.UNKNOWN_TRANSACTION_ID;
+ }
+ try {
+ return commitTransaction(transactionScope);
+ } catch (Exception e) {
+ getLogger().warn("Failure applying {}", commit, e);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ /**
+ * Handles an rollback commit (ha!).
+ *
+ * @param commit transaction rollback commit
+ * @return rollback result
+ */
+ protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
+ TransactionId transactionId = commit.value().transactionId();
+ TransactionScope transactionScope = activeTransactions.remove(transactionId);
+ if (transactionScope == null) {
+ return RollbackResult.UNKNOWN_TRANSACTION_ID;
+ } else if (!transactionScope.isPrepared()) {
+ return RollbackResult.OK;
+ } else {
+ transactionScope.transactionLog().records()
+ .forEach(record -> {
+ preparedKeys.remove(record.path());
+ });
+ return RollbackResult.OK;
+ }
+
+ }
+
+ /**
+ * Map transaction scope.
+ */
+ private static final class TransactionScope {
+ private final long version;
+ private final TransactionLog<NodeUpdate<byte[]>> transactionLog;
+
+ private TransactionScope(long version) {
+ this(version, null);
+ }
+
+ private TransactionScope(long version, TransactionLog<NodeUpdate<byte[]>> transactionLog) {
+ this.version = version;
+ this.transactionLog = transactionLog;
+ }
+
+ /**
+ * Returns the transaction version.
+ *
+ * @return the transaction version
+ */
+ long version() {
+ return version;
+ }
+
+ /**
+ * Returns whether this is a prepared transaction scope.
+ *
+ * @return whether this is a prepared transaction scope
+ */
+ boolean isPrepared() {
+ return transactionLog != null;
+ }
+
+ /**
+ * Returns the transaction commit log.
+ *
+ * @return the transaction commit log
+ */
+ TransactionLog<NodeUpdate<byte[]>> transactionLog() {
+ checkState(isPrepared());
+ return transactionLog;
+ }
+
+ /**
+ * Returns a new transaction scope with a prepare commit.
+ *
+ * @param commit the prepare commit
+ * @return new transaction scope updated with the prepare commit
+ */
+ TransactionScope prepared(Commit<? extends TransactionPrepare> commit) {
+ return new TransactionScope(version, commit.value().transactionLog());
+ }
+ }
+
+ private void publish(List<DocumentTreeEvent<byte[]>> events) {
+ listeners.values().forEach(session -> {
+ session.publish(CHANGE, events);
+ });
+ }
+
private void notifyListeners(DocumentTreeEvent<byte[]> event) {
listeners.values()
.stream()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/DocumentTreeResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/DocumentTreeResult.java
index 3e30a00..bf544dd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/DocumentTreeResult.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/DocumentTreeResult.java
@@ -53,6 +53,10 @@
}
@SuppressWarnings("unchecked")
+ public static final DocumentTreeResult WRITE_LOCK =
+ new DocumentTreeResult(Status.WRITE_LOCK, null);
+
+ @SuppressWarnings("unchecked")
public static final DocumentTreeResult INVALID_PATH =
new DocumentTreeResult(Status.INVALID_PATH, null);
@@ -72,6 +76,17 @@
}
/**
+ * Returns a {@code WRITE_LOCK} error result.
+ *
+ * @param <V> the result value type
+ * @return write lock result
+ */
+ @SuppressWarnings("unchecked")
+ public static <V> DocumentTreeResult<V> writeLock() {
+ return WRITE_LOCK;
+ }
+
+ /**
* Returns an {@code INVALID_PATH} result.
*
* @param <V> the result value type
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java
index 5117934..18746a7 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java
@@ -16,7 +16,9 @@
package org.onosproject.store.primitives.resources.impl;
+import java.util.Arrays;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
@@ -26,12 +28,16 @@
import io.atomix.protocols.raft.proxy.RaftProxy;
import io.atomix.protocols.raft.service.RaftService;
import org.junit.Test;
+import org.onosproject.store.primitives.NodeUpdate;
+import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTreeEvent;
import org.onosproject.store.service.DocumentTreeListener;
import org.onosproject.store.service.IllegalDocumentModificationException;
import org.onosproject.store.service.NoSuchDocumentPathException;
import org.onosproject.store.service.Ordering;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
import static org.junit.Assert.assertArrayEquals;
@@ -409,6 +415,51 @@
assertEquals(path("root.a.b.c"), event.path());
}
+ @Test
+ public void testTransaction() throws Throwable {
+ String treeName = UUID.randomUUID().toString();
+ AtomixDocumentTree tree = newPrimitive(treeName);
+
+ byte[] value1 = "abc".getBytes();
+ byte[] value2 = "def".getBytes();
+
+ assertTrue(tree.create(path("root.a"), value1).join());
+ assertTrue(tree.create(path("root.b"), value2).join());
+
+ long aVersion = tree.get(path("root.a")).join().version();
+ long bVersion = tree.get(path("root.b")).join().version();
+
+ TransactionId transactionId = TransactionId.from("1");
+ Version transactionVersion = tree.begin(transactionId).join();
+ List<NodeUpdate<byte[]>> records = Arrays.asList(
+ NodeUpdate.<byte[]>newBuilder()
+ .withType(NodeUpdate.Type.CREATE_NODE)
+ .withPath(path("root.c"))
+ .withValue(value1)
+ .build(),
+ NodeUpdate.<byte[]>newBuilder()
+ .withType(NodeUpdate.Type.UPDATE_NODE)
+ .withPath(path("root.a"))
+ .withValue(value2)
+ .withVersion(aVersion)
+ .build(),
+ NodeUpdate.<byte[]>newBuilder()
+ .withType(NodeUpdate.Type.DELETE_NODE)
+ .withPath(path("root.b"))
+ .withVersion(bVersion)
+ .build());
+ TransactionLog<NodeUpdate<byte[]>> transactionLog = new TransactionLog<>(
+ transactionId,
+ transactionVersion.value(),
+ records);
+ assertTrue(tree.prepare(transactionLog).join());
+ tree.commit(transactionId).join();
+
+ assertArrayEquals(value2, tree.get(path("root.a")).join().value());
+ assertNull(tree.get(path("root.b")).join());
+ assertArrayEquals(value1, tree.get(path("root.c")).join().value());
+ }
+
private static class TestEventListener implements DocumentTreeListener<byte[]> {
private final BlockingQueue<DocumentTreeEvent<byte[]>> queue;