[ONOS-6342] Refactor transaction architecture to support a shared cache for transactional primitives
Change-Id: I2a17965100895f5aa4d2202028047bb980c11d26
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
index be5f05a..82b3919 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-present Open Networking Laboratory
+ * Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,88 +16,263 @@
package org.onosproject.store.primitives.impl;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
-import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Transactional;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.TransactionContext;
+import org.onosproject.store.service.TransactionException;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkState;
/**
- * An immutable transaction object.
+ * Manages a transaction within the context of a single primitive.
+ * <p>
+ * The {@code Transaction} object is used to manage the transaction for a single partition primitive that implements
+ * the {@link Transactional} interface. It's used as a proxy for {@link TransactionContext}s to manage the transaction
+ * as it relates to a single piece of atomic state.
*/
-public class Transaction {
+public class Transaction<T> {
+ /**
+ * Transaction state.
+ * <p>
+ * The transaction state is used to indicate the phase within which the transaction is currently running.
+ */
enum State {
+
/**
- * Indicates a new transaction that is about to be prepared. All transactions
- * start their life in this state.
+ * Active transaction state.
+ * <p>
+ * The {@code ACTIVE} state represents a transaction in progress. Active transactions may or may not affect
+ * concurrently running transactions depending on the transaction's isolation level.
+ */
+ ACTIVE,
+
+ /**
+ * Preparing transaction state.
+ * <p>
+ * Once a transaction commitment begins, it enters the {@code PREPARING} phase of the two-phase commit protocol.
*/
PREPARING,
/**
- * Indicates a transaction that is successfully prepared i.e. all participants voted to commit
+ * Prepared transaction state.
+ * <p>
+ * Once the first phase of the two-phase commit protocol is complete, the transaction's state is set to
+ * {@code PREPARED}.
*/
PREPARED,
/**
- * Indicates a transaction that is about to be committed.
+ * Committing transaction state.
+ * <p>
+ * The {@code COMMITTING} state represents a transaction within the second phase of the two-phase commit
+ * protocol.
*/
COMMITTING,
/**
- * Indicates a transaction that has successfully committed.
+ * Committed transaction state.
+ * <p>
+ * Once the second phase of the two-phase commit protocol is complete, the transaction's state is set to
+ * {@code COMMITTED}.
*/
COMMITTED,
/**
- * Indicates a transaction that is about to be rolled back.
+ * Rolling back transaction state.
+ * <p>
+ * In the event of a two-phase lock failure, when the transaction is rolled back it will enter the
+ * {@code ROLLING_BACK} state while the rollback is in progress.
*/
- ROLLINGBACK,
+ ROLLING_BACK,
/**
- * Indicates a transaction that has been rolled back and all locks are released.
+ * Rolled back transaction state.
+ * <p>
+ * Once a transaction has been rolled back, it will enter the {@code ROLLED_BACK} state.
*/
- ROLLEDBACK
+ ROLLED_BACK,
}
- private final TransactionId transactionId;
- private final List<MapUpdate<String, byte[]>> updates;
- private final State state;
+ private static final String TX_OPEN_ERROR = "transaction already open";
+ private static final String TX_CLOSED_ERROR = "transaction not open";
+ private static final String TX_INACTIVE_ERROR = "transaction is not active";
+ private static final String TX_UNPREPARED_ERROR = "transaction has not been prepared";
- public Transaction(TransactionId transactionId, List<MapUpdate<String, byte[]>> updates) {
- this(transactionId, updates, State.PREPARING);
- }
+ protected final TransactionId transactionId;
+ protected final Transactional<T> transactionalObject;
+ private final AtomicBoolean open = new AtomicBoolean();
+ private volatile State state = State.ACTIVE;
- private Transaction(TransactionId transactionId,
- List<MapUpdate<String, byte[]>> updates,
- State state) {
+ public Transaction(TransactionId transactionId, Transactional<T> transactionalObject) {
this.transactionId = transactionId;
- this.updates = ImmutableList.copyOf(updates);
- this.state = state;
+ this.transactionalObject = transactionalObject;
}
- public TransactionId id() {
+ /**
+ * Returns the transaction identifier.
+ *
+ * @return the transaction identifier
+ */
+ public TransactionId transactionId() {
return transactionId;
}
- public List<MapUpdate<String, byte[]>> updates() {
- return updates;
- }
-
+ /**
+ * Returns the current transaction state.
+ *
+ * @return the current transaction state
+ */
public State state() {
return state;
}
- public Transaction transition(State newState) {
- return new Transaction(transactionId, updates, newState);
+ /**
+ * Returns a boolean indicating whether the transaction is open.
+ *
+ * @return indicates whether the transaction is open
+ */
+ public boolean isOpen() {
+ return open.get();
+ }
+
+ /**
+ * Opens the transaction, throwing an {@link IllegalStateException} if it's already open.
+ */
+ protected void open() {
+ if (!open.compareAndSet(false, true)) {
+ throw new IllegalStateException(TX_OPEN_ERROR);
+ }
+ }
+
+ /**
+ * Checks that the transaction is open and throws an {@link IllegalStateException} if not.
+ */
+ protected void checkOpen() {
+ checkState(isOpen(), TX_CLOSED_ERROR);
+ }
+
+ /**
+ * Checks that the transaction state is {@code ACTIVE} and throws an {@link IllegalStateException} if not.
+ */
+ protected void checkActive() {
+ checkState(state == State.ACTIVE, TX_INACTIVE_ERROR);
+ }
+
+ /**
+ * Checks that the transaction state is {@code PREPARED} and throws an {@link IllegalStateException} if not.
+ */
+ protected void checkPrepared() {
+ checkState(state == State.PREPARED, TX_UNPREPARED_ERROR);
+ }
+
+ /**
+ * Updates the transaction state.
+ *
+ * @param state the updated transaction state
+ */
+ protected void setState(State state) {
+ this.state = state;
+ }
+
+ /**
+ * Begins the transaction.
+ * <p>
+ * Locks are acquired when the transaction is begun to prevent concurrent transactions from operating on the shared
+ * resource to which this transaction relates.
+ *
+ * @return a completable future to be completed once the transaction has been started
+ */
+ public CompletableFuture<Version> begin() {
+ open();
+ return transactionalObject.begin(transactionId);
+ }
+
+ /**
+ * Prepares the transaction.
+ * <p>
+ * When preparing the transaction, the given list of updates for the shared resource will be prepared, and
+ * concurrent modification checks will be performed. The returned future may be completed with a
+ * {@link TransactionException} if a concurrent modification is detected for an isolation level that does
+ * not allow such modifications.
+ *
+ * @param updates the transaction updates
+ * @return a completable future to be completed once the transaction has been prepared
+ */
+ public CompletableFuture<Boolean> prepare(List<T> updates) {
+ checkOpen();
+ checkActive();
+ setState(State.PREPARING);
+ return transactionalObject.prepare(new TransactionLog<T>(transactionId, updates))
+ .thenApply(succeeded -> {
+ setState(State.PREPARED);
+ return succeeded;
+ });
+ }
+
+ /**
+ * Prepares and commits the transaction in a single atomic operation.
+ * <p>
+ * Both the prepare and commit phases of the protocol must be executed within a single atomic operation. This method
+ * is used to optimize committing transactions that operate only on a single partition within a single primitive.
+ *
+ * @param updates the transaction updates
+ * @return a completable future to be completed once the transaction has been prepared
+ */
+ public CompletableFuture<Boolean> prepareAndCommit(List<T> updates) {
+ checkOpen();
+ checkActive();
+ setState(State.PREPARING);
+ return transactionalObject.prepareAndCommit(new TransactionLog<T>(transactionId, updates))
+ .thenApply(succeeded -> {
+ setState(State.COMMITTED);
+ return succeeded;
+ });
+ }
+
+ /**
+ * Commits the transaction.
+ * <p>
+ * Performs the second phase of the two-phase commit protocol, committing the previously
+ * {@link #prepare(List) prepared} updates.
+ *
+ * @return a completable future to be completed once the transaction has been committed
+ */
+ public CompletableFuture<Void> commit() {
+ checkOpen();
+ checkPrepared();
+ setState(State.COMMITTING);
+ return transactionalObject.commit(transactionId).thenRun(() -> {
+ setState(State.COMMITTED);
+ });
+ }
+
+ /**
+ * Rolls back the transaction.
+ * <p>
+ * Rolls back the first phase of the two-phase commit protocol, cancelling prepared updates.
+ *
+ * @return a completable future to be completed once the transaction has been rolled back
+ */
+ public CompletableFuture<Void> rollback() {
+ checkOpen();
+ checkPrepared();
+ setState(State.ROLLING_BACK);
+ return transactionalObject.rollback(transactionId).thenRun(() -> {
+ setState(State.ROLLED_BACK);
+ });
}
@Override
public String toString() {
- return MoreObjects.toStringHelper(getClass())
+ return toStringHelper(this)
.add("transactionId", transactionId)
- .add("updates", updates)
.add("state", state)
.toString();
}