Added distributed transaction support through a two phase commit protocol
Change-Id: I85d64234a24823fee8b3c2ea830abbb6867dad38
diff --git a/cli/src/main/java/org/onosproject/cli/net/TransactionsCommand.java b/cli/src/main/java/org/onosproject/cli/net/TransactionsCommand.java
new file mode 100644
index 0000000..6013a38
--- /dev/null
+++ b/cli/src/main/java/org/onosproject/cli/net/TransactionsCommand.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cli.net;
+
+import java.util.Collection;
+
+import org.apache.karaf.shell.commands.Command;
+import org.apache.karaf.shell.commands.Option;
+import org.onlab.util.Tools;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.store.service.StorageAdminService;
+import org.onosproject.store.service.Transaction;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * CLI to work with database transactions in the system.
+ */
+@Command(scope = "onos", name = "transactions",
+ description = "Utility for viewing and redriving database transactions")
+public class TransactionsCommand extends AbstractShellCommand {
+
+ @Option(name = "-r", aliases = "--redrive",
+ description = "Redrive stuck transactions while removing those that are done",
+ required = false, multiValued = false)
+ private boolean redrive = false;
+
+ private static final String FMT = "%-20s %-15s %-10s";
+
+ /**
+ * Displays transactions as text.
+ *
+ * @param transactions transactions
+ */
+ private void displayTransactions(Collection<Transaction> transactions) {
+ print("---------------------------------------------");
+ print(FMT, "Id", "State", "Updated");
+ print("---------------------------------------------");
+ transactions.forEach(txn -> print(FMT, txn.id(), txn.state(), Tools.timeAgo(txn.lastUpdated())));
+ if (transactions.size() > 0) {
+ print("---------------------------------------------");
+ }
+ }
+
+ /**
+ * Converts collection of transactions into a JSON object.
+ *
+ * @param transactions transactions
+ */
+ private JsonNode json(Collection<Transaction> transactions) {
+ ObjectMapper mapper = new ObjectMapper();
+ ArrayNode txns = mapper.createArrayNode();
+
+ // Create a JSON node for each transaction
+ transactions.stream().forEach(txn -> {
+ ObjectNode txnNode = mapper.createObjectNode();
+ txnNode.put("id", txn.id())
+ .put("state", txn.state().toString())
+ .put("lastUpdated", txn.lastUpdated());
+ txns.add(txnNode);
+ });
+
+ return txns;
+ }
+
+ @Override
+ protected void execute() {
+ StorageAdminService storageAdminService = get(StorageAdminService.class);
+
+ if (redrive) {
+ storageAdminService.redriveTransactions();
+ return;
+ }
+
+ Collection<Transaction> transactions = storageAdminService.getTransactions();
+ if (outputJson()) {
+ print("%s", json(transactions));
+ } else {
+ displayTransactions(transactions);
+ }
+ }
+}
diff --git a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 4c0497b..98328b6 100644
--- a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -233,6 +233,9 @@
<action class="org.onosproject.cli.net.MapsListCommand"/>
</command>
<command>
+ <action class="org.onosproject.cli.net.TransactionsCommand"/>
+ </command>
+ <command>
<action class="org.onosproject.cli.net.ClusterDevicesCommand"/>
<completers>
<ref component-id="clusterIdCompleter"/>
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
index 21e3bb4..7d6ad4d 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
@@ -35,6 +35,12 @@
}
/**
+ * ConsistentMap update conflicts with an in flight transaction.
+ */
+ public static class ConcurrentModification extends ConsistentMapException {
+ }
+
+ /**
* ConsistentMap operation interrupted.
*/
public static class Interrupted extends ConsistentMapException {
diff --git a/core/api/src/main/java/org/onosproject/store/service/DatabaseUpdate.java b/core/api/src/main/java/org/onosproject/store/service/DatabaseUpdate.java
new file mode 100644
index 0000000..fd4373e
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/DatabaseUpdate.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.service;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Database update operation.
+ *
+ */
+public final class DatabaseUpdate {
+
+ /**
+ * Type of database update operation.
+ */
+ public static enum Type {
+ /**
+ * Insert/Update entry without any checks.
+ */
+ PUT,
+ /**
+ * Insert an entry iff there is no existing entry for that key.
+ */
+ PUT_IF_ABSENT,
+
+ /**
+ * Update entry if the current version matches specified version.
+ */
+ PUT_IF_VERSION_MATCH,
+
+ /**
+ * Update entry if the current value matches specified value.
+ */
+ PUT_IF_VALUE_MATCH,
+
+ /**
+ * Remove entry without any checks.
+ */
+ REMOVE,
+
+ /**
+ * Remove entry if the current version matches specified version.
+ */
+ REMOVE_IF_VERSION_MATCH,
+
+ /**
+ * Remove entry if the current value matches specified value.
+ */
+ REMOVE_IF_VALUE_MATCH,
+ }
+
+ private Type type;
+ private String tableName;
+ private String key;
+ private byte[] value;
+ private byte[] currentValue;
+ private long currentVersion = -1;
+
+ /**
+ * Returns the type of update operation.
+ * @return type of update.
+ */
+ public Type type() {
+ return type;
+ }
+
+ /**
+ * Returns the tableName being updated.
+ * @return table name.
+ */
+ public String tableName() {
+ return tableName;
+ }
+
+ /**
+ * Returns the item key being updated.
+ * @return item key
+ */
+ public String key() {
+ return key;
+ }
+
+ /**
+ * Returns the new value.
+ * @return item's target value.
+ */
+ public byte[] value() {
+ return value;
+ }
+
+ /**
+ * Returns the expected current value in the database value for the key.
+ * @return current value in database.
+ */
+ public byte[] currentValue() {
+ return currentValue;
+ }
+
+ /**
+ * Returns the expected current version in the database for the key.
+ * @return expected version.
+ */
+ public long currentVersion() {
+ return currentVersion;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("type", type)
+ .add("tableName", tableName)
+ .add("key", key)
+ .add("value", value)
+ .add("currentValue", currentValue)
+ .add("currentVersion", currentVersion)
+ .toString();
+ }
+
+ /**
+ * Creates a new builder instance.
+ *
+ * @return builder.
+ */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * DatabaseUpdate builder.
+ *
+ */
+ public static final class Builder {
+
+ private DatabaseUpdate update = new DatabaseUpdate();
+
+ public DatabaseUpdate build() {
+ validateInputs();
+ return update;
+ }
+
+ public Builder withType(Type type) {
+ update.type = checkNotNull(type, "type cannot be null");
+ return this;
+ }
+
+ public Builder withTableName(String tableName) {
+ update.tableName = checkNotNull(tableName, "tableName cannot be null");
+ return this;
+ }
+
+ public Builder withKey(String key) {
+ update.key = checkNotNull(key, "key cannot be null");
+ return this;
+ }
+
+ public Builder withCurrentValue(byte[] value) {
+ update.currentValue = checkNotNull(value, "currentValue cannot be null");
+ return this;
+ }
+
+ public Builder withValue(byte[] value) {
+ update.value = checkNotNull(value, "value cannot be null");
+ return this;
+ }
+
+ public Builder withCurrentVersion(long version) {
+ checkArgument(version >= 0, "version cannot be negative");
+ update.currentVersion = version;
+ return this;
+ }
+
+ private void validateInputs() {
+ checkNotNull(update.type, "type must be specified");
+ checkNotNull(update.tableName, "table name must be specified");
+ checkNotNull(update.key, "key must be specified");
+ switch (update.type) {
+ case PUT:
+ case PUT_IF_ABSENT:
+ checkNotNull(update.value, "value must be specified.");
+ break;
+ case PUT_IF_VERSION_MATCH:
+ checkNotNull(update.value, "value must be specified.");
+ checkState(update.currentVersion >= 0, "current version must be specified");
+ break;
+ case PUT_IF_VALUE_MATCH:
+ checkNotNull(update.value, "value must be specified.");
+ checkNotNull(update.currentValue, "currentValue must be specified.");
+ break;
+ case REMOVE:
+ break;
+ case REMOVE_IF_VERSION_MATCH:
+ checkState(update.currentVersion >= 0, "current version must be specified");
+ break;
+ case REMOVE_IF_VALUE_MATCH:
+ checkNotNull(update.currentValue, "currentValue must be specified.");
+ break;
+ default:
+ throw new IllegalStateException("Unknown operation type");
+ }
+ }
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
index 572867c..eb129fe 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.service;
+import java.util.Collection;
import java.util.List;
/**
@@ -35,4 +36,16 @@
* @return list of map information
*/
List<MapInfo> getMapInfo();
+
+ /**
+ * Returns all the transactions in the system.
+ *
+ * @return collection of transactions
+ */
+ Collection<Transaction> getTransactions();
+
+ /**
+ * Redrives stuck transactions while removing those that are done.
+ */
+ void redriveTransactions();
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index e165a95..5ea0420 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -29,13 +29,6 @@
public interface StorageService {
/**
- * Creates a new transaction context.
- *
- * @return transaction context
- */
- TransactionContext createTransactionContext();
-
- /**
* Creates a new EventuallyConsistentMapBuilder.
*
* @param <K> key type
@@ -45,11 +38,11 @@
<K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder();
/**
- * Creates a new EventuallyConsistentMapBuilder.
+ * Creates a new ConsistentMapBuilder.
*
* @param <K> key type
* @param <V> value type
- * @return builder for an eventually consistent map
+ * @return builder for a consistent map
*/
<K, V> ConsistentMapBuilder<K, V> consistentMapBuilder();
@@ -60,4 +53,11 @@
* @return builder for an distributed set
*/
<E> SetBuilder<E> setBuilder();
-}
\ No newline at end of file
+
+ /**
+ * Creates a new transaction context.
+ *
+ * @return transaction context
+ */
+ TransactionContext createTransactionContext();
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/Transaction.java b/core/api/src/main/java/org/onosproject/store/service/Transaction.java
new file mode 100644
index 0000000..514e1ab
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/Transaction.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.List;
+
+/**
+ * An immutable transaction object.
+ */
+public interface Transaction {
+
+ public enum State {
+ /**
+ * Indicates a new transaction that is about to be prepared. All transactions
+ * start their life in this state.
+ */
+ PREPARING,
+
+ /**
+ * Indicates a transaction that is successfully prepared i.e. all participants voted to commit
+ */
+ PREPARED,
+
+ /**
+ * Indicates a transaction that is about to be committed.
+ */
+ COMMITTING,
+
+ /**
+ * Indicates a transaction that has successfully committed.
+ */
+ COMMITTED,
+
+ /**
+ * Indicates a transaction that is about to be rolled back.
+ */
+ ROLLINGBACK,
+
+ /**
+ * Indicates a transaction that has been rolled back and all locks are released.
+ */
+ ROLLEDBACK
+ }
+
+ /**
+ * Returns the transaction Id.
+ *
+ * @return transaction id
+ */
+ long id();
+
+ /**
+ * Returns the list of updates that are part of this transaction.
+ *
+ * @return list of database updates
+ */
+ List<DatabaseUpdate> updates();
+
+ /**
+ * Returns the current state of this transaction.
+ *
+ * @return transaction state
+ */
+ State state();
+
+ /**
+ * Returns true if this transaction has completed execution.
+ *
+ * @return true is yes, false otherwise
+ */
+ public default boolean isDone() {
+ return state() == State.COMMITTED || state() == State.ROLLEDBACK;
+ }
+
+ /**
+ * Returns a new transaction that is created by transitioning this one to the specified state.
+ *
+ * @param newState destination state
+ * @return a new transaction instance similar to the current one but its state set to specified state
+ */
+ Transaction transition(State newState);
+
+ /**
+ * Returns the system time when the transaction was last updated.
+ *
+ * @return last update time
+ */
+ public long lastUpdated();
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java b/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
index b404bae..94942e2 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
@@ -19,21 +19,31 @@
/**
* Provides a context for transactional operations.
* <p>
- * A transaction context provides a boundary within which transactions
- * are run. It also is a place where all modifications made within a transaction
- * are cached until the point when the transaction commits or aborts. It thus ensures
- * isolation of work happening with in the transaction boundary.
- * <p>
* A transaction context is a vehicle for grouping operations into a unit with the
* properties of atomicity, isolation, and durability. Transactions also provide the
* ability to maintain an application's invariants or integrity constraints,
* supporting the property of consistency. Together these properties are known as ACID.
+ * <p>
+ * A transaction context provides a boundary within which transactions
+ * are run. It also is a place where all modifications made within a transaction
+ * are cached until the point when the transaction commits or aborts. It thus ensures
+ * isolation of work happening with in the transaction boundary. Within a transaction
+ * context isolation level is REPEATABLE_READS i.e. only data that is committed can be read.
+ * The only uncommitted data that can be read is the data modified by the current transaction.
*/
public interface TransactionContext {
/**
+ * Returns the unique transactionId.
+ *
+ * @return transaction id
+ */
+ long transactionId();
+
+ /**
* Returns if this transaction context is open.
- * @return true if open, false otherwise.
+ *
+ * @return true if open, false otherwise
*/
boolean isOpen();
@@ -45,22 +55,24 @@
/**
* Commits a transaction that was previously started thereby making its changes permanent
* and externally visible.
- * @throws TransactionException if transaction fails to commit.
+ *
+ * @throws TransactionException if transaction fails to commit
*/
void commit();
/**
- * Rolls back the current transaction, discarding all its changes.
+ * Aborts any changes made in this transaction context and discarding all locally cached updates.
*/
- void rollback();
+ void abort();
/**
- * Creates a new transactional map.
+ * Returns a transactional map data structure with the specified name.
+ *
* @param <K> key type
* @param <V> value type
- * @param mapName name of the transactional map.
- * @param serializer serializer to use for encoding/decoding keys and vaulues.
- * @return new Transactional Map.
+ * @param mapName name of the transactional map
+ * @param serializer serializer to use for encoding/decoding keys and values of the map
+ * @return Transactional Map
*/
- <K, V> TransactionalMap<K, V> createTransactionalMap(String mapName, Serializer serializer);
+ <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName, Serializer serializer);
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionException.java b/core/api/src/main/java/org/onosproject/store/service/TransactionException.java
index 8261fbd..6135394 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionException.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionException.java
@@ -41,8 +41,14 @@
}
/**
- * Transaction failure due to optimistic concurrency failure.
+ * Transaction failure due to optimistic concurrency violation.
*/
public static class OptimisticConcurrencyFailure extends TransactionException {
}
+
+ /**
+ * Transaction failure due to a conflicting transaction in progress.
+ */
+ public static class ConcurrentModification extends TransactionException {
+ }
}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionalMap.java b/core/api/src/main/java/org/onosproject/store/service/TransactionalMap.java
index c59600c..ebc6611 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionalMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionalMap.java
@@ -16,15 +16,12 @@
package org.onosproject.store.service;
-import java.util.Collection;
-import java.util.Set;
-import java.util.Map.Entry;
/**
* Transactional Map data structure.
* <p>
- * A TransactionalMap is created by invoking {@link TransactionContext#createTransactionalMap createTransactionalMap}
- * method. All operations performed on this map with in a transaction boundary are invisible externally
+ * A TransactionalMap is created by invoking {@link TransactionContext#getTransactionalMap getTransactionalMap}
+ * method. All operations performed on this map within a transaction boundary are invisible externally
* until the point when the transaction commits. A commit usually succeeds in the absence of conflicts.
*
* @param <K> type of key.
@@ -33,36 +30,6 @@
public interface TransactionalMap<K, V> {
/**
- * Returns the number of entries in the map.
- *
- * @return map size.
- */
- int size();
-
- /**
- * Returns true if the map is empty.
- *
- * @return true if map has no entries, false otherwise.
- */
- boolean isEmpty();
-
- /**
- * Returns true if this map contains a mapping for the specified key.
- *
- * @param key key
- * @return true if map contains key, false otherwise.
- */
- boolean containsKey(K key);
-
- /**
- * Returns true if this map contains the specified value.
- *
- * @param value value
- * @return true if map contains value, false otherwise.
- */
- boolean containsValue(V value);
-
- /**
* Returns the value to which the specified key is mapped, or null if this
* map contains no mapping for the key.
*
@@ -94,45 +61,6 @@
V remove(K key);
/**
- * Removes all of the mappings from this map (optional operation).
- * The map will be empty after this call returns.
- */
- void clear();
-
- /**
- * Returns a Set view of the keys contained in this map.
- * This method differs from the behavior of java.util.Map.keySet() in that
- * what is returned is a unmodifiable snapshot view of the keys in the ConsistentMap.
- * Attempts to modify the returned set, whether direct or via its iterator,
- * result in an UnsupportedOperationException.
- *
- * @return a set of the keys contained in this map
- */
- Set<K> keySet();
-
- /**
- * Returns the collection of values contained in this map.
- * This method differs from the behavior of java.util.Map.values() in that
- * what is returned is a unmodifiable snapshot view of the values in the ConsistentMap.
- * Attempts to modify the returned collection, whether direct or via its iterator,
- * result in an UnsupportedOperationException.
- *
- * @return a collection of the values contained in this map
- */
- Collection<V> values();
-
- /**
- * Returns the set of entries contained in this map.
- * This method differs from the behavior of java.util.Map.entrySet() in that
- * what is returned is a unmodifiable snapshot view of the entries in the ConsistentMap.
- * Attempts to modify the returned set, whether direct or via its iterator,
- * result in an UnsupportedOperationException.
- *
- * @return set of entries contained in this map.
- */
- Set<Entry<K, V>> entrySet();
-
- /**
* If the specified key is not already associated with a value
* associates it with the given value and returns null, else returns the current value.
*
diff --git a/core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java b/core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java
deleted file mode 100644
index b4b05fc..0000000
--- a/core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.service;
-
-import static com.google.common.base.Preconditions.*;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * Database update operation.
- *
- * @param <K> key type.
- * @param <V> value type.
- */
-public class UpdateOperation<K, V> {
-
- /**
- * Type of database update operation.
- */
- public static enum Type {
- PUT,
- PUT_IF_ABSENT,
- PUT_IF_VERSION_MATCH,
- PUT_IF_VALUE_MATCH,
- REMOVE,
- REMOVE_IF_VERSION_MATCH,
- REMOVE_IF_VALUE_MATCH,
- }
-
- private Type type;
- private String tableName;
- private K key;
- private V value;
- private V currentValue;
- private long currentVersion = -1;
-
- /**
- * Returns the type of update operation.
- * @return type of update.
- */
- public Type type() {
- return type;
- }
-
- /**
- * Returns the tableName being updated.
- * @return table name.
- */
- public String tableName() {
- return tableName;
- }
-
- /**
- * Returns the item key being updated.
- * @return item key
- */
- public K key() {
- return key;
- }
-
- /**
- * Returns the new value.
- * @return item's target value.
- */
- public V value() {
- return value;
- }
-
- /**
- * Returns the expected current value in the database value for the key.
- * @return current value in database.
- */
- public V currentValue() {
- return currentValue;
- }
-
- /**
- * Returns the expected current version in the database for the key.
- * @return expected version.
- */
- public long currentVersion() {
- return currentVersion;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("type", type)
- .add("tableName", tableName)
- .add("key", key)
- .add("value", value)
- .add("currentValue", currentValue)
- .add("currentVersion", currentVersion)
- .toString();
- }
-
- /**
- * Creates a new builder instance.
- * @param <K> key type.
- * @param <V> value type.
- *
- * @return builder.
- */
- public static <K, V> Builder<K, V> newBuilder() {
- return new Builder<>();
- }
-
- /**
- * UpdatOperation builder.
- *
- * @param <K> key type.
- * @param <V> value type.
- */
- public static final class Builder<K, V> {
-
- private UpdateOperation<K, V> operation = new UpdateOperation<>();
-
- public UpdateOperation<K, V> build() {
- validateInputs();
- return operation;
- }
-
- public Builder<K, V> withType(Type type) {
- operation.type = checkNotNull(type, "type cannot be null");
- return this;
- }
-
- public Builder<K, V> withTableName(String tableName) {
- operation.tableName = checkNotNull(tableName, "tableName cannot be null");
- return this;
- }
-
- public Builder<K, V> withKey(K key) {
- operation.key = checkNotNull(key, "key cannot be null");
- return this;
- }
-
- public Builder<K, V> withCurrentValue(V value) {
- operation.currentValue = checkNotNull(value, "currentValue cannot be null");
- return this;
- }
-
- public Builder<K, V> withValue(V value) {
- operation.value = checkNotNull(value, "value cannot be null");
- return this;
- }
-
- public Builder<K, V> withCurrentVersion(long version) {
- checkArgument(version >= 0, "version cannot be negative");
- operation.currentVersion = version;
- return this;
- }
-
- private void validateInputs() {
- checkNotNull(operation.type, "type must be specified");
- checkNotNull(operation.tableName, "table name must be specified");
- checkNotNull(operation.key, "key must be specified");
- switch (operation.type) {
- case PUT:
- case PUT_IF_ABSENT:
- checkNotNull(operation.value, "value must be specified.");
- break;
- case PUT_IF_VERSION_MATCH:
- checkNotNull(operation.value, "value must be specified.");
- checkState(operation.currentVersion >= 0, "current version must be specified");
- break;
- case PUT_IF_VALUE_MATCH:
- checkNotNull(operation.value, "value must be specified.");
- checkNotNull(operation.currentValue, "currentValue must be specified.");
- break;
- case REMOVE:
- break;
- case REMOVE_IF_VERSION_MATCH:
- checkState(operation.currentVersion >= 0, "current version must be specified");
- break;
- case REMOVE_IF_VALUE_MATCH:
- checkNotNull(operation.currentValue, "currentValue must be specified.");
- break;
- default:
- throw new IllegalStateException("Unknown operation type");
- }
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 010276a..363f330 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -34,6 +34,7 @@
import net.kuujo.copycat.protocol.Protocol;
import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
+import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -41,6 +42,7 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.core.IdGenerator;
import org.onosproject.store.cluster.impl.DistributedClusterStore;
import org.onosproject.store.cluster.impl.NodeInfo;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
@@ -53,11 +55,13 @@
import org.onosproject.store.service.SetBuilder;
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.TransactionContext;
import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -92,6 +96,9 @@
private PartitionedDatabase partitionedDatabase;
private Database inMemoryDatabase;
+ private TransactionManager transactionManager;
+ private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -188,6 +195,7 @@
Thread.currentThread().interrupt();
log.warn("Failed to complete database initialization.");
}
+ transactionManager = new TransactionManager(partitionedDatabase);
log.info("Started");
}
@@ -218,7 +226,7 @@
@Override
public TransactionContext createTransactionContext() {
- return new DefaultTransactionContext(partitionedDatabase);
+ return new DefaultTransactionContext(partitionedDatabase, transactionIdGenerator.getNewId());
}
@Override
@@ -331,6 +339,11 @@
.collect(Collectors.toList());
}
+ @Override
+ public Collection<Transaction> getTransactions() {
+ return complete(transactionManager.getTransactions());
+ }
+
private static <T> T complete(CompletableFuture<T> future) {
try {
return future.get(DATABASE_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
@@ -343,4 +356,9 @@
throw new ConsistentMapException(e.getCause());
}
}
+
+ @Override
+ public void redriveTransactions() {
+ getTransactions().stream().forEach(transactionManager::execute);
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index 3ea06fb..3578525 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -17,12 +17,11 @@
package org.onosproject.store.consistent.impl;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
/**
@@ -87,7 +86,7 @@
* @param value The value to set.
* @return A completable future to be completed with the result once complete.
*/
- CompletableFuture<Versioned<V>> put(String tableName, K key, V value);
+ CompletableFuture<Result<Versioned<V>>> put(String tableName, K key, V value);
/**
* Removes a value from the table.
@@ -96,7 +95,7 @@
* @param key The key to remove.
* @return A completable future to be completed with the result once complete.
*/
- CompletableFuture<Versioned<V>> remove(String tableName, K key);
+ CompletableFuture<Result<Versioned<V>>> remove(String tableName, K key);
/**
* Clears the table.
@@ -104,7 +103,7 @@
* @param tableName table name
* @return A completable future to be completed with the result once complete.
*/
- CompletableFuture<Void> clear(String tableName);
+ CompletableFuture<Result<Void>> clear(String tableName);
/**
* Gets a set of keys in the table.
@@ -138,7 +137,7 @@
* @param value The value to set if the given key does not exist.
* @return A completable future to be completed with the result once complete.
*/
- CompletableFuture<Versioned<V>> putIfAbsent(String tableName, K key, V value);
+ CompletableFuture<Result<Versioned<V>>> putIfAbsent(String tableName, K key, V value);
/**
* Removes a key and if the existing value for that key matches the specified value.
@@ -148,7 +147,7 @@
* @param value The value to remove.
* @return A completable future to be completed with the result once complete.
*/
- CompletableFuture<Boolean> remove(String tableName, K key, V value);
+ CompletableFuture<Result<Boolean>> remove(String tableName, K key, V value);
/**
* Removes a key and if the existing version for that key matches the specified version.
@@ -158,7 +157,7 @@
* @param version The expected version.
* @return A completable future to be completed with the result once complete.
*/
- CompletableFuture<Boolean> remove(String tableName, K key, long version);
+ CompletableFuture<Result<Boolean>> remove(String tableName, K key, long version);
/**
* Replaces the entry for the specified key only if currently mapped to the specified value.
@@ -169,7 +168,7 @@
* @param newValue The value with which to replace the given key and value.
* @return A completable future to be completed with the result once complete.
*/
- CompletableFuture<Boolean> replace(String tableName, K key, V oldValue, V newValue);
+ CompletableFuture<Result<Boolean>> replace(String tableName, K key, V oldValue, V newValue);
/**
* Replaces the entry for the specified key only if currently mapped to the specified version.
@@ -180,14 +179,42 @@
* @param newValue The value with which to replace the given key and version.
* @return A completable future to be completed with the result once complete.
*/
- CompletableFuture<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
+ CompletableFuture<Result<Boolean>> replace(String tableName, K key, long oldVersion, V newValue);
/**
- * Perform a atomic batch update operation i.e. either all operations in batch succeed or
- * none do and no state changes are made.
+ * Prepare and commit the specified transaction.
*
- * @param updates list of updates to apply atomically.
- * @return A completable future to be completed with the result once complete.
+ * @param transaction transaction to commit (after preparation)
+ * @return A completable future to be completed with the result once complete
*/
- CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<K, V>> updates);
+ CompletableFuture<Boolean> prepareAndCommit(Transaction transaction);
+
+ /**
+ * Prepare the specified transaction for commit. A successful prepare implies
+ * all the affected resources are locked thus ensuring no concurrent updates can interfere.
+ *
+ * @param transaction transaction to prepare (for commit)
+ * @return A completable future to be completed with the result once complete. The future is completed
+ * with true if the transaction is successfully prepared i.e. all pre-conditions are met and
+ * applicable resources locked.
+ */
+ CompletableFuture<Boolean> prepare(Transaction transaction);
+
+ /**
+ * Commit the specified transaction. A successful commit implies
+ * all the updates are applied, are now durable and are now visible externally.
+ *
+ * @param transaction transaction to commit
+ * @return A completable future to be completed with the result once complete
+ */
+ CompletableFuture<Boolean> commit(Transaction transaction);
+
+ /**
+ * Rollback the specified transaction. A successful rollback implies
+ * all previously acquired locks for the affected resources are released.
+ *
+ * @param transaction transaction to rollback
+ * @return A completable future to be completed with the result once complete
+ */
+ CompletableFuture<Boolean> rollback(Transaction transaction);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
index 237c85b..f6fa6d1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
@@ -23,6 +23,8 @@
import org.onlab.util.KryoNamespace;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import net.kuujo.copycat.cluster.internal.MemberInfo;
@@ -63,8 +65,14 @@
private static final KryoNamespace ONOS_STORE = KryoNamespace.newBuilder()
.nextId(KryoNamespace.FLOATING_ID)
.register(Versioned.class)
+ .register(DatabaseUpdate.class)
+ .register(DatabaseUpdate.Type.class)
.register(Pair.class)
.register(ImmutablePair.class)
+ .register(Result.class)
+ .register(Result.Status.class)
+ .register(DefaultTransaction.class)
+ .register(Transaction.State.class)
.build();
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index 097261d..6685dde 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -17,11 +17,10 @@
package org.onosproject.store.consistent.impl;
import java.util.Collection;
-import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import net.kuujo.copycat.state.Command;
@@ -62,13 +61,13 @@
Versioned<V> get(String tableName, K key);
@Command
- Versioned<V> put(String tableName, K key, V value);
+ Result<Versioned<V>> put(String tableName, K key, V value);
@Command
- Versioned<V> remove(String tableName, K key);
+ Result<Versioned<V>> remove(String tableName, K key);
@Command
- void clear(String tableName);
+ Result<Void> clear(String tableName);
@Query
Set<K> keySet(String tableName);
@@ -80,20 +79,29 @@
Set<Entry<K, Versioned<V>>> entrySet(String tableName);
@Command
- Versioned<V> putIfAbsent(String tableName, K key, V value);
+ Result<Versioned<V>> putIfAbsent(String tableName, K key, V value);
@Command
- boolean remove(String tableName, K key, V value);
+ Result<Boolean> remove(String tableName, K key, V value);
@Command
- boolean remove(String tableName, K key, long version);
+ Result<Boolean> remove(String tableName, K key, long version);
@Command
- boolean replace(String tableName, K key, V oldValue, V newValue);
+ Result<Boolean> replace(String tableName, K key, V oldValue, V newValue);
@Command
- boolean replace(String tableName, K key, long oldVersion, V newValue);
+ Result<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
@Command
- boolean batchUpdate(List<UpdateOperation<K, V>> updates);
+ boolean prepareAndCommit(Transaction transaction);
+
+ @Command
+ boolean prepare(Transaction transaction);
+
+ @Command
+ boolean commit(Transaction transaction);
+
+ @Command
+ boolean rollback(Transaction transaction);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index c86a6ea..09f612e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -28,6 +28,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.HexString;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
@@ -108,6 +109,7 @@
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
return database.put(name, keyCache.getUnchecked(key), serializer.encode(value))
+ .thenApply(this::unwrapResult)
.thenApply(v -> v != null
? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
}
@@ -116,13 +118,14 @@
public CompletableFuture<Versioned<V>> remove(K key) {
checkNotNull(key, ERROR_NULL_KEY);
return database.remove(name, keyCache.getUnchecked(key))
+ .thenApply(this::unwrapResult)
.thenApply(v -> v != null
? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
}
@Override
public CompletableFuture<Void> clear() {
- return database.clear(name);
+ return database.clear(name).thenApply(this::unwrapResult);
}
@Override
@@ -154,23 +157,27 @@
public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
- return database.putIfAbsent(
- name, keyCache.getUnchecked(key), serializer.encode(value)).thenApply(v ->
- v != null ?
- new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
+ return database.putIfAbsent(name,
+ keyCache.getUnchecked(key),
+ serializer.encode(value))
+ .thenApply(this::unwrapResult)
+ .thenApply(v -> v != null ?
+ new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
}
@Override
public CompletableFuture<Boolean> remove(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
- return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value));
+ return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value))
+ .thenApply(this::unwrapResult);
}
@Override
public CompletableFuture<Boolean> remove(K key, long version) {
checkNotNull(key, ERROR_NULL_KEY);
- return database.remove(name, keyCache.getUnchecked(key), version);
+ return database.remove(name, keyCache.getUnchecked(key), version)
+ .thenApply(this::unwrapResult);
}
@@ -179,14 +186,16 @@
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(newValue, ERROR_NULL_VALUE);
byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
- return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue));
+ return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue))
+ .thenApply(this::unwrapResult);
}
@Override
public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(newValue, ERROR_NULL_VALUE);
- return database.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue));
+ return database.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue))
+ .thenApply(this::unwrapResult);
}
private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
@@ -197,4 +206,14 @@
e.getValue().version(),
e.getValue().creationTime()));
}
+
+ private <T> T unwrapResult(Result<T> result) {
+ if (result.status() == Result.Status.LOCKED) {
+ throw new ConsistentMapException.ConcurrentModification();
+ } else if (result.success()) {
+ return result.value();
+ } else {
+ throw new IllegalStateException("Must not be here");
+ }
+ }
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index c14e57c..8ed7670 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -23,13 +23,12 @@
import net.kuujo.copycat.util.concurrent.Futures;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
/**
@@ -39,7 +38,7 @@
private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
private DatabaseProxy<String, byte[]> proxy;
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
public DefaultDatabase(ResourceContext context) {
super(context);
this.stateMachine = new DefaultStateMachine(context, DatabaseState.class, DefaultDatabaseState.class);
@@ -91,17 +90,17 @@
}
@Override
- public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+ public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) {
return checkOpen(() -> proxy.put(tableName, key, value));
}
@Override
- public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+ public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
return checkOpen(() -> proxy.remove(tableName, key));
}
@Override
- public CompletableFuture<Void> clear(String tableName) {
+ public CompletableFuture<Result<Void>> clear(String tableName) {
return checkOpen(() -> proxy.clear(tableName));
}
@@ -121,33 +120,48 @@
}
@Override
- public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+ public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
}
@Override
- public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+ public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
return checkOpen(() -> proxy.remove(tableName, key, value));
}
@Override
- public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+ public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
return checkOpen(() -> proxy.remove(tableName, key, version));
}
@Override
- public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+ public CompletableFuture<Result<Boolean>> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
}
@Override
- public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+ public CompletableFuture<Result<Boolean>> replace(String tableName, String key, long oldVersion, byte[] newValue) {
return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
}
@Override
- public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
- return checkOpen(() -> proxy.atomicBatchUpdate(updates));
+ public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
+ return checkOpen(() -> proxy.prepareAndCommit(transaction));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare(Transaction transaction) {
+ return checkOpen(() -> proxy.prepare(transaction));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> commit(Transaction transaction) {
+ return checkOpen(() -> proxy.commit(transaction));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> rollback(Transaction transaction) {
+ return checkOpen(() -> proxy.rollback(transaction));
}
@Override
@@ -180,4 +194,4 @@
}
return false;
}
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index e63a3d8..c190a28 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -18,43 +18,58 @@
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.DatabaseUpdate.Type;
+import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.StateContext;
/**
* Default database state.
- *
- * @param <K> key type
- * @param <V> value type
*/
-public class DefaultDatabaseState<K, V> implements DatabaseState<K, V> {
-
+public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
private Long nextVersion;
- private Map<String, Map<K, Versioned<V>>> tables;
+ private Map<String, Map<String, Versioned<byte[]>>> tables;
+
+ /**
+ * This locks map has a structure similar to the "tables" map above and
+ * holds all the provisional updates made during a transaction's prepare phase.
+ * The entry value is represented as the tuple: (transactionId, newValue)
+ * If newValue == null that signifies this update is attempting to
+ * delete the existing value.
+ * This map also serves as a lock on the entries that are being updated.
+ * The presence of a entry in this map indicates that element is
+ * participating in a transaction and is currently locked for updates.
+ */
+ private Map<String, Map<String, Pair<Long, byte[]>>> locks;
@Initializer
@Override
- public void init(StateContext<DatabaseState<K, V>> context) {
+ public void init(StateContext<DatabaseState<String, byte[]>> context) {
tables = context.get("tables");
if (tables == null) {
- tables = new HashMap<>();
+ tables = Maps.newConcurrentMap();
context.put("tables", tables);
}
+ locks = context.get("locks");
+ if (locks == null) {
+ locks = Maps.newConcurrentMap();
+ context.put("locks", locks);
+ }
nextVersion = context.get("nextVersion");
if (nextVersion == null) {
nextVersion = new Long(0);
@@ -62,15 +77,6 @@
}
}
- private Map<K, Versioned<V>> getTableMap(String tableName) {
- Map<K, Versioned<V>> table = tables.get(tableName);
- if (table == null) {
- table = new HashMap<>();
- tables.put(tableName, table);
- }
- return table;
- }
-
@Override
public Set<String> tableNames() {
return new HashSet<>(tables.keySet());
@@ -87,47 +93,55 @@
}
@Override
- public boolean containsKey(String tableName, K key) {
+ public boolean containsKey(String tableName, String key) {
return getTableMap(tableName).containsKey(key);
}
@Override
- public boolean containsValue(String tableName, V value) {
- return getTableMap(tableName).values().stream().anyMatch(v -> checkEquality(v.value(), value));
+ public boolean containsValue(String tableName, byte[] value) {
+ return getTableMap(tableName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
}
@Override
- public Versioned<V> get(String tableName, K key) {
+ public Versioned<byte[]> get(String tableName, String key) {
return getTableMap(tableName).get(key);
}
@Override
- public Versioned<V> put(String tableName, K key, V value) {
- return getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion));
+ public Result<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+ return isLockedForUpdates(tableName, key)
+ ? Result.locked()
+ : Result.ok(getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion)));
}
@Override
- public Versioned<V> remove(String tableName, K key) {
- return getTableMap(tableName).remove(key);
+ public Result<Versioned<byte[]>> remove(String tableName, String key) {
+ return isLockedForUpdates(tableName, key)
+ ? Result.locked()
+ : Result.ok(getTableMap(tableName).remove(key));
}
@Override
- public void clear(String tableName) {
+ public Result<Void> clear(String tableName) {
+ if (areTransactionsInProgress(tableName)) {
+ return Result.locked();
+ }
getTableMap(tableName).clear();
+ return Result.ok(null);
}
@Override
- public Set<K> keySet(String tableName) {
+ public Set<String> keySet(String tableName) {
return ImmutableSet.copyOf(getTableMap(tableName).keySet());
}
@Override
- public Collection<Versioned<V>> values(String tableName) {
+ public Collection<Versioned<byte[]>> values(String tableName) {
return ImmutableList.copyOf(getTableMap(tableName).values());
}
@Override
- public Set<Entry<K, Versioned<V>>> entrySet(String tableName) {
+ public Set<Entry<String, Versioned<byte[]>>> entrySet(String tableName) {
return ImmutableSet.copyOf(getTableMap(tableName)
.entrySet()
.stream()
@@ -136,93 +150,113 @@
}
@Override
- public Versioned<V> putIfAbsent(String tableName, K key, V value) {
- Versioned<V> existingValue = getTableMap(tableName).get(key);
- return existingValue != null ? existingValue : put(tableName, key, value);
- }
-
- @Override
- public boolean remove(String tableName, K key, V value) {
- Versioned<V> existing = getTableMap(tableName).get(key);
- if (existing != null && checkEquality(existing.value(), value)) {
- getTableMap(tableName).remove(key);
- return true;
+ public Result<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+ if (isLockedForUpdates(tableName, key)) {
+ return Result.locked();
}
- return false;
+ Versioned<byte[]> existingValue = get(tableName, key);
+ Versioned<byte[]> currentValue = existingValue != null ? existingValue : put(tableName, key, value).value();
+ return Result.ok(currentValue);
}
@Override
- public boolean remove(String tableName, K key, long version) {
- Versioned<V> existing = getTableMap(tableName).get(key);
+ public Result<Boolean> remove(String tableName, String key, byte[] value) {
+ if (isLockedForUpdates(tableName, key)) {
+ return Result.locked();
+ }
+ Versioned<byte[]> existing = get(tableName, key);
+ if (existing != null && Arrays.equals(existing.value(), value)) {
+ getTableMap(tableName).remove(key);
+ return Result.ok(true);
+ }
+ return Result.ok(false);
+ }
+
+ @Override
+ public Result<Boolean> remove(String tableName, String key, long version) {
+ if (isLockedForUpdates(tableName, key)) {
+ return Result.locked();
+ }
+ Versioned<byte[]> existing = get(tableName, key);
if (existing != null && existing.version() == version) {
remove(tableName, key);
- return true;
+ return Result.ok(true);
}
- return false;
+ return Result.ok(false);
}
@Override
- public boolean replace(String tableName, K key, V oldValue, V newValue) {
- Versioned<V> existing = getTableMap(tableName).get(key);
- if (existing != null && checkEquality(existing.value(), oldValue)) {
+ public Result<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+ if (isLockedForUpdates(tableName, key)) {
+ return Result.locked();
+ }
+ Versioned<byte[]> existing = get(tableName, key);
+ if (existing != null && Arrays.equals(existing.value(), oldValue)) {
put(tableName, key, newValue);
- return true;
+ return Result.ok(true);
}
- return false;
+ return Result.ok(false);
}
@Override
- public boolean replace(String tableName, K key, long oldVersion, V newValue) {
- Versioned<V> existing = getTableMap(tableName).get(key);
+ public Result<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+ if (isLockedForUpdates(tableName, key)) {
+ return Result.locked();
+ }
+ Versioned<byte[]> existing = get(tableName, key);
if (existing != null && existing.version() == oldVersion) {
put(tableName, key, newValue);
+ return Result.ok(true);
+ }
+ return Result.ok(false);
+ }
+
+ @Override
+ public boolean prepareAndCommit(Transaction transaction) {
+ if (prepare(transaction)) {
+ return commit(transaction);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean prepare(Transaction transaction) {
+ if (transaction.updates().stream().anyMatch(update ->
+ isLockedByAnotherTransaction(update.tableName(),
+ update.key(),
+ transaction.id()))) {
+ return false;
+ }
+
+ if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
+ transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
return true;
}
return false;
}
@Override
- public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
- if (updates.stream().anyMatch(update -> !checkIfUpdateIsPossible(update))) {
- return false;
- } else {
- updates.stream().forEach(this::doUpdate);
- return true;
- }
+ public boolean commit(Transaction transaction) {
+ transaction.updates().forEach(update -> commitProvisionalUpdate(update, transaction.id()));
+ return true;
}
- private void doUpdate(UpdateOperation<K, V> update) {
- String tableName = update.tableName();
- K key = update.key();
- switch (update.type()) {
- case PUT:
- put(tableName, key, update.value());
- return;
- case REMOVE:
- remove(tableName, key);
- return;
- case PUT_IF_ABSENT:
- putIfAbsent(tableName, key, update.value());
- return;
- case PUT_IF_VERSION_MATCH:
- replace(tableName, key, update.currentValue(), update.value());
- return;
- case PUT_IF_VALUE_MATCH:
- replace(tableName, key, update.currentVersion(), update.value());
- return;
- case REMOVE_IF_VERSION_MATCH:
- remove(tableName, key, update.currentVersion());
- return;
- case REMOVE_IF_VALUE_MATCH:
- remove(tableName, key, update.currentValue());
- return;
- default:
- throw new IllegalStateException("Unsupported type: " + update.type());
- }
+ @Override
+ public boolean rollback(Transaction transaction) {
+ transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
+ return true;
}
- private boolean checkIfUpdateIsPossible(UpdateOperation<K, V> update) {
- Versioned<V> existingEntry = get(update.tableName(), update.key());
+ private Map<String, Versioned<byte[]>> getTableMap(String tableName) {
+ return tables.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
+ }
+
+ private Map<String, Pair<Long, byte[]>> getLockMap(String tableName) {
+ return locks.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
+ }
+
+ private boolean isUpdatePossible(DatabaseUpdate update) {
+ Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
switch (update.type()) {
case PUT:
case REMOVE:
@@ -232,20 +266,85 @@
case PUT_IF_VERSION_MATCH:
return existingEntry != null && existingEntry.version() == update.currentVersion();
case PUT_IF_VALUE_MATCH:
- return existingEntry != null && checkEquality(existingEntry.value(), update.currentValue());
+ return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
case REMOVE_IF_VERSION_MATCH:
return existingEntry == null || existingEntry.version() == update.currentVersion();
case REMOVE_IF_VALUE_MATCH:
- return existingEntry == null || checkEquality(existingEntry.value(), update.currentValue());
+ return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
default:
throw new IllegalStateException("Unsupported type: " + update.type());
}
}
- private boolean checkEquality(V value1, V value2) {
- if (value1 instanceof byte[]) {
- return Arrays.equals((byte[]) value1, (byte[]) value2);
+ private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+ Map<String, Pair<Long, byte[]>> lockMap = getLockMap(update.tableName());
+ switch (update.type()) {
+ case PUT:
+ case PUT_IF_ABSENT:
+ case PUT_IF_VERSION_MATCH:
+ case PUT_IF_VALUE_MATCH:
+ lockMap.put(update.key(), Pair.of(transactionId, update.value()));
+ break;
+ case REMOVE:
+ case REMOVE_IF_VERSION_MATCH:
+ case REMOVE_IF_VALUE_MATCH:
+ lockMap.put(update.key(), null);
+ break;
+ default:
+ throw new IllegalStateException("Unsupported type: " + update.type());
}
- return value1.equals(value2);
+ }
+
+ private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+ String tableName = update.tableName();
+ String key = update.key();
+ Type type = update.type();
+ Pair<Long, byte[]> provisionalUpdate = getLockMap(tableName).get(key);
+ if (Objects.equal(transactionId, provisionalUpdate.getLeft())) {
+ getLockMap(tableName).remove(key);
+ } else {
+ return;
+ }
+
+ switch (type) {
+ case PUT:
+ case PUT_IF_ABSENT:
+ case PUT_IF_VERSION_MATCH:
+ case PUT_IF_VALUE_MATCH:
+ put(tableName, key, provisionalUpdate.getRight());
+ break;
+ case REMOVE:
+ case REMOVE_IF_VERSION_MATCH:
+ case REMOVE_IF_VALUE_MATCH:
+ remove(tableName, key);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+ String tableName = update.tableName();
+ String key = update.key();
+ Pair<Long, byte[]> provisionalUpdate = getLockMap(tableName).get(key);
+ if (provisionalUpdate == null) {
+ return;
+ }
+ if (Objects.equal(transactionId, provisionalUpdate.getLeft())) {
+ getLockMap(tableName).remove(key);
+ }
+ }
+
+ private boolean isLockedByAnotherTransaction(String tableName, String key, long transactionId) {
+ Pair<Long, byte[]> update = getLockMap(tableName).get(key);
+ return update != null && !Objects.equal(transactionId, update.getLeft());
+ }
+
+ private boolean isLockedForUpdates(String tableName, String key) {
+ return getLockMap(tableName).containsKey(key);
+ }
+
+ private boolean areTransactionsInProgress(String tableName) {
+ return !getLockMap(tableName).isEmpty();
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
index e6eac62..97d5ef3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
@@ -18,6 +18,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
+
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
@@ -46,6 +47,7 @@
return backingMap.isEmpty();
}
+ @SuppressWarnings("unchecked")
@Override
public boolean contains(Object o) {
return backingMap.containsKey((E) o);
@@ -71,6 +73,7 @@
return backingMap.putIfAbsent(e, true) == null;
}
+ @SuppressWarnings("unchecked")
@Override
public boolean remove(Object o) {
return backingMap.remove((E) o, true);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransaction.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransaction.java
new file mode 100644
index 0000000..2ff7a2d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransaction.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import java.util.List;
+
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A Default transaction implementation.
+ */
+public class DefaultTransaction implements Transaction {
+
+ private final long transactionId;
+ private final List<DatabaseUpdate> updates;
+ private final State state;
+ private final long lastUpdated;
+
+ public DefaultTransaction(long transactionId, List<DatabaseUpdate> updates) {
+ this(transactionId, updates, State.PREPARING, System.currentTimeMillis());
+ }
+
+ private DefaultTransaction(long transactionId, List<DatabaseUpdate> updates, State state, long lastUpdated) {
+ this.transactionId = transactionId;
+ this.updates = ImmutableList.copyOf(updates);
+ this.state = state;
+ this.lastUpdated = lastUpdated;
+ }
+
+ @Override
+ public long id() {
+ return transactionId;
+ }
+
+ @Override
+ public List<DatabaseUpdate> updates() {
+ return updates;
+ }
+
+ @Override
+ public State state() {
+ return state;
+ }
+
+ @Override
+ public Transaction transition(State newState) {
+ return new DefaultTransaction(transactionId, updates, newState, System.currentTimeMillis());
+ }
+
+ @Override
+ public long lastUpdated() {
+ return lastUpdated;
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
index d21543a..9d13833 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
@@ -18,19 +18,14 @@
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import static com.google.common.base.Preconditions.*;
-import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionException;
import org.onosproject.store.service.TransactionalMap;
-import org.onosproject.store.service.UpdateOperation;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -40,80 +35,69 @@
*/
public class DefaultTransactionContext implements TransactionContext {
- private final Map<String, DefaultTransactionalMap> txMaps = Maps.newHashMap();
+ private static final String TX_NOT_OPEN_ERROR = "Transaction Context is not open";
+
+ @SuppressWarnings("rawtypes")
+ private final Map<String, DefaultTransactionalMap> txMaps = Maps.newConcurrentMap();
private boolean isOpen = false;
private final Database database;
- private static final String TX_NOT_OPEN_ERROR = "Transaction is not open";
- private static final int TRANSACTION_TIMEOUT_MILLIS = 2000;
+ private final long transactionId;
- DefaultTransactionContext(Database database) {
- this.database = checkNotNull(database, "Database must not be null");
+ public DefaultTransactionContext(Database database, long transactionId) {
+ this.database = checkNotNull(database);
+ this.transactionId = transactionId;
+ }
+
+ @Override
+ public long transactionId() {
+ return transactionId;
}
@Override
public void begin() {
+ checkState(!isOpen, "Transaction Context is already open");
isOpen = true;
}
@Override
+ public boolean isOpen() {
+ return isOpen;
+ }
+
+ @Override
@SuppressWarnings("unchecked")
- public <K, V> TransactionalMap<K, V> createTransactionalMap(String mapName,
+ public <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName,
Serializer serializer) {
- checkNotNull(mapName, "map name is null");
- checkNotNull(serializer, "serializer is null");
checkState(isOpen, TX_NOT_OPEN_ERROR);
- if (!txMaps.containsKey(mapName)) {
- ConsistentMap<K, V> backingMap = new DefaultConsistentMap<>(mapName, database, serializer);
- DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<>(mapName, backingMap, this, serializer);
- txMaps.put(mapName, txMap);
- }
- return txMaps.get(mapName);
+ checkNotNull(mapName);
+ checkNotNull(serializer);
+ return txMaps.computeIfAbsent(mapName, name -> new DefaultTransactionalMap<>(
+ name,
+ new DefaultConsistentMap<>(name, database, serializer),
+ this,
+ serializer));
}
@SuppressWarnings("unchecked")
@Override
public void commit() {
checkState(isOpen, TX_NOT_OPEN_ERROR);
- List<UpdateOperation<String, byte[]>> allUpdates =
- Lists.newLinkedList();
try {
+ List<DatabaseUpdate> updates = Lists.newLinkedList();
txMaps.values()
- .stream()
- .forEach(m -> {
- allUpdates.addAll(m.prepareDatabaseUpdates());
- });
-
- if (!complete(database.atomicBatchUpdate(allUpdates))) {
- throw new TransactionException.OptimisticConcurrencyFailure();
- }
+ .forEach(m -> { updates.addAll(m.prepareDatabaseUpdates()); });
+ database.prepareAndCommit(new DefaultTransaction(transactionId, updates));
+ } catch (Exception e) {
+ abort();
+ throw new TransactionException(e);
} finally {
isOpen = false;
}
}
@Override
- public void rollback() {
+ public void abort() {
checkState(isOpen, TX_NOT_OPEN_ERROR);
- txMaps.values()
- .stream()
- .forEach(m -> m.rollback());
+ txMaps.values().forEach(m -> m.rollback());
}
-
- @Override
- public boolean isOpen() {
- return false;
- }
-
- private static <T> T complete(CompletableFuture<T> future) {
- try {
- return future.get(TRANSACTION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new TransactionException.Interrupted();
- } catch (TimeoutException e) {
- throw new TransactionException.Timeout();
- } catch (ExecutionException e) {
- throw new TransactionException(e.getCause());
- }
- }
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java
index c98c336..91151b0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java
@@ -16,23 +16,24 @@
package org.onosproject.store.consistent.impl;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
import java.util.Set;
import org.onlab.util.HexString;
import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionalMap;
-import org.onosproject.store.service.UpdateOperation;
import org.onosproject.store.service.Versioned;
import static com.google.common.base.Preconditions.*;
+import com.google.common.base.Objects;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -55,6 +56,23 @@
private final Map<K, V> writeCache = Maps.newConcurrentMap();
private final Set<K> deleteSet = Sets.newConcurrentHashSet();
+ private static final String ERROR_NULL_VALUE = "Null values are not allowed";
+ private static final String ERROR_NULL_KEY = "Null key is not allowed";
+
+ private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
+ .softValues()
+ .build(new CacheLoader<K, String>() {
+
+ @Override
+ public String load(K key) {
+ return HexString.toHexString(serializer.encode(key));
+ }
+ });
+
+ protected K dK(String key) {
+ return serializer.decode(HexString.fromHexString(key));
+ }
+
public DefaultTransactionalMap(
String name,
ConsistentMap<K, V> backingMap,
@@ -69,15 +87,15 @@
@Override
public V get(K key) {
checkState(txContext.isOpen(), TX_CLOSED_ERROR);
+ checkNotNull(key, ERROR_NULL_KEY);
if (deleteSet.contains(key)) {
return null;
- } else if (writeCache.containsKey(key)) {
- return writeCache.get(key);
+ }
+ V latest = writeCache.get(key);
+ if (latest != null) {
+ return latest;
} else {
- if (!readCache.containsKey(key)) {
- readCache.put(key, backingMap.get(key));
- }
- Versioned<V> v = readCache.get(key);
+ Versioned<V> v = readCache.computeIfAbsent(key, k -> backingMap.get(k));
return v != null ? v.value() : null;
}
}
@@ -85,25 +103,31 @@
@Override
public V put(K key, V value) {
checkState(txContext.isOpen(), TX_CLOSED_ERROR);
- Versioned<V> original = readCache.get(key);
- V recentUpdate = writeCache.put(key, value);
+ checkNotNull(value, ERROR_NULL_VALUE);
+
+ V latest = get(key);
+ writeCache.put(key, value);
deleteSet.remove(key);
- return recentUpdate == null ? (original != null ? original.value() : null) : recentUpdate;
+ return latest;
}
@Override
public V remove(K key) {
checkState(txContext.isOpen(), TX_CLOSED_ERROR);
- Versioned<V> original = readCache.get(key);
- V recentUpdate = writeCache.remove(key);
- deleteSet.add(key);
- return recentUpdate == null ? (original != null ? original.value() : null) : recentUpdate;
+ V latest = get(key);
+ if (latest != null) {
+ writeCache.remove(key);
+ deleteSet.add(key);
+ }
+ return latest;
}
@Override
public boolean remove(K key, V value) {
- V currentValue = get(key);
- if (value.equals(currentValue)) {
+ checkState(txContext.isOpen(), TX_CLOSED_ERROR);
+ checkNotNull(value, ERROR_NULL_VALUE);
+ V latest = get(key);
+ if (Objects.equal(value, latest)) {
remove(key);
return true;
}
@@ -112,8 +136,11 @@
@Override
public boolean replace(K key, V oldValue, V newValue) {
- V currentValue = get(key);
- if (oldValue.equals(currentValue)) {
+ checkState(txContext.isOpen(), TX_CLOSED_ERROR);
+ checkNotNull(oldValue, ERROR_NULL_VALUE);
+ checkNotNull(newValue, ERROR_NULL_VALUE);
+ V latest = get(key);
+ if (Objects.equal(oldValue, latest)) {
put(key, newValue);
return true;
}
@@ -121,70 +148,25 @@
}
@Override
- public int size() {
- // TODO
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isEmpty() {
- return size() == 0;
- }
-
- @Override
- public boolean containsKey(K key) {
- return get(key) != null;
- }
-
- @Override
- public boolean containsValue(V value) {
- // TODO
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void clear() {
- // TODO
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Set<K> keySet() {
- // TODO
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Collection<V> values() {
- // TODO
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Set<Entry<K, V>> entrySet() {
- // TODO
- throw new UnsupportedOperationException();
- }
-
- @Override
public V putIfAbsent(K key, V value) {
- V currentValue = get(key);
- if (currentValue == null) {
+ checkState(txContext.isOpen(), TX_CLOSED_ERROR);
+ checkNotNull(value, ERROR_NULL_VALUE);
+ V latest = get(key);
+ if (latest == null) {
put(key, value);
- return null;
}
- return currentValue;
+ return latest;
}
- protected List<UpdateOperation<String, byte[]>> prepareDatabaseUpdates() {
- List<UpdateOperation<K, V>> updates = Lists.newLinkedList();
+ protected List<DatabaseUpdate> prepareDatabaseUpdates() {
+ List<DatabaseUpdate> updates = Lists.newLinkedList();
deleteSet.forEach(key -> {
Versioned<V> original = readCache.get(key);
if (original != null) {
- updates.add(UpdateOperation.<K, V>newBuilder()
+ updates.add(DatabaseUpdate.newBuilder()
.withTableName(name)
- .withType(UpdateOperation.Type.REMOVE_IF_VERSION_MATCH)
- .withKey(key)
+ .withType(DatabaseUpdate.Type.REMOVE_IF_VERSION_MATCH)
+ .withKey(keyCache.getUnchecked(key))
.withCurrentVersion(original.version())
.build());
}
@@ -192,44 +174,23 @@
writeCache.forEach((key, value) -> {
Versioned<V> original = readCache.get(key);
if (original == null) {
- updates.add(UpdateOperation.<K, V>newBuilder()
+ updates.add(DatabaseUpdate.newBuilder()
.withTableName(name)
- .withType(UpdateOperation.Type.PUT_IF_ABSENT)
- .withKey(key)
- .withValue(value)
+ .withType(DatabaseUpdate.Type.PUT_IF_ABSENT)
+ .withKey(keyCache.getUnchecked(key))
+ .withValue(serializer.encode(value))
.build());
} else {
- updates.add(UpdateOperation.<K, V>newBuilder()
+ updates.add(DatabaseUpdate.newBuilder()
.withTableName(name)
- .withType(UpdateOperation.Type.PUT_IF_VERSION_MATCH)
- .withKey(key)
+ .withType(DatabaseUpdate.Type.PUT_IF_VERSION_MATCH)
+ .withKey(keyCache.getUnchecked(key))
.withCurrentVersion(original.version())
- .withValue(value)
+ .withValue(serializer.encode(value))
.build());
}
});
- return updates.stream().map(this::toRawUpdateOperation).collect(Collectors.toList());
- }
-
- private UpdateOperation<String, byte[]> toRawUpdateOperation(UpdateOperation<K, V> update) {
-
- UpdateOperation.Builder<String, byte[]> rawUpdate = UpdateOperation.<String, byte[]>newBuilder();
-
- rawUpdate = rawUpdate.withKey(HexString.toHexString(serializer.encode(update.key())))
- .withCurrentVersion(update.currentVersion())
- .withType(update.type());
-
- rawUpdate = rawUpdate.withTableName(update.tableName());
-
- if (update.value() != null) {
- rawUpdate = rawUpdate.withValue(serializer.encode(update.value()));
- }
-
- if (update.currentValue() != null) {
- rawUpdate = rawUpdate.withCurrentValue(serializer.encode(update.currentValue()));
- }
-
- return rawUpdate.build();
+ return updates;
}
/**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index ad049e6..178f49f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -27,7 +27,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Lists;
@@ -129,24 +130,27 @@
}
@Override
- public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+ public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).put(tableName, key, value);
}
@Override
- public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+ public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).remove(tableName, key);
}
@Override
- public CompletableFuture<Void> clear(String tableName) {
+ public CompletableFuture<Result<Void>> clear(String tableName) {
+ AtomicBoolean isLocked = new AtomicBoolean(false);
checkState(isOpen.get(), DB_NOT_OPEN);
return CompletableFuture.allOf(partitions
.stream()
- .map(p -> p.clear(tableName))
- .toArray(CompletableFuture[]::new));
+ .map(p -> p.clear(tableName)
+ .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
+ .toArray(CompletableFuture[]::new))
+ .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
}
@Override
@@ -183,59 +187,86 @@
}
@Override
- public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+ public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
}
@Override
- public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+ public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).remove(tableName, key, value);
}
@Override
- public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+ public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).remove(tableName, key, version);
}
@Override
- public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+ public CompletableFuture<Result<Boolean>> replace(
+ String tableName, String key, byte[] oldValue, byte[] newValue) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
}
@Override
- public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+ public CompletableFuture<Result<Boolean>> replace(
+ String tableName, String key, long oldVersion, byte[] newValue) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
}
@Override
- public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
- for (UpdateOperation<String, byte[]> update : updates) {
- Database partition = partitioner.getPartition(update.tableName(), update.key());
- List<UpdateOperation<String, byte[]>> partitionUpdates = perPartitionUpdates.get(partition);
- if (partitionUpdates == null) {
- partitionUpdates = Lists.newArrayList();
- perPartitionUpdates.put(partition, partitionUpdates);
- }
- partitionUpdates.add(update);
- }
- if (perPartitionUpdates.size() > 1) {
- // TODO
- throw new UnsupportedOperationException("Cross partition transactional updates are not supported.");
+ public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
+ Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+ if (subTransactions.isEmpty()) {
+ return CompletableFuture.completedFuture(true);
+ } else if (subTransactions.size() == 1) {
+ Entry<Database, Transaction> entry =
+ subTransactions.entrySet().iterator().next();
+ return entry.getKey().prepareAndCommit(entry.getValue());
} else {
- Entry<Database, List<UpdateOperation<String, byte[]>>> only =
- perPartitionUpdates.entrySet().iterator().next();
- return only.getKey().atomicBatchUpdate(only.getValue());
+ return new TransactionManager(this).execute(transaction);
}
}
@Override
+ public CompletableFuture<Boolean> prepare(Transaction transaction) {
+ Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+ AtomicBoolean status = new AtomicBoolean(true);
+ return CompletableFuture.allOf(subTransactions.entrySet()
+ .stream()
+ .map(entry -> entry
+ .getKey()
+ .prepare(entry.getValue())
+ .thenApply(v -> status.compareAndSet(true, v)))
+ .toArray(CompletableFuture[]::new))
+ .thenApply(v -> status.get());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> commit(Transaction transaction) {
+ Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+ return CompletableFuture.allOf(subTransactions.entrySet()
+ .stream()
+ .map(entry -> entry.getKey().commit(entry.getValue()))
+ .toArray(CompletableFuture[]::new))
+ .thenApply(v -> true);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> rollback(Transaction transaction) {
+ Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+ return CompletableFuture.allOf(subTransactions.entrySet()
+ .stream()
+ .map(entry -> entry.getKey().rollback(entry.getValue()))
+ .toArray(CompletableFuture[]::new))
+ .thenApply(v -> true);
+ }
+
+ @Override
public CompletableFuture<Database> open() {
return CompletableFuture.allOf(partitions
.stream()
@@ -243,7 +274,8 @@
.toArray(CompletableFuture[]::new))
.thenApply(v -> {
isOpen.set(true);
- return this; });
+ return this;
+ });
}
@Override
@@ -279,4 +311,19 @@
public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
throw new UnsupportedOperationException();
}
-}
\ No newline at end of file
+
+ private Map<Database, Transaction> createSubTransactions(
+ Transaction transaction) {
+ Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
+ for (DatabaseUpdate update : transaction.updates()) {
+ Database partition = partitioner.getPartition(update.tableName(), update.key());
+ List<DatabaseUpdate> partitionUpdates =
+ perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
+ partitionUpdates.add(update);
+ }
+ Map<Database, Transaction> subTransactions = Maps.newHashMap();
+ perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
+
+ return subTransactions;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Result.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Result.java
new file mode 100644
index 0000000..548174a
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Result.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+/**
+ * Result of a database update operation.
+ *
+ * @param <V> return value type
+ */
+public final class Result<V> {
+
+ public enum Status {
+ /**
+ * Indicates a successful update.
+ */
+ OK,
+
+ /**
+ * Indicates a failure due to underlying state being locked by another transaction.
+ */
+ LOCKED
+ }
+
+ private final Status status;
+ private final V value;
+
+ /**
+ * Creates a new Result instance with the specified value with status set to Status.OK.
+ *
+ * @param <V> result value type
+ * @param value result value
+ * @return Result instance
+ */
+ public static <V> Result<V> ok(V value) {
+ return new Result<>(value, Status.OK);
+ }
+
+ /**
+ * Creates a new Result instance with status set to Status.LOCKED.
+ *
+ * @param <V> result value type
+ * @return Result instance
+ */
+ public static <V> Result<V> locked() {
+ return new Result<>(null, Status.LOCKED);
+ }
+
+ private Result(V value, Status status) {
+ this.value = value;
+ this.status = status;
+ }
+
+ /**
+ * Returns true if this result indicates a successful execution i.e status is Status.OK.
+ *
+ * @return true if successful, false otherwise
+ */
+ public boolean success() {
+ return status == Status.OK;
+ }
+
+ /**
+ * Returns the status of database update operation.
+ * @return database update status
+ */
+ public Status status() {
+ return status;
+ }
+
+ /**
+ * Returns the return value for the update.
+ * @return value returned by database update. If the status is another
+ * other than Status.OK, this returns a null
+ */
+ public V value() {
+ return value;
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
index adc5477..e8daeff 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
@@ -36,4 +36,4 @@
public Database getPartition(String tableName, String key) {
return partitions.get(hash(tableName) % partitions.size());
}
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
new file mode 100644
index 0000000..865934b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.Transaction.State;
+
+/**
+ * Agent that runs the two phase commit protocol.
+ */
+public class TransactionManager {
+
+ private final Database database;
+ private final AsyncConsistentMap<Long, Transaction> transactions;
+
+ private final Serializer serializer = new Serializer() {
+
+ private KryoNamespace kryo = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.BASIC)
+ .nextId(KryoNamespace.FLOATING_ID)
+ .register(Versioned.class)
+ .register(DatabaseUpdate.class)
+ .register(DatabaseUpdate.Type.class)
+ .register(DefaultTransaction.class)
+ .register(Transaction.State.class)
+ .register(Pair.class)
+ .register(ImmutablePair.class)
+ .build();
+
+ @Override
+ public <T> byte[] encode(T object) {
+ return kryo.serialize(object);
+ }
+
+ @Override
+ public <T> T decode(byte[] bytes) {
+ return kryo.deserialize(bytes);
+ }
+ };
+
+ /**
+ * Constructs a new TransactionManager for the specified database instance.
+ *
+ * @param database database
+ */
+ public TransactionManager(Database database) {
+ this.database = checkNotNull(database, "database cannot be null");
+ this.transactions = new DefaultAsyncConsistentMap<>("onos-transactions", this.database, serializer);
+ }
+
+ /**
+ * Executes the specified transaction by employing a two phase commit protocol.
+ *
+ * @param transaction transaction to commit
+ * @return transaction result. Result value true indicates a successful commit, false
+ * indicates abort
+ */
+ public CompletableFuture<Boolean> execute(Transaction transaction) {
+ // clean up if this transaction in already in a terminal state.
+ if (transaction.state() == Transaction.State.COMMITTED ||
+ transaction.state() == Transaction.State.ROLLEDBACK) {
+ return transactions.remove(transaction.id()).thenApply(v -> true);
+ } else if (transaction.state() == Transaction.State.COMMITTING) {
+ return commit(transaction);
+ } else if (transaction.state() == Transaction.State.ROLLINGBACK) {
+ return rollback(transaction);
+ } else {
+ return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction));
+ }
+ }
+
+
+ /**
+ * Returns all transactions in the system.
+ *
+ * @return future for a collection of transactions
+ */
+ public CompletableFuture<Collection<Transaction>> getTransactions() {
+ return transactions.values().thenApply(c -> {
+ Collection<Transaction> txns = c.stream().map(v -> v.value()).collect(Collectors.toList());
+ return txns;
+ });
+ }
+
+ private CompletableFuture<Boolean> prepare(Transaction transaction) {
+ return transactions.put(transaction.id(), transaction)
+ .thenCompose(v -> database.prepare(transaction))
+ .thenCompose(status -> transactions.put(
+ transaction.id(),
+ transaction.transition(status ? State.COMMITTING : State.ROLLINGBACK))
+ .thenApply(v -> status));
+ }
+
+ private CompletableFuture<Boolean> commit(Transaction transaction) {
+ return database.commit(transaction)
+ .thenCompose(v -> transactions.put(
+ transaction.id(),
+ transaction.transition(Transaction.State.COMMITTED)))
+ .thenApply(v -> true);
+ }
+
+ private CompletableFuture<Boolean> rollback(Transaction transaction) {
+ return database.rollback(transaction)
+ .thenCompose(v -> transactions.put(
+ transaction.id(),
+ transaction.transition(Transaction.State.ROLLEDBACK)))
+ .thenApply(v -> true);
+ }
+}
\ No newline at end of file