ONOS-6381 Transactional event listeners
Change-Id: I8f279d78323dea467796e8d37e3117a407af9f76
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
index 9e87325..b87f784 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
@@ -19,10 +19,12 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -32,7 +34,9 @@
import com.esotericsoftware.kryo.io.Output;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
import io.atomix.protocols.raft.event.EventType;
import io.atomix.protocols.raft.service.AbstractRaftService;
import io.atomix.protocols.raft.service.Commit;
@@ -42,11 +46,19 @@
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Match;
+import org.onosproject.store.primitives.NodeUpdate;
+import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionBegin;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepare;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepareAndCommit;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionCommit;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionRollback;
+
import org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.DocumentPath;
@@ -57,15 +69,22 @@
import org.onosproject.store.service.NoSuchDocumentPathException;
import org.onosproject.store.service.Ordering;
import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Versioned;
+import static com.google.common.base.Preconditions.checkState;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.BEGIN;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE_AND_COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ROLLBACK;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
-import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
/**
* State Machine for {@link AtomixDocumentTree} resource.
@@ -90,9 +109,12 @@
}, Listener.class)
.register(Versioned.class)
.register(DocumentPath.class)
- .register(new HashMap().keySet().getClass())
+ .register(new LinkedHashMap().keySet().getClass())
.register(TreeMap.class)
.register(Ordering.class)
+ .register(TransactionScope.class)
+ .register(TransactionLog.class)
+ .register(TransactionId.class)
.register(SessionListenCommits.class)
.register(new com.esotericsoftware.kryo.Serializer<DefaultDocumentTree>() {
@Override
@@ -113,6 +135,8 @@
private Map<Long, SessionListenCommits> listeners = new HashMap<>();
private AtomicLong versionCounter = new AtomicLong(0);
private DocumentTree<byte[]> docTree;
+ private Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
+ private Set<DocumentPath> preparedKeys = Sets.newHashSet();
public AtomixDocumentTreeService(Ordering ordering) {
this.docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet, ordering);
@@ -123,6 +147,8 @@
writer.writeLong(versionCounter.get());
writer.writeObject(listeners, serializer::encode);
writer.writeObject(docTree, serializer::encode);
+ writer.writeObject(preparedKeys, serializer::encode);
+ writer.writeObject(activeTransactions, serializer::encode);
}
@Override
@@ -130,6 +156,8 @@
versionCounter = new AtomicLong(reader.readLong());
listeners = reader.readObject(serializer::decode);
docTree = reader.readObject(serializer::decode);
+ preparedKeys = reader.readObject(serializer::decode);
+ activeTransactions = reader.readObject(serializer::decode);
}
@Override
@@ -143,6 +171,21 @@
// commands
executor.register(UPDATE, serializer::decode, this::update, serializer::encode);
executor.register(CLEAR, this::clear);
+ executor.register(BEGIN, serializer::decode, this::begin, serializer::encode);
+ executor.register(PREPARE, serializer::decode, this::prepare, serializer::encode);
+ executor.register(PREPARE_AND_COMMIT, serializer::decode, this::prepareAndCommit, serializer::encode);
+ executor.register(COMMIT, serializer::decode, this::commit, serializer::encode);
+ executor.register(ROLLBACK, serializer::decode, this::rollback, serializer::encode);
+ }
+
+ /**
+ * Returns a boolean indicating whether the given path is currently locked by a transaction.
+ *
+ * @param path the path to check
+ * @return whether the given path is locked by a running transaction
+ */
+ private boolean isLocked(DocumentPath path) {
+ return preparedKeys.contains(path);
}
protected void listen(Commit<? extends Listen> commit) {
@@ -179,7 +222,12 @@
protected DocumentTreeResult<Versioned<byte[]>> update(Commit<? extends Update> commit) {
DocumentTreeResult<Versioned<byte[]>> result = null;
DocumentPath path = commit.value().path();
- boolean updated = false;
+
+ // If the path is locked by a transaction, return a WRITE_LOCK error.
+ if (isLocked(path)) {
+ return DocumentTreeResult.writeLock();
+ }
+
Versioned<byte[]> currentValue = docTree.get(path);
try {
Match<Long> versionMatch = commit.value().versionMatch();
@@ -250,6 +298,251 @@
}
}
+ /**
+ * Handles a begin commit.
+ *
+ * @param commit transaction begin commit
+ * @return transaction state version
+ */
+ protected long begin(Commit<? extends TransactionBegin> commit) {
+ long version = commit.index();
+ activeTransactions.put(commit.value().transactionId(), new TransactionScope(version));
+ return version;
+ }
+
+    /**
+     * Handles a prepare commit.
+     * <p>
+     * Every record of the transaction log is checked first; nothing is modified unless all records pass:
+     * <ul>
+     *   <li>a record whose path is already locked yields {@code CONCURRENT_TRANSACTION};</li>
+     *   <li>an {@code UPDATE_NODE} or {@code DELETE_NODE} record whose node is missing, or whose version
+     *       differs from the node's current version, yields {@code OPTIMISTIC_LOCK_FAILURE}.</li>
+     * </ul>
+     * When all records pass, their paths are added to {@code preparedKeys} and the transaction scope is
+     * replaced by a prepared one holding the log.
+     *
+     * @param commit transaction prepare commit
+     * @return {@code OK} if the transaction was prepared against a scope that already existed,
+     *         {@code PARTIAL_FAILURE} if the paths were locked but no scope existed for the transaction,
+     *         otherwise the failure detected by the isolation checks
+     */
+    protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
+        try {
+            TransactionLog<NodeUpdate<byte[]>> transactionLog = commit.value().transactionLog();
+
+            // Iterate through records in the transaction log and perform isolation checks.
+            for (NodeUpdate<byte[]> record : transactionLog.records()) {
+                DocumentPath path = record.path();
+
+                // A path that is already locked was prepared by another transaction that has neither
+                // committed nor rolled back yet, which indicates a conflict with a concurrent transaction.
+                if (isLocked(path)) {
+                    return PrepareResult.CONCURRENT_TRANSACTION;
+                }
+
+                // Read the existing value from the document tree.
+                Versioned<byte[]> existingValue = docTree.get(path);
+
+                // If the update is an UPDATE_NODE or DELETE_NODE, verify that versions match.
+                switch (record.type()) {
+                    case UPDATE_NODE:
+                    case DELETE_NODE:
+                        if (existingValue == null || existingValue.version() != record.version()) {
+                            return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
+                        }
+                        break;
+                    default:
+                        // Other record types carry no version precondition.
+                        break;
+                }
+            }
+
+            // No violations detected. Mark modified keys locked for transactions.
+            for (NodeUpdate<byte[]> record : transactionLog.records()) {
+                preparedKeys.add(record.path());
+            }
+
+            // Update the transaction scope. If the transaction scope is not set on this node, that indicates the
+            // coordinator is communicating with another node. Transactions assume that the client is communicating
+            // with a single leader in order to limit the overhead of retaining tombstones.
+            TransactionId transactionId = transactionLog.transactionId();
+            TransactionScope transactionScope = activeTransactions.get(transactionId);
+            if (transactionScope == null) {
+                activeTransactions.put(
+                        transactionId,
+                        new TransactionScope(transactionLog.version(), transactionLog));
+                return PrepareResult.PARTIAL_FAILURE;
+            }
+            activeTransactions.put(transactionId, transactionScope.prepared(commit));
+            return PrepareResult.OK;
+        } catch (Exception e) {
+            getLogger().warn("Failure applying {}", commit, e);
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Handles a combined prepare-and-commit commit.
+     * <p>
+     * The transaction is prepared via {@link #prepare(Commit)} and its scope is then removed from
+     * {@code activeTransactions} whatever the outcome. If the prepare succeeded, the transaction log is
+     * applied immediately.
+     * <p>
+     * {@code prepare} locks the log's paths before returning {@code PARTIAL_FAILURE}. Because the scope is
+     * discarded here, no later commit or rollback could release those locks, so they are released in this
+     * method. The other failure results are returned before any path is locked and need no cleanup.
+     *
+     * @param commit transaction prepare and commit commit
+     * @return prepare result
+     */
+    protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
+        TransactionLog<NodeUpdate<byte[]>> transactionLog = commit.value().transactionLog();
+        TransactionId transactionId = transactionLog.transactionId();
+        PrepareResult prepareResult = prepare(commit);
+        TransactionScope transactionScope = activeTransactions.remove(transactionId);
+        if (prepareResult == PrepareResult.OK) {
+            transactionScope = transactionScope.prepared(commit);
+            commitTransaction(transactionScope);
+        } else if (prepareResult == PrepareResult.PARTIAL_FAILURE) {
+            // prepare() only reaches PARTIAL_FAILURE after verifying that none of these paths was locked
+            // and then locking all of them, so every path removed here was locked by this transaction.
+            for (NodeUpdate<byte[]> record : transactionLog.records()) {
+                preparedKeys.remove(record.path());
+            }
+        }
+        return prepareResult;
+    }
+
+ /**
+ * Applies committed operations to the state machine.
+ */
+ private CommitResult commitTransaction(TransactionScope transactionScope) {
+ TransactionLog<NodeUpdate<byte[]>> transactionLog = transactionScope.transactionLog();
+
+ List<DocumentTreeEvent<byte[]>> eventsToPublish = Lists.newArrayList();
+ DocumentTreeEvent<byte[]> start = new DocumentTreeEvent<>(
+ DocumentPath.from(transactionScope.transactionLog().transactionId().toString()),
+ Type.TRANSACTION_START,
+ Optional.empty(),
+ Optional.empty());
+ eventsToPublish.add(start);
+
+ for (NodeUpdate<byte[]> record : transactionLog.records()) {
+ DocumentPath path = record.path();
+ checkState(preparedKeys.remove(path), "path is not prepared");
+
+ Versioned<byte[]> previousValue = null;
+ try {
+ previousValue = docTree.removeNode(path);
+ } catch (NoSuchDocumentPathException e) {
+ getLogger().info("Value is being inserted first time");
+ }
+
+ if (record.value() != null) {
+ if (docTree.create(path, record.value())) {
+ Versioned<byte[]> newValue = docTree.get(path);
+ eventsToPublish.add(new DocumentTreeEvent<>(
+ path,
+ Optional.ofNullable(newValue),
+ Optional.ofNullable(previousValue)));
+ }
+ } else if (previousValue != null) {
+ eventsToPublish.add(new DocumentTreeEvent<>(
+ path,
+ Optional.empty(),
+ Optional.of(previousValue)));
+ }
+ }
+
+ DocumentTreeEvent<byte[]> end = new DocumentTreeEvent<byte[]>(
+ DocumentPath.from(transactionScope.transactionLog().transactionId().toString()),
+ Type.TRANSACTION_END,
+ Optional.empty(),
+ Optional.empty());
+ eventsToPublish.add(end);
+ publish(eventsToPublish);
+
+ return CommitResult.OK;
+ }
+
+ /**
+ * Handles an commit commit (ha!).
+ *
+ * @param commit transaction commit commit
+ * @return commit result
+ */
+ protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
+ TransactionId transactionId = commit.value().transactionId();
+ TransactionScope transactionScope = activeTransactions.remove(transactionId);
+ if (transactionScope == null) {
+ return CommitResult.UNKNOWN_TRANSACTION_ID;
+ }
+ try {
+ return commitTransaction(transactionScope);
+ } catch (Exception e) {
+ getLogger().warn("Failure applying {}", commit, e);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ /**
+ * Handles an rollback commit (ha!).
+ *
+ * @param commit transaction rollback commit
+ * @return rollback result
+ */
+ protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
+ TransactionId transactionId = commit.value().transactionId();
+ TransactionScope transactionScope = activeTransactions.remove(transactionId);
+ if (transactionScope == null) {
+ return RollbackResult.UNKNOWN_TRANSACTION_ID;
+ } else if (!transactionScope.isPrepared()) {
+ return RollbackResult.OK;
+ } else {
+ transactionScope.transactionLog().records()
+ .forEach(record -> {
+ preparedKeys.remove(record.path());
+ });
+ return RollbackResult.OK;
+ }
+
+ }
+
+ /**
+ * Document tree transaction scope.
+ * <p>
+ * Pairs a transaction version with the transaction log, which is only present once the transaction
+ * has been prepared. Both fields are final; {@link #prepared(Commit)} returns a new scope rather than
+ * changing this one.
+ */
+ private static final class TransactionScope {
+ // Transaction version supplied when the scope was created.
+ private final long version;
+ // Transaction log; null for a scope that has not been prepared.
+ private final TransactionLog<NodeUpdate<byte[]>> transactionLog;
+
+ /**
+ * Creates a scope without a transaction log, i.e. one that is not prepared.
+ *
+ * @param version the transaction version
+ */
+ private TransactionScope(long version) {
+ this(version, null);
+ }
+
+ /**
+ * Creates a scope with the given version and transaction log.
+ *
+ * @param version the transaction version
+ * @param transactionLog the transaction log, or null for a scope that is not prepared
+ */
+ private TransactionScope(long version, TransactionLog<NodeUpdate<byte[]>> transactionLog) {
+ this.version = version;
+ this.transactionLog = transactionLog;
+ }
+
+ /**
+ * Returns the transaction version.
+ *
+ * @return the transaction version
+ */
+ long version() {
+ return version;
+ }
+
+ /**
+ * Returns whether this is a prepared transaction scope.
+ * <p>
+ * A scope is prepared when it holds a transaction log.
+ *
+ * @return whether this is a prepared transaction scope
+ */
+ boolean isPrepared() {
+ return transactionLog != null;
+ }
+
+ /**
+ * Returns the transaction commit log.
+ *
+ * @return the transaction commit log
+ * @throws IllegalStateException if this scope is not prepared
+ */
+ TransactionLog<NodeUpdate<byte[]>> transactionLog() {
+ checkState(isPrepared());
+ return transactionLog;
+ }
+
+ /**
+ * Returns a new transaction scope with a prepare commit.
+ * <p>
+ * The new scope keeps this scope's version and takes its transaction log from the commit.
+ *
+ * @param commit the prepare commit
+ * @return new transaction scope updated with the prepare commit
+ */
+ TransactionScope prepared(Commit<? extends TransactionPrepare> commit) {
+ return new TransactionScope(version, commit.value().transactionLog());
+ }
+ }
+
+    /**
+     * Publishes a batch of events as a single {@code CHANGE} event to every session in {@code listeners}.
+     *
+     * @param events the events to publish together
+     */
+    private void publish(List<DocumentTreeEvent<byte[]>> events) {
+        for (SessionListenCommits session : listeners.values()) {
+            session.publish(CHANGE, events);
+        }
+    }
+
private void notifyListeners(DocumentTreeEvent<byte[]> event) {
listeners.values()
.stream()