Added distributed transaction support through a two phase commit protocol
Change-Id: I85d64234a24823fee8b3c2ea830abbb6867dad38
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
index 21e3bb4..7d6ad4d 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
@@ -35,6 +35,12 @@
}
/**
+ * ConsistentMap update conflicts with an in flight transaction.
+ */
+ public static class ConcurrentModification extends ConsistentMapException {
+ }
+
+ /**
* ConsistentMap operation interrupted.
*/
public static class Interrupted extends ConsistentMapException {
diff --git a/core/api/src/main/java/org/onosproject/store/service/DatabaseUpdate.java b/core/api/src/main/java/org/onosproject/store/service/DatabaseUpdate.java
new file mode 100644
index 0000000..fd4373e
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/DatabaseUpdate.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.service;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Database update operation.
+ *
+ */
+public final class DatabaseUpdate {
+
+ /**
+ * Type of database update operation.
+ */
+ public static enum Type {
+ /**
+ * Insert/Update entry without any checks.
+ */
+ PUT,
+ /**
+ * Insert an entry iff there is no existing entry for that key.
+ */
+ PUT_IF_ABSENT,
+
+ /**
+ * Update entry if the current version matches specified version.
+ */
+ PUT_IF_VERSION_MATCH,
+
+ /**
+ * Update entry if the current value matches specified value.
+ */
+ PUT_IF_VALUE_MATCH,
+
+ /**
+ * Remove entry without any checks.
+ */
+ REMOVE,
+
+ /**
+ * Remove entry if the current version matches specified version.
+ */
+ REMOVE_IF_VERSION_MATCH,
+
+ /**
+ * Remove entry if the current value matches specified value.
+ */
+ REMOVE_IF_VALUE_MATCH,
+ }
+
+ private Type type;
+ private String tableName;
+ private String key;
+ private byte[] value;
+ private byte[] currentValue;
+ private long currentVersion = -1;
+
+ /**
+ * Returns the type of update operation.
+ * @return type of update.
+ */
+ public Type type() {
+ return type;
+ }
+
+ /**
+ * Returns the tableName being updated.
+ * @return table name.
+ */
+ public String tableName() {
+ return tableName;
+ }
+
+ /**
+ * Returns the item key being updated.
+ * @return item key
+ */
+ public String key() {
+ return key;
+ }
+
+ /**
+ * Returns the new value.
+ * @return item's target value.
+ */
+ public byte[] value() {
+ return value;
+ }
+
+ /**
+ * Returns the expected current value in the database value for the key.
+ * @return current value in database.
+ */
+ public byte[] currentValue() {
+ return currentValue;
+ }
+
+ /**
+ * Returns the expected current version in the database for the key.
+ * @return expected version.
+ */
+ public long currentVersion() {
+ return currentVersion;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("type", type)
+ .add("tableName", tableName)
+ .add("key", key)
+ .add("value", value)
+ .add("currentValue", currentValue)
+ .add("currentVersion", currentVersion)
+ .toString();
+ }
+
+ /**
+ * Creates a new builder instance.
+ *
+ * @return builder.
+ */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * DatabaseUpdate builder.
+ *
+ */
+ public static final class Builder {
+
+ private DatabaseUpdate update = new DatabaseUpdate();
+
+ public DatabaseUpdate build() {
+ validateInputs();
+ return update;
+ }
+
+ public Builder withType(Type type) {
+ update.type = checkNotNull(type, "type cannot be null");
+ return this;
+ }
+
+ public Builder withTableName(String tableName) {
+ update.tableName = checkNotNull(tableName, "tableName cannot be null");
+ return this;
+ }
+
+ public Builder withKey(String key) {
+ update.key = checkNotNull(key, "key cannot be null");
+ return this;
+ }
+
+ public Builder withCurrentValue(byte[] value) {
+ update.currentValue = checkNotNull(value, "currentValue cannot be null");
+ return this;
+ }
+
+ public Builder withValue(byte[] value) {
+ update.value = checkNotNull(value, "value cannot be null");
+ return this;
+ }
+
+ public Builder withCurrentVersion(long version) {
+ checkArgument(version >= 0, "version cannot be negative");
+ update.currentVersion = version;
+ return this;
+ }
+
+ private void validateInputs() {
+ checkNotNull(update.type, "type must be specified");
+ checkNotNull(update.tableName, "table name must be specified");
+ checkNotNull(update.key, "key must be specified");
+ switch (update.type) {
+ case PUT:
+ case PUT_IF_ABSENT:
+ checkNotNull(update.value, "value must be specified.");
+ break;
+ case PUT_IF_VERSION_MATCH:
+ checkNotNull(update.value, "value must be specified.");
+ checkState(update.currentVersion >= 0, "current version must be specified");
+ break;
+ case PUT_IF_VALUE_MATCH:
+ checkNotNull(update.value, "value must be specified.");
+ checkNotNull(update.currentValue, "currentValue must be specified.");
+ break;
+ case REMOVE:
+ break;
+ case REMOVE_IF_VERSION_MATCH:
+ checkState(update.currentVersion >= 0, "current version must be specified");
+ break;
+ case REMOVE_IF_VALUE_MATCH:
+ checkNotNull(update.currentValue, "currentValue must be specified.");
+ break;
+ default:
+ throw new IllegalStateException("Unknown operation type");
+ }
+ }
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
index 572867c..eb129fe 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.service;
+import java.util.Collection;
import java.util.List;
/**
@@ -35,4 +36,16 @@
* @return list of map information
*/
List<MapInfo> getMapInfo();
+
+ /**
+ * Returns all the transactions in the system.
+ *
+ * @return collection of transactions
+ */
+ Collection<Transaction> getTransactions();
+
+ /**
+ * Redrives stuck transactions while removing those that are done.
+ */
+ void redriveTransactions();
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index e165a95..5ea0420 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -29,13 +29,6 @@
public interface StorageService {
/**
- * Creates a new transaction context.
- *
- * @return transaction context
- */
- TransactionContext createTransactionContext();
-
- /**
* Creates a new EventuallyConsistentMapBuilder.
*
* @param <K> key type
@@ -45,11 +38,11 @@
<K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder();
/**
- * Creates a new EventuallyConsistentMapBuilder.
+ * Creates a new ConsistentMapBuilder.
*
* @param <K> key type
* @param <V> value type
- * @return builder for an eventually consistent map
+ * @return builder for a consistent map
*/
<K, V> ConsistentMapBuilder<K, V> consistentMapBuilder();
@@ -60,4 +53,11 @@
* @return builder for an distributed set
*/
<E> SetBuilder<E> setBuilder();
-}
\ No newline at end of file
+
+ /**
+ * Creates a new transaction context.
+ *
+ * @return transaction context
+ */
+ TransactionContext createTransactionContext();
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/Transaction.java b/core/api/src/main/java/org/onosproject/store/service/Transaction.java
new file mode 100644
index 0000000..514e1ab
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/Transaction.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.List;
+
+/**
+ * An immutable transaction object.
+ */
+public interface Transaction {
+
+ public enum State {
+ /**
+ * Indicates a new transaction that is about to be prepared. All transactions
+ * start their life in this state.
+ */
+ PREPARING,
+
+ /**
+ * Indicates a transaction that is successfully prepared i.e. all participants voted to commit
+ */
+ PREPARED,
+
+ /**
+ * Indicates a transaction that is about to be committed.
+ */
+ COMMITTING,
+
+ /**
+ * Indicates a transaction that has successfully committed.
+ */
+ COMMITTED,
+
+ /**
+ * Indicates a transaction that is about to be rolled back.
+ */
+ ROLLINGBACK,
+
+ /**
+ * Indicates a transaction that has been rolled back and all locks are released.
+ */
+ ROLLEDBACK
+ }
+
+ /**
+ * Returns the transaction Id.
+ *
+ * @return transaction id
+ */
+ long id();
+
+ /**
+ * Returns the list of updates that are part of this transaction.
+ *
+ * @return list of database updates
+ */
+ List<DatabaseUpdate> updates();
+
+ /**
+ * Returns the current state of this transaction.
+ *
+ * @return transaction state
+ */
+ State state();
+
+ /**
+ * Returns true if this transaction has completed execution.
+ *
+ * @return true is yes, false otherwise
+ */
+ public default boolean isDone() {
+ return state() == State.COMMITTED || state() == State.ROLLEDBACK;
+ }
+
+ /**
+ * Returns a new transaction that is created by transitioning this one to the specified state.
+ *
+ * @param newState destination state
+ * @return a new transaction instance similar to the current one but its state set to specified state
+ */
+ Transaction transition(State newState);
+
+ /**
+ * Returns the system time when the transaction was last updated.
+ *
+ * @return last update time
+ */
+ public long lastUpdated();
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java b/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
index b404bae..94942e2 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
@@ -19,21 +19,31 @@
/**
* Provides a context for transactional operations.
* <p>
- * A transaction context provides a boundary within which transactions
- * are run. It also is a place where all modifications made within a transaction
- * are cached until the point when the transaction commits or aborts. It thus ensures
- * isolation of work happening with in the transaction boundary.
- * <p>
* A transaction context is a vehicle for grouping operations into a unit with the
* properties of atomicity, isolation, and durability. Transactions also provide the
* ability to maintain an application's invariants or integrity constraints,
* supporting the property of consistency. Together these properties are known as ACID.
+ * <p>
+ * A transaction context provides a boundary within which transactions
+ * are run. It also is a place where all modifications made within a transaction
+ * are cached until the point when the transaction commits or aborts. It thus ensures
+ * isolation of work happening with in the transaction boundary. Within a transaction
+ * context isolation level is REPEATABLE_READS i.e. only data that is committed can be read.
+ * The only uncommitted data that can be read is the data modified by the current transaction.
*/
public interface TransactionContext {
/**
+ * Returns the unique transactionId.
+ *
+ * @return transaction id
+ */
+ long transactionId();
+
+ /**
* Returns if this transaction context is open.
- * @return true if open, false otherwise.
+ *
+ * @return true if open, false otherwise
*/
boolean isOpen();
@@ -45,22 +55,24 @@
/**
* Commits a transaction that was previously started thereby making its changes permanent
* and externally visible.
- * @throws TransactionException if transaction fails to commit.
+ *
+ * @throws TransactionException if transaction fails to commit
*/
void commit();
/**
- * Rolls back the current transaction, discarding all its changes.
+ * Aborts any changes made in this transaction context and discarding all locally cached updates.
*/
- void rollback();
+ void abort();
/**
- * Creates a new transactional map.
+ * Returns a transactional map data structure with the specified name.
+ *
* @param <K> key type
* @param <V> value type
- * @param mapName name of the transactional map.
- * @param serializer serializer to use for encoding/decoding keys and vaulues.
- * @return new Transactional Map.
+ * @param mapName name of the transactional map
+ * @param serializer serializer to use for encoding/decoding keys and values of the map
+ * @return Transactional Map
*/
- <K, V> TransactionalMap<K, V> createTransactionalMap(String mapName, Serializer serializer);
+ <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName, Serializer serializer);
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionException.java b/core/api/src/main/java/org/onosproject/store/service/TransactionException.java
index 8261fbd..6135394 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionException.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionException.java
@@ -41,8 +41,14 @@
}
/**
- * Transaction failure due to optimistic concurrency failure.
+ * Transaction failure due to optimistic concurrency violation.
*/
public static class OptimisticConcurrencyFailure extends TransactionException {
}
+
+ /**
+ * Transaction failure due to a conflicting transaction in progress.
+ */
+ public static class ConcurrentModification extends TransactionException {
+ }
}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionalMap.java b/core/api/src/main/java/org/onosproject/store/service/TransactionalMap.java
index c59600c..ebc6611 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionalMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionalMap.java
@@ -16,15 +16,12 @@
package org.onosproject.store.service;
-import java.util.Collection;
-import java.util.Set;
-import java.util.Map.Entry;
/**
* Transactional Map data structure.
* <p>
- * A TransactionalMap is created by invoking {@link TransactionContext#createTransactionalMap createTransactionalMap}
- * method. All operations performed on this map with in a transaction boundary are invisible externally
+ * A TransactionalMap is created by invoking {@link TransactionContext#getTransactionalMap getTransactionalMap}
+ * method. All operations performed on this map within a transaction boundary are invisible externally
* until the point when the transaction commits. A commit usually succeeds in the absence of conflicts.
*
* @param <K> type of key.
@@ -33,36 +30,6 @@
public interface TransactionalMap<K, V> {
/**
- * Returns the number of entries in the map.
- *
- * @return map size.
- */
- int size();
-
- /**
- * Returns true if the map is empty.
- *
- * @return true if map has no entries, false otherwise.
- */
- boolean isEmpty();
-
- /**
- * Returns true if this map contains a mapping for the specified key.
- *
- * @param key key
- * @return true if map contains key, false otherwise.
- */
- boolean containsKey(K key);
-
- /**
- * Returns true if this map contains the specified value.
- *
- * @param value value
- * @return true if map contains value, false otherwise.
- */
- boolean containsValue(V value);
-
- /**
* Returns the value to which the specified key is mapped, or null if this
* map contains no mapping for the key.
*
@@ -94,45 +61,6 @@
V remove(K key);
/**
- * Removes all of the mappings from this map (optional operation).
- * The map will be empty after this call returns.
- */
- void clear();
-
- /**
- * Returns a Set view of the keys contained in this map.
- * This method differs from the behavior of java.util.Map.keySet() in that
- * what is returned is a unmodifiable snapshot view of the keys in the ConsistentMap.
- * Attempts to modify the returned set, whether direct or via its iterator,
- * result in an UnsupportedOperationException.
- *
- * @return a set of the keys contained in this map
- */
- Set<K> keySet();
-
- /**
- * Returns the collection of values contained in this map.
- * This method differs from the behavior of java.util.Map.values() in that
- * what is returned is a unmodifiable snapshot view of the values in the ConsistentMap.
- * Attempts to modify the returned collection, whether direct or via its iterator,
- * result in an UnsupportedOperationException.
- *
- * @return a collection of the values contained in this map
- */
- Collection<V> values();
-
- /**
- * Returns the set of entries contained in this map.
- * This method differs from the behavior of java.util.Map.entrySet() in that
- * what is returned is a unmodifiable snapshot view of the entries in the ConsistentMap.
- * Attempts to modify the returned set, whether direct or via its iterator,
- * result in an UnsupportedOperationException.
- *
- * @return set of entries contained in this map.
- */
- Set<Entry<K, V>> entrySet();
-
- /**
* If the specified key is not already associated with a value
* associates it with the given value and returns null, else returns the current value.
*
diff --git a/core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java b/core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java
deleted file mode 100644
index b4b05fc..0000000
--- a/core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.service;
-
-import static com.google.common.base.Preconditions.*;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * Database update operation.
- *
- * @param <K> key type.
- * @param <V> value type.
- */
-public class UpdateOperation<K, V> {
-
- /**
- * Type of database update operation.
- */
- public static enum Type {
- PUT,
- PUT_IF_ABSENT,
- PUT_IF_VERSION_MATCH,
- PUT_IF_VALUE_MATCH,
- REMOVE,
- REMOVE_IF_VERSION_MATCH,
- REMOVE_IF_VALUE_MATCH,
- }
-
- private Type type;
- private String tableName;
- private K key;
- private V value;
- private V currentValue;
- private long currentVersion = -1;
-
- /**
- * Returns the type of update operation.
- * @return type of update.
- */
- public Type type() {
- return type;
- }
-
- /**
- * Returns the tableName being updated.
- * @return table name.
- */
- public String tableName() {
- return tableName;
- }
-
- /**
- * Returns the item key being updated.
- * @return item key
- */
- public K key() {
- return key;
- }
-
- /**
- * Returns the new value.
- * @return item's target value.
- */
- public V value() {
- return value;
- }
-
- /**
- * Returns the expected current value in the database value for the key.
- * @return current value in database.
- */
- public V currentValue() {
- return currentValue;
- }
-
- /**
- * Returns the expected current version in the database for the key.
- * @return expected version.
- */
- public long currentVersion() {
- return currentVersion;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("type", type)
- .add("tableName", tableName)
- .add("key", key)
- .add("value", value)
- .add("currentValue", currentValue)
- .add("currentVersion", currentVersion)
- .toString();
- }
-
- /**
- * Creates a new builder instance.
- * @param <K> key type.
- * @param <V> value type.
- *
- * @return builder.
- */
- public static <K, V> Builder<K, V> newBuilder() {
- return new Builder<>();
- }
-
- /**
- * UpdatOperation builder.
- *
- * @param <K> key type.
- * @param <V> value type.
- */
- public static final class Builder<K, V> {
-
- private UpdateOperation<K, V> operation = new UpdateOperation<>();
-
- public UpdateOperation<K, V> build() {
- validateInputs();
- return operation;
- }
-
- public Builder<K, V> withType(Type type) {
- operation.type = checkNotNull(type, "type cannot be null");
- return this;
- }
-
- public Builder<K, V> withTableName(String tableName) {
- operation.tableName = checkNotNull(tableName, "tableName cannot be null");
- return this;
- }
-
- public Builder<K, V> withKey(K key) {
- operation.key = checkNotNull(key, "key cannot be null");
- return this;
- }
-
- public Builder<K, V> withCurrentValue(V value) {
- operation.currentValue = checkNotNull(value, "currentValue cannot be null");
- return this;
- }
-
- public Builder<K, V> withValue(V value) {
- operation.value = checkNotNull(value, "value cannot be null");
- return this;
- }
-
- public Builder<K, V> withCurrentVersion(long version) {
- checkArgument(version >= 0, "version cannot be negative");
- operation.currentVersion = version;
- return this;
- }
-
- private void validateInputs() {
- checkNotNull(operation.type, "type must be specified");
- checkNotNull(operation.tableName, "table name must be specified");
- checkNotNull(operation.key, "key must be specified");
- switch (operation.type) {
- case PUT:
- case PUT_IF_ABSENT:
- checkNotNull(operation.value, "value must be specified.");
- break;
- case PUT_IF_VERSION_MATCH:
- checkNotNull(operation.value, "value must be specified.");
- checkState(operation.currentVersion >= 0, "current version must be specified");
- break;
- case PUT_IF_VALUE_MATCH:
- checkNotNull(operation.value, "value must be specified.");
- checkNotNull(operation.currentValue, "currentValue must be specified.");
- break;
- case REMOVE:
- break;
- case REMOVE_IF_VERSION_MATCH:
- checkState(operation.currentVersion >= 0, "current version must be specified");
- break;
- case REMOVE_IF_VALUE_MATCH:
- checkNotNull(operation.currentValue, "currentValue must be specified.");
- break;
- default:
- throw new IllegalStateException("Unknown operation type");
- }
- }
- }
-}