Refactor transaction support in preparation for migration to latest APIs
- Added a explicit transaction id type
- cli command now just returns the identifiers of in-progress transactions
- Removed redriveTransactions until a better alternative is provided
- Removed DatabaseUpdate and replaced its usage with MapUpdate
Change-Id: Ic4a14967072068834510cd8459fd2a6790e456ef
diff --git a/cli/src/main/java/org/onosproject/cli/net/TransactionsCommand.java b/cli/src/main/java/org/onosproject/cli/net/TransactionsCommand.java
index 6013a38..75da2be 100644
--- a/cli/src/main/java/org/onosproject/cli/net/TransactionsCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/TransactionsCommand.java
@@ -18,81 +18,41 @@
import java.util.Collection;
import org.apache.karaf.shell.commands.Command;
-import org.apache.karaf.shell.commands.Option;
-import org.onlab.util.Tools;
import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.StorageAdminService;
-import org.onosproject.store.service.Transaction;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
/**
- * CLI to work with database transactions in the system.
+ * CLI to view in-progress database transactions in the system.
*/
@Command(scope = "onos", name = "transactions",
- description = "Utility for viewing and redriving database transactions")
+ description = "Utility for listing pending/inprogress transactions")
public class TransactionsCommand extends AbstractShellCommand {
- @Option(name = "-r", aliases = "--redrive",
- description = "Redrive stuck transactions while removing those that are done",
- required = false, multiValued = false)
- private boolean redrive = false;
-
- private static final String FMT = "%-20s %-15s %-10s";
-
- /**
- * Displays transactions as text.
- *
- * @param transactions transactions
- */
- private void displayTransactions(Collection<Transaction> transactions) {
- print("---------------------------------------------");
- print(FMT, "Id", "State", "Updated");
- print("---------------------------------------------");
- transactions.forEach(txn -> print(FMT, txn.id(), txn.state(), Tools.timeAgo(txn.lastUpdated())));
- if (transactions.size() > 0) {
- print("---------------------------------------------");
- }
- }
-
/**
* Converts collection of transactions into a JSON object.
*
- * @param transactions transactions
+ * @param transactionIds transaction identifiers
*/
- private JsonNode json(Collection<Transaction> transactions) {
+ private JsonNode json(Collection<TransactionId> transactionIds) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode txns = mapper.createArrayNode();
-
- // Create a JSON node for each transaction
- transactions.stream().forEach(txn -> {
- ObjectNode txnNode = mapper.createObjectNode();
- txnNode.put("id", txn.id())
- .put("state", txn.state().toString())
- .put("lastUpdated", txn.lastUpdated());
- txns.add(txnNode);
- });
-
+ transactionIds.forEach(id -> txns.add(id.toString()));
return txns;
}
@Override
protected void execute() {
StorageAdminService storageAdminService = get(StorageAdminService.class);
-
- if (redrive) {
- storageAdminService.redriveTransactions();
- return;
- }
-
- Collection<Transaction> transactions = storageAdminService.getTransactions();
+ Collection<TransactionId> transactionIds = storageAdminService.getPendingTransactions();
if (outputJson()) {
- print("%s", json(transactions));
+ print("%s", json(transactionIds));
} else {
- displayTransactions(transactions);
+ transactionIds.forEach(id -> print("%s", id.toString()));
}
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionId.java b/core/api/src/main/java/org/onosproject/store/primitives/TransactionId.java
similarity index 95%
rename from core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionId.java
rename to core/api/src/main/java/org/onosproject/store/primitives/TransactionId.java
index e378580..21cf24b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionId.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/TransactionId.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.store.primitives.resources.impl;
+package org.onosproject.store.primitives;
import com.google.common.base.Objects;
diff --git a/core/api/src/main/java/org/onosproject/store/service/DatabaseUpdate.java b/core/api/src/main/java/org/onosproject/store/service/DatabaseUpdate.java
deleted file mode 100644
index a62d382..0000000
--- a/core/api/src/main/java/org/onosproject/store/service/DatabaseUpdate.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.service;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * Database update operation.
- *
- */
-public final class DatabaseUpdate {
-
- /**
- * Type of database update operation.
- */
- public enum Type {
- /**
- * Insert/Update entry without any checks.
- */
- PUT,
- /**
- * Insert an entry iff there is no existing entry for that key.
- */
- PUT_IF_ABSENT,
-
- /**
- * Update entry if the current version matches specified version.
- */
- PUT_IF_VERSION_MATCH,
-
- /**
- * Update entry if the current value matches specified value.
- */
- PUT_IF_VALUE_MATCH,
-
- /**
- * Remove entry without any checks.
- */
- REMOVE,
-
- /**
- * Remove entry if the current version matches specified version.
- */
- REMOVE_IF_VERSION_MATCH,
-
- /**
- * Remove entry if the current value matches specified value.
- */
- REMOVE_IF_VALUE_MATCH,
- }
-
- private Type type;
- private String mapName;
- private String key;
- private byte[] value;
- private byte[] currentValue;
- private long currentVersion = -1;
-
- /**
- * Returns the type of update operation.
- * @return type of update.
- */
- public Type type() {
- return type;
- }
-
- /**
- * Returns the name of map being updated.
- * @return map name.
- */
- public String mapName() {
- return mapName;
- }
-
- /**
- * Returns the item key being updated.
- * @return item key
- */
- public String key() {
- return key;
- }
-
- /**
- * Returns the new value.
- * @return item's target value.
- */
- public byte[] value() {
- return value;
- }
-
- /**
- * Returns the expected current value in the database value for the key.
- * @return current value in database.
- */
- public byte[] currentValue() {
- return currentValue;
- }
-
- /**
- * Returns the expected current version in the database for the key.
- * @return expected version.
- */
- public long currentVersion() {
- return currentVersion;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("type", type)
- .add("mapName", mapName)
- .add("key", key)
- .add("value", value)
- .add("currentValue", currentValue)
- .add("currentVersion", currentVersion)
- .toString();
- }
-
- /**
- * Creates a new builder instance.
- *
- * @return builder.
- */
- public static Builder newBuilder() {
- return new Builder();
- }
-
- /**
- * DatabaseUpdate builder.
- *
- */
- public static final class Builder {
-
- private DatabaseUpdate update = new DatabaseUpdate();
-
- public DatabaseUpdate build() {
- validateInputs();
- return update;
- }
-
- public Builder withType(Type type) {
- update.type = checkNotNull(type, "type cannot be null");
- return this;
- }
-
- public Builder withMapName(String mapName) {
- update.mapName = checkNotNull(mapName, "mapName cannot be null");
- return this;
- }
-
- public Builder withKey(String key) {
- update.key = checkNotNull(key, "key cannot be null");
- return this;
- }
-
- public Builder withCurrentValue(byte[] value) {
- update.currentValue = checkNotNull(value, "currentValue cannot be null");
- return this;
- }
-
- public Builder withValue(byte[] value) {
- update.value = checkNotNull(value, "value cannot be null");
- return this;
- }
-
- public Builder withCurrentVersion(long version) {
- checkArgument(version >= 0, "version cannot be negative");
- update.currentVersion = version;
- return this;
- }
-
- private void validateInputs() {
- checkNotNull(update.type, "type must be specified");
- checkNotNull(update.mapName, "map name must be specified");
- checkNotNull(update.key, "key must be specified");
- switch (update.type) {
- case PUT:
- case PUT_IF_ABSENT:
- checkNotNull(update.value, "value must be specified.");
- break;
- case PUT_IF_VERSION_MATCH:
- checkNotNull(update.value, "value must be specified.");
- checkState(update.currentVersion >= 0, "current version must be specified");
- break;
- case PUT_IF_VALUE_MATCH:
- checkNotNull(update.value, "value must be specified.");
- checkNotNull(update.currentValue, "currentValue must be specified.");
- break;
- case REMOVE:
- break;
- case REMOVE_IF_VERSION_MATCH:
- checkState(update.currentVersion >= 0, "current version must be specified");
- break;
- case REMOVE_IF_VALUE_MATCH:
- checkNotNull(update.currentValue, "currentValue must be specified.");
- break;
- default:
- throw new IllegalStateException("Unknown operation type");
- }
- }
- }
-}
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
index abdb14d..09d2979 100644
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
@@ -61,7 +61,12 @@
/**
* Leader elector.
*/
- LEADER_ELECTOR
+ LEADER_ELECTOR,
+
+ /**
+ * Transaction Context.
+ */
+ TRANSACTION_CONTEXT
}
static final long DEFAULT_OPERTATION_TIMEOUT_MILLIS = 5000L;
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
index 2259104..1e9a36e 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
@@ -19,6 +19,8 @@
import java.util.List;
import java.util.Map;
+import org.onosproject.store.primitives.TransactionId;
+
/**
* Service for administering storage instances.
*/
@@ -62,14 +64,9 @@
Map<String, Long> getInMemoryDatabaseCounters();
/**
- * Returns all the transactions in the system.
+ * Returns all pending transactions.
*
- * @return collection of transactions
+ * @return collection of pending transaction identifiers.
*/
- Collection<Transaction> getTransactions();
-
- /**
- * Redrives stuck transactions while removing those that are done.
- */
- void redriveTransactions();
+ Collection<TransactionId> getPendingTransactions();
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/Transaction.java b/core/api/src/main/java/org/onosproject/store/service/Transaction.java
deleted file mode 100644
index 330d846..0000000
--- a/core/api/src/main/java/org/onosproject/store/service/Transaction.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service;
-
-import java.util.List;
-
-/**
- * An immutable transaction object.
- */
-public interface Transaction {
-
- enum State {
- /**
- * Indicates a new transaction that is about to be prepared. All transactions
- * start their life in this state.
- */
- PREPARING,
-
- /**
- * Indicates a transaction that is successfully prepared i.e. all participants voted to commit
- */
- PREPARED,
-
- /**
- * Indicates a transaction that is about to be committed.
- */
- COMMITTING,
-
- /**
- * Indicates a transaction that has successfully committed.
- */
- COMMITTED,
-
- /**
- * Indicates a transaction that is about to be rolled back.
- */
- ROLLINGBACK,
-
- /**
- * Indicates a transaction that has been rolled back and all locks are released.
- */
- ROLLEDBACK
- }
-
- /**
- * Returns the transaction Id.
- *
- * @return transaction id
- */
- long id();
-
- /**
- * Returns the list of updates that are part of this transaction.
- *
- * @return list of database updates
- */
- List<DatabaseUpdate> updates();
-
- /**
- * Returns the current state of this transaction.
- *
- * @return transaction state
- */
- State state();
-
- /**
- * Returns true if this transaction has completed execution.
- *
- * @return true is yes, false otherwise
- */
- default boolean isDone() {
- return state() == State.COMMITTED || state() == State.ROLLEDBACK;
- }
-
- /**
- * Returns a new transaction that is created by transitioning this one to the specified state.
- *
- * @param newState destination state
- * @return a new transaction instance similar to the current one but its state set to specified state
- */
- Transaction transition(State newState);
-
- /**
- * Returns the system time when the transaction was last updated.
- *
- * @return last update time
- */
- long lastUpdated();
-}
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java b/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
index ef97253..0ac490b 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
@@ -16,6 +16,8 @@
package org.onosproject.store.service;
+import org.onosproject.store.primitives.TransactionId;
+
/**
* Provides a context for transactional operations.
* <p>
@@ -31,14 +33,19 @@
* context isolation level is REPEATABLE_READS i.e. only data that is committed can be read.
* The only uncommitted data that can be read is the data modified by the current transaction.
*/
-public interface TransactionContext {
+public interface TransactionContext extends DistributedPrimitive {
+
+ @Override
+ default DistributedPrimitive.Type type() {
+ return DistributedPrimitive.Type.TRANSACTION_CONTEXT;
+ }
/**
- * Returns the unique transactionId.
+ * Returns the transaction identifier.
*
* @return transaction id
*/
- long transactionId();
+ TransactionId transactionId();
/**
* Returns if this transaction context is open.
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionContextBuilder.java b/core/api/src/main/java/org/onosproject/store/service/TransactionContextBuilder.java
index e9f3a02..8a43995 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionContextBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionContextBuilder.java
@@ -15,33 +15,14 @@
*/
package org.onosproject.store.service;
+import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
+
/**
- * Interface definition for a transaction context builder.
+ * Abstract base class for a transaction context builder.
*/
-public interface TransactionContextBuilder {
+public abstract class TransactionContextBuilder extends DistributedPrimitiveBuilder<TransactionContext> {
- /**
- * Disables distribution of map entries across multiple database partitions.
- * <p>
- * When partitioning is disabled, the returned map will have a single
- * partition that spans the entire cluster. Furthermore, the changes made to
- * the map are ephemeral and do not survive a full cluster restart.
- * </p>
- * <p>
- * Note: By default, partitions are enabled. This feature is intended to
- * simplify debugging.
- * </p>
- *
- * @return this TransactionalContextBuilder
- */
- TransactionContextBuilder withPartitionsDisabled();
-
- /**
- * Builds a TransactionContext based on configuration options supplied to this
- * builder.
- *
- * @return a new TransactionalContext
- * @throws java.lang.RuntimeException if a mandatory parameter is missing
- */
- TransactionContext build();
+ public TransactionContextBuilder() {
+ super(DistributedPrimitive.Type.TRANSACTION_CONTEXT);
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index f8e111b..cb78e27 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -32,6 +32,7 @@
import org.onlab.util.Match;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
+import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapState;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
@@ -40,8 +41,6 @@
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.primitives.resources.impl.PrepareResult;
import org.onosproject.store.primitives.resources.impl.RollbackResult;
-import org.onosproject.store.primitives.resources.impl.TransactionId;
-import org.onosproject.store.primitives.resources.impl.TransactionalMapUpdate;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Versioned;
@@ -66,7 +65,8 @@
MapEntryUpdateResult.Status.class,
MapUpdate.class,
MapUpdate.Type.class,
- TransactionalMapUpdate.class,
+ Transaction.class,
+ Transaction.State.class,
TransactionId.class,
PrepareResult.class,
CommitResult.class,
@@ -99,7 +99,8 @@
serializer.register(Match.class, factory);
serializer.register(MapEntryUpdateResult.class, factory);
serializer.register(MapEntryUpdateResult.Status.class, factory);
- serializer.register(TransactionalMapUpdate.class, factory);
+ serializer.register(Transaction.class, factory);
+ serializer.register(Transaction.State.class, factory);
serializer.register(PrepareResult.class, factory);
serializer.register(CommitResult.class, factory);
serializer.register(RollbackResult.class, factory);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java
index 587778c..d5cce9b2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@@ -46,7 +47,6 @@
import net.kuujo.copycat.protocol.Protocol;
import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
-import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -63,10 +63,12 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.PartitionId;
import org.onosproject.core.ApplicationId;
-import org.onosproject.core.IdGenerator;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
@@ -79,7 +81,6 @@
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.TransactionContextBuilder;
import org.slf4j.Logger;
@@ -112,7 +113,8 @@
protected NodeId localNodeId;
private TransactionManager transactionManager;
- private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
+ private final Supplier<TransactionId> transactionIdGenerator =
+ () -> TransactionId.from(UUID.randomUUID().toString());
private ApplicationListener appListener = new InternalApplicationListener();
@@ -212,7 +214,17 @@
Futures.getUnchecked(status);
- transactionManager = new TransactionManager(partitionedDatabase, consistentMapBuilder());
+ AsyncConsistentMap<TransactionId, Transaction> transactions =
+ this.<TransactionId, Transaction>consistentMapBuilder()
+ .withName("onos-transactions")
+ .withSerializer(Serializer.using(KryoNamespaces.API,
+ MapUpdate.class,
+ MapUpdate.Type.class,
+ Transaction.class,
+ Transaction.State.class))
+ .buildAsyncMap();
+
+ transactionManager = new TransactionManager(partitionedDatabase, transactions);
partitionedDatabase.setTransactionManager(transactionManager);
log.info("Started");
@@ -238,7 +250,9 @@
@Override
public TransactionContextBuilder transactionContextBuilder() {
- return new DefaultTransactionContextBuilder(this, transactionIdGenerator.getNewId());
+ return new DefaultTransactionContextBuilder(this::consistentMapBuilder,
+ transactionManager::execute,
+ transactionIdGenerator.get());
}
@Override
@@ -385,8 +399,8 @@
}
@Override
- public Collection<Transaction> getTransactions() {
- return complete(transactionManager.getTransactions());
+ public Collection<TransactionId> getPendingTransactions() {
+ return complete(transactionManager.getPendingTransactionIds());
}
private static <T> T complete(CompletableFuture<T> future) {
@@ -402,11 +416,6 @@
}
}
- @Override
- public void redriveTransactions() {
- getTransactions().stream().forEach(transactionManager::execute);
- }
-
protected <K, V> DefaultAsyncConsistentMap<K, V> registerMap(DefaultAsyncConsistentMap<K, V> map) {
maps.put(map.name(), map);
if (map.applicationId() != null) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseProxy.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseProxy.java
index 5c675eb..8352528 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseProxy.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseProxy.java
@@ -17,7 +17,6 @@
package org.onosproject.store.primitives.impl;
import org.onlab.util.Match;
-import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseSerializer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseSerializer.java
index 872bbeb..b91f4c9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseSerializer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseSerializer.java
@@ -21,10 +21,10 @@
import org.onlab.util.KryoNamespace;
import org.onlab.util.Match;
import org.onosproject.cluster.NodeId;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import net.kuujo.copycat.cluster.internal.MemberInfo;
@@ -69,13 +69,14 @@
private static final KryoNamespace ONOS_STORE = KryoNamespace.newBuilder()
.nextId(KryoNamespace.FLOATING_ID)
.register(Versioned.class)
- .register(DatabaseUpdate.class)
- .register(DatabaseUpdate.Type.class)
+ .register(MapUpdate.class)
+ .register(MapUpdate.Type.class)
.register(Result.class)
.register(UpdateResult.class)
.register(Result.Status.class)
- .register(DefaultTransaction.class)
+ .register(Transaction.class)
.register(Transaction.State.class)
+ .register(TransactionId.class)
.register(org.onosproject.store.primitives.impl.CommitResponse.class)
.register(Match.class)
.register(NodeId.class)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseState.java
index de308f5..58801e8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseState.java
@@ -22,7 +22,6 @@
import net.kuujo.copycat.state.StateContext;
import org.onlab.util.Match;
-import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabase.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabase.java
index 53edcbf..e4de296 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabase.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabase.java
@@ -26,7 +26,6 @@
import net.kuujo.copycat.util.function.TriConsumer;
import org.onlab.util.Match;
-import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java
index 45869ff..d6d4ab4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java
@@ -26,8 +26,8 @@
import net.kuujo.copycat.state.StateContext;
import org.onlab.util.Match;
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Transaction;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.service.Versioned;
import java.util.Arrays;
@@ -278,7 +278,7 @@
return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
}
- private boolean isUpdatePossible(DatabaseUpdate update) {
+ private boolean isUpdatePossible(MapUpdate<String, byte[]> update) {
Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
switch (update.type()) {
case PUT:
@@ -299,7 +299,7 @@
}
}
- private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+ private void doProvisionalUpdate(MapUpdate<String, byte[]> update, TransactionId transactionId) {
Map<String, Update> lockMap = getLockMap(update.mapName());
switch (update.type()) {
case PUT:
@@ -318,7 +318,8 @@
}
}
- private UpdateResult<String, byte[]> commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+ private UpdateResult<String, byte[]> commitProvisionalUpdate(
+ MapUpdate<String, byte[]> update, TransactionId transactionId) {
String mapName = update.mapName();
String key = update.key();
Update provisionalUpdate = getLockMap(mapName).get(key);
@@ -330,7 +331,7 @@
return mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value()).value();
}
- private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+ private void undoProvisionalUpdate(MapUpdate<String, byte[]> update, TransactionId transactionId) {
String mapName = update.mapName();
String key = update.key();
Update provisionalUpdate = getLockMap(mapName).get(key);
@@ -342,7 +343,7 @@
}
}
- private boolean isLockedByAnotherTransaction(String mapName, String key, long transactionId) {
+ private boolean isLockedByAnotherTransaction(String mapName, String key, TransactionId transactionId) {
Update update = getLockMap(mapName).get(key);
return update != null && !Objects.equal(transactionId, update.transactionId());
}
@@ -356,15 +357,15 @@
}
private class Update {
- private final long transactionId;
+ private final TransactionId transactionId;
private final byte[] value;
- public Update(long txId, byte[] value) {
+ public Update(TransactionId txId, byte[] value) {
this.transactionId = txId;
this.value = value;
}
- public long transactionId() {
+ public TransactionId transactionId() {
return this.transactionId;
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransaction.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransaction.java
deleted file mode 100644
index 406e901..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransaction.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.List;
-
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Transaction;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * A Default transaction implementation.
- */
-public class DefaultTransaction implements Transaction {
-
- private final long transactionId;
- private final List<DatabaseUpdate> updates;
- private final State state;
- private final long lastUpdated;
-
- public DefaultTransaction(long transactionId, List<DatabaseUpdate> updates) {
- this(transactionId, updates, State.PREPARING, System.currentTimeMillis());
- }
-
- private DefaultTransaction(long transactionId, List<DatabaseUpdate> updates, State state, long lastUpdated) {
- this.transactionId = transactionId;
- this.updates = ImmutableList.copyOf(updates);
- this.state = state;
- this.lastUpdated = lastUpdated;
- }
-
- @Override
- public long id() {
- return transactionId;
- }
-
- @Override
- public List<DatabaseUpdate> updates() {
- return updates;
- }
-
- @Override
- public State state() {
- return state;
- }
-
- @Override
- public Transaction transition(State newState) {
- return new DefaultTransaction(transactionId, updates, newState, System.currentTimeMillis());
- }
-
- @Override
- public long lastUpdated() {
- return lastUpdated;
- }
-}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
index 8cbe99f..d165bc1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
@@ -18,14 +18,17 @@
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import java.util.function.Supplier;
import static com.google.common.base.Preconditions.*;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.resources.impl.CommitResult;
+import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.service.ConsistentMapBuilder;
-import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionalMap;
@@ -44,20 +47,20 @@
@SuppressWarnings("rawtypes")
private final Map<String, DefaultTransactionalMap> txMaps = Maps.newConcurrentMap();
private boolean isOpen = false;
- private final Database database;
- private final long transactionId;
+ private final Function<Transaction, CompletableFuture<CommitResult>> transactionCommitter;
+ private final TransactionId transactionId;
private final Supplier<ConsistentMapBuilder> mapBuilderSupplier;
- public DefaultTransactionContext(long transactionId,
- Database database,
+ public DefaultTransactionContext(TransactionId transactionId,
+ Function<Transaction, CompletableFuture<CommitResult>> transactionCommitter,
Supplier<ConsistentMapBuilder> mapBuilderSupplier) {
this.transactionId = transactionId;
- this.database = checkNotNull(database);
+ this.transactionCommitter = checkNotNull(transactionCommitter);
this.mapBuilderSupplier = checkNotNull(mapBuilderSupplier);
}
@Override
- public long transactionId() {
+ public TransactionId transactionId() {
return transactionId;
}
@@ -91,13 +94,13 @@
public boolean commit() {
// TODO: rework commit implementation to be more intuitive
checkState(isOpen, TX_NOT_OPEN_ERROR);
- CommitResponse response = null;
+ CommitResult result = null;
try {
- List<DatabaseUpdate> updates = Lists.newLinkedList();
- txMaps.values().forEach(m -> updates.addAll(m.prepareDatabaseUpdates()));
- Transaction transaction = new DefaultTransaction(transactionId, updates);
- response = Futures.getUnchecked(database.prepareAndCommit(transaction));
- return response.success();
+ List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
+ txMaps.values().forEach(m -> updates.addAll(m.toMapUpdates()));
+ Transaction transaction = new Transaction(transactionId, updates);
+ result = Futures.getUnchecked(transactionCommitter.apply(transaction));
+ return result == CommitResult.OK;
} catch (Exception e) {
abort();
return false;
@@ -128,4 +131,9 @@
});
return s.toString();
}
+
+ @Override
+ public String name() {
+ return transactionId.toString();
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java
index 36b2831..99d62ca 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java
@@ -15,6 +15,13 @@
*/
package org.onosproject.store.primitives.impl;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.resources.impl.CommitResult;
+import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionContextBuilder;
@@ -22,29 +29,28 @@
* The default implementation of a transaction context builder. This builder
* generates a {@link DefaultTransactionContext}.
*/
-public class DefaultTransactionContextBuilder implements TransactionContextBuilder {
+public class DefaultTransactionContextBuilder extends TransactionContextBuilder {
- private boolean partitionsEnabled = true;
- private final DatabaseManager manager;
- private final long transactionId;
+ private final Supplier<ConsistentMapBuilder> mapBuilderSupplier;
+ private final Function<Transaction, CompletableFuture<CommitResult>> transactionCommitter;
+ private final TransactionId transactionId;
- public DefaultTransactionContextBuilder(DatabaseManager manager, long transactionId) {
- this.manager = manager;
+ public DefaultTransactionContextBuilder(Supplier<ConsistentMapBuilder> mapBuilderSupplier,
+ Function<Transaction, CompletableFuture<CommitResult>> transactionCommiter,
+ TransactionId transactionId) {
+ this.mapBuilderSupplier = mapBuilderSupplier;
+ this.transactionCommitter = transactionCommiter;
this.transactionId = transactionId;
}
@Override
- public TransactionContextBuilder withPartitionsDisabled() {
- partitionsEnabled = false;
- return this;
- }
-
- @Override
public TransactionContext build() {
- return new DefaultTransactionContext(
- transactionId,
- partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
- () -> partitionsEnabled ? manager.consistentMapBuilder()
- : manager.consistentMapBuilder().withPartitionsDisabled());
+ return new DefaultTransactionContext(transactionId, transactionCommitter, () -> {
+ ConsistentMapBuilder mapBuilder = mapBuilderSupplier.get();
+ if (partitionsDisabled()) {
+ mapBuilder = mapBuilder.withPartitionsDisabled();
+ }
+ return mapBuilder;
+ });
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
index dfeb4c1..32b3057 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
@@ -21,8 +21,8 @@
import java.util.Set;
import org.onlab.util.HexString;
+import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionalMap;
@@ -159,14 +159,14 @@
return latest;
}
- protected List<DatabaseUpdate> prepareDatabaseUpdates() {
- List<DatabaseUpdate> updates = Lists.newLinkedList();
+ protected List<MapUpdate<String, byte[]>> toMapUpdates() {
+ List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
deleteSet.forEach(key -> {
Versioned<V> original = readCache.get(key);
if (original != null) {
- updates.add(DatabaseUpdate.newBuilder()
+ updates.add(MapUpdate.<String, byte[]>newBuilder()
.withMapName(name)
- .withType(DatabaseUpdate.Type.REMOVE_IF_VERSION_MATCH)
+ .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.withKey(keyCache.getUnchecked(key))
.withCurrentVersion(original.version())
.build());
@@ -175,16 +175,16 @@
writeCache.forEach((key, value) -> {
Versioned<V> original = readCache.get(key);
if (original == null) {
- updates.add(DatabaseUpdate.newBuilder()
+ updates.add(MapUpdate.<String, byte[]>newBuilder()
.withMapName(name)
- .withType(DatabaseUpdate.Type.PUT_IF_ABSENT)
+ .withType(MapUpdate.Type.PUT_IF_ABSENT)
.withKey(keyCache.getUnchecked(key))
.withValue(serializer.encode(value))
.build());
} else {
- updates.add(DatabaseUpdate.newBuilder()
+ updates.add(MapUpdate.<String, byte[]>newBuilder()
.withMapName(name)
- .withType(DatabaseUpdate.Type.PUT_IF_VERSION_MATCH)
+ .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
.withKey(keyCache.getUnchecked(key))
.withCurrentVersion(original.version())
.withValue(serializer.encode(value))
@@ -199,7 +199,7 @@
public String toString() {
return MoreObjects.toStringHelper(this)
.add("backingMap", backingMap)
- .add("updates", prepareDatabaseUpdates())
+ .add("updates", toMapUpdates())
.toString();
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedDatabase.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedDatabase.java
index fdcc59f..24674f8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedDatabase.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedDatabase.java
@@ -26,8 +26,8 @@
import net.kuujo.copycat.resource.ResourceState;
import org.onlab.util.Match;
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Transaction;
+import org.onosproject.store.primitives.resources.impl.CommitResult;
+import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
@@ -273,7 +273,9 @@
if (transactionManager == null) {
throw new IllegalStateException("TransactionManager is not initialized");
}
- return transactionManager.execute(transaction);
+ return transactionManager.execute(transaction)
+ .thenApply(r -> r == CommitResult.OK
+ ? CommitResponse.success(ImmutableList.of()) : CommitResponse.failure());
}
}
@@ -373,15 +375,15 @@
private Map<Database, Transaction> createSubTransactions(
Transaction transaction) {
- Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
- for (DatabaseUpdate update : transaction.updates()) {
+ Map<Database, List<MapUpdate<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
+ for (MapUpdate<String, byte[]> update : transaction.updates()) {
Database partition = partitioner.getPartition(update.mapName(), update.key());
- List<DatabaseUpdate> partitionUpdates =
+ List<MapUpdate<String, byte[]>> partitionUpdates =
perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
partitionUpdates.add(update);
}
Map<Database, Transaction> subTransactions = Maps.newHashMap();
- perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
+ perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new Transaction(transaction.id(), v)));
return subTransactions;
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
new file mode 100644
index 0000000..ac5238a
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.List;
+
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.resources.impl.MapUpdate;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * An immutable transaction object.
+ */
+public class Transaction {
+
+ enum State {
+ /**
+ * Indicates a new transaction that is about to be prepared. All transactions
+ * start their life in this state.
+ */
+ PREPARING,
+
+ /**
+ * Indicates a transaction that is successfully prepared i.e. all participants voted to commit
+ */
+ PREPARED,
+
+ /**
+ * Indicates a transaction that is about to be committed.
+ */
+ COMMITTING,
+
+ /**
+ * Indicates a transaction that has successfully committed.
+ */
+ COMMITTED,
+
+ /**
+ * Indicates a transaction that is about to be rolled back.
+ */
+ ROLLINGBACK,
+
+ /**
+ * Indicates a transaction that has been rolled back and all locks are released.
+ */
+ ROLLEDBACK
+ }
+
+ private final TransactionId transactionId;
+ private final List<MapUpdate<String, byte[]>> updates;
+ private final State state;
+
+ public Transaction(TransactionId transactionId, List<MapUpdate<String, byte[]>> updates) {
+ this(transactionId, updates, State.PREPARING);
+ }
+
+ private Transaction(TransactionId transactionId,
+ List<MapUpdate<String, byte[]>> updates,
+ State state) {
+ this.transactionId = transactionId;
+ this.updates = ImmutableList.copyOf(updates);
+ this.state = state;
+ }
+
+ public TransactionId id() {
+ return transactionId;
+ }
+
+ public List<MapUpdate<String, byte[]>> updates() {
+ return updates;
+ }
+
+ public State state() {
+ return state;
+ }
+
+ public Transaction transition(State newState) {
+ return new Transaction(transactionId, updates, newState);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("transactionId", transactionId)
+ .add("updates", updates)
+ .add("state", state)
+ .toString();
+ }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java
index 7cfc88b..30aebb1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java
@@ -17,87 +17,62 @@
import static com.google.common.base.Preconditions.checkNotNull;
-import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.ConsistentMapBuilder;
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-import org.onosproject.store.service.Transaction.State;
-import com.google.common.collect.ImmutableList;
+import static org.onosproject.store.primitives.impl.Transaction.State.COMMITTED;
+import static org.onosproject.store.primitives.impl.Transaction.State.COMMITTING;
+import static org.onosproject.store.primitives.impl.Transaction.State.ROLLEDBACK;
+import static org.onosproject.store.primitives.impl.Transaction.State.ROLLINGBACK;
/**
* Agent that runs the two phase commit protocol.
*/
public class TransactionManager {
- private static final KryoNamespace KRYO_NAMESPACE = KryoNamespace.newBuilder()
- .register(KryoNamespaces.BASIC)
- .nextId(KryoNamespace.FLOATING_ID)
- .register(Versioned.class)
- .register(DatabaseUpdate.class)
- .register(DatabaseUpdate.Type.class)
- .register(DefaultTransaction.class)
- .register(Transaction.State.class)
- .build();
-
- private final Serializer serializer = Serializer.using(Arrays.asList(KRYO_NAMESPACE));
private final Database database;
- private final AsyncConsistentMap<Long, Transaction> transactions;
+ private final AsyncConsistentMap<TransactionId, Transaction> transactions;
- /**
- * Constructs a new TransactionManager for the specified database instance.
- *
- * @param database database
- * @param mapBuilder builder for ConsistentMap instances
- */
- public TransactionManager(Database database, ConsistentMapBuilder<Long, Transaction> mapBuilder) {
+ public TransactionManager(Database database, AsyncConsistentMap<TransactionId, Transaction> transactions) {
this.database = checkNotNull(database, "database cannot be null");
- this.transactions = mapBuilder.withName("onos-transactions")
- .withSerializer(serializer)
- .buildAsyncMap();
+ this.transactions = transactions;
}
/**
* Executes the specified transaction by employing a two phase commit protocol.
*
* @param transaction transaction to commit
- * @return transaction result. Result value true indicates a successful commit, false
- * indicates abort
+ * @return transaction commit result
*/
- public CompletableFuture<CommitResponse> execute(Transaction transaction) {
+ public CompletableFuture<CommitResult> execute(Transaction transaction) {
// clean up if this transaction in already in a terminal state.
- if (transaction.state() == Transaction.State.COMMITTED ||
- transaction.state() == Transaction.State.ROLLEDBACK) {
- return transactions.remove(transaction.id()).thenApply(v -> CommitResponse.success(ImmutableList.of()));
- } else if (transaction.state() == Transaction.State.COMMITTING) {
+ if (transaction.state() == COMMITTED || transaction.state() == ROLLEDBACK) {
+ return transactions.remove(transaction.id()).thenApply(v -> CommitResult.OK);
+ } else if (transaction.state() == COMMITTING) {
return commit(transaction);
- } else if (transaction.state() == Transaction.State.ROLLINGBACK) {
- return rollback(transaction).thenApply(v -> CommitResponse.success(ImmutableList.of()));
+ } else if (transaction.state() == ROLLINGBACK) {
+ return rollback(transaction).thenApply(v -> CommitResult.FAILURE_TO_PREPARE);
} else {
return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction));
}
}
-
/**
- * Returns all transactions in the system.
+ * Returns all pending transaction identifiers.
*
- * @return future for a collection of transactions
+ * @return future for a collection of transaction identifiers.
*/
- public CompletableFuture<Collection<Transaction>> getTransactions() {
- return transactions.values().thenApply(c -> {
- Collection<Transaction> txns = c.stream().map(v -> v.value()).collect(Collectors.toList());
- return txns;
- });
+ public CompletableFuture<Collection<TransactionId>> getPendingTransactionIds() {
+ return transactions.values().thenApply(c -> c.stream()
+ .map(v -> v.value())
+ .filter(v -> v.state() != COMMITTED && v.state() != ROLLEDBACK)
+ .map(Transaction::id)
+ .collect(Collectors.toList()));
}
private CompletableFuture<Boolean> prepare(Transaction transaction) {
@@ -105,22 +80,25 @@
.thenCompose(v -> database.prepare(transaction))
.thenCompose(status -> transactions.put(
transaction.id(),
- transaction.transition(status ? State.COMMITTING : State.ROLLINGBACK))
+ transaction.transition(status ? COMMITTING : ROLLINGBACK))
.thenApply(v -> status));
}
- private CompletableFuture<CommitResponse> commit(Transaction transaction) {
+ private CompletableFuture<CommitResult> commit(Transaction transaction) {
return database.commit(transaction)
- .whenComplete((r, e) -> transactions.put(
- transaction.id(),
- transaction.transition(Transaction.State.COMMITTED)));
+ .thenCompose(r -> {
+ if (r.success()) {
+ return transactions.put(transaction.id(), transaction.transition(COMMITTED))
+ .thenApply(v -> CommitResult.OK);
+ } else {
+ return CompletableFuture.completedFuture(CommitResult.FAILURE_DURING_COMMIT);
+ }
+ });
}
- private CompletableFuture<CommitResponse> rollback(Transaction transaction) {
+ private CompletableFuture<CommitResult> rollback(Transaction transaction) {
return database.rollback(transaction)
- .thenCompose(v -> transactions.put(
- transaction.id(),
- transaction.transition(Transaction.State.ROLLEDBACK)))
- .thenApply(v -> CommitResponse.failure());
+ .thenCompose(v -> transactions.put(transaction.id(), transaction.transition(ROLLEDBACK)))
+ .thenApply(v -> CommitResult.FAILURE_TO_PREPARE);
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
new file mode 100644
index 0000000..58957a2
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.resources.impl.CommitResult;
+import org.onosproject.store.primitives.resources.impl.PrepareResult;
+import org.onosproject.store.primitives.resources.impl.RollbackResult;
+
+/**
+ * Participant in a two-phase commit protocol.
+ */
+public interface TransactionParticipant {
+
+ /**
+ * Attempts to execute the prepare phase for the specified {@link Transaction transaction}.
+ * @param transaction transaction
+ * @return future for prepare result
+ */
+ CompletableFuture<PrepareResult> prepare(Transaction transaction);
+
+ /**
+ * Attempts to execute the commit phase for previously prepared transaction.
+ * @param transactionId transaction identifier
+ * @return future for commit result
+ */
+ CompletableFuture<CommitResult> commit(TransactionId transactionId);
+
+ /**
+ * Attempts to execute the rollback phase for previously prepared transaction.
+ * @param transactionId transaction identifier
+ * @return future for rollback result
+ */
+ CompletableFuture<RollbackResult> rollback(TransactionId transactionId);
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 46487f2..5342d74 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -31,6 +31,9 @@
import java.util.function.Predicate;
import org.onlab.util.Match;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.impl.Transaction;
+import org.onosproject.store.primitives.impl.TransactionParticipant;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
@@ -43,7 +46,7 @@
*/
@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
- implements AsyncConsistentMap<String, byte[]> {
+ implements AsyncConsistentMap<String, byte[]>, TransactionParticipant {
private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
@@ -235,18 +238,6 @@
});
}
- public CompletableFuture<PrepareResult> prepare(TransactionalMapUpdate<String, byte[]> update) {
- return submit(new AtomixConsistentMapCommands.TransactionPrepare(update));
- }
-
- public CompletableFuture<CommitResult> commit(TransactionId transactionId) {
- return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId));
- }
-
- public CompletableFuture<RollbackResult> rollback(TransactionId transactionId) {
- return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId));
- }
-
@Override
public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
if (!mapEventListeners.isEmpty()) {
@@ -274,6 +265,21 @@
}
}
+ @Override
+ public CompletableFuture<PrepareResult> prepare(Transaction transaction) {
+ return submit(new AtomixConsistentMapCommands.TransactionPrepare(transaction));
+ }
+
+ @Override
+ public CompletableFuture<CommitResult> commit(TransactionId transactionId) {
+ return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId));
+ }
+
+ @Override
+ public CompletableFuture<RollbackResult> rollback(TransactionId transactionId) {
+ return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId));
+ }
+
/**
* Change listener context.
*/
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index c463320..a5dd232 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -28,6 +28,8 @@
import java.util.Set;
import org.onlab.util.Match;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.impl.Transaction;
import org.onosproject.store.service.Versioned;
import com.google.common.base.MoreObjects;
@@ -207,35 +209,35 @@
*/
@SuppressWarnings("serial")
public static class TransactionPrepare extends MapCommand<PrepareResult> {
- private TransactionalMapUpdate<String, byte[]> update;
+ private Transaction transaction;
public TransactionPrepare() {
}
- public TransactionPrepare(TransactionalMapUpdate<String, byte[]> update) {
- this.update = update;
+ public TransactionPrepare(Transaction transaction) {
+ this.transaction = transaction;
}
- public TransactionalMapUpdate<String, byte[]> transactionUpdate() {
- return update;
+ public Transaction transaction() {
+ return transaction;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
- serializer.writeObject(update, buffer);
+ serializer.writeObject(transaction, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
- update = serializer.readObject(buffer);
+ transaction = serializer.readObject(buffer);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("update", update)
+ .add("transaction", transaction)
.toString();
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 100941f..de22a75 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -37,6 +37,8 @@
import org.onlab.util.CountDownCompleter;
import org.onlab.util.Match;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.impl.Transaction;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Versioned;
@@ -382,9 +384,8 @@
Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> commit) {
boolean ok = false;
try {
- TransactionalMapUpdate<String, byte[]> transactionUpdate = commit
- .operation().transactionUpdate();
- for (MapUpdate<String, byte[]> update : transactionUpdate.batch()) {
+ Transaction transaction = commit.operation().transaction();
+ for (MapUpdate<String, byte[]> update : transaction.updates()) {
String key = update.key();
if (preparedKeys.contains(key)) {
return PrepareResult.CONCURRENT_TRANSACTION;
@@ -403,8 +404,8 @@
// No violations detected. Add to pendingTranctions and mark
// modified keys as
// currently locked to updates.
- pendingTransactions.put(transactionUpdate.transactionId(), commit);
- transactionUpdate.batch().forEach(u -> preparedKeys.add(u.key()));
+ pendingTransactions.put(transaction.id(), commit);
+ transaction.updates().forEach(u -> preparedKeys.add(u.key()));
ok = true;
return PrepareResult.OK;
} finally {
@@ -429,16 +430,15 @@
if (prepareCommit == null) {
return CommitResult.UNKNOWN_TRANSACTION_ID;
}
- TransactionalMapUpdate<String, byte[]> transactionalUpdate = prepareCommit
- .operation().transactionUpdate();
- long totalReferencesToCommit = transactionalUpdate
- .batch()
+ Transaction transaction = prepareCommit.operation().transaction();
+ long totalReferencesToCommit = transaction
+ .updates()
.stream()
.filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.count();
CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer =
new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
- for (MapUpdate<String, byte[]> update : transactionalUpdate.batch()) {
+ for (MapUpdate<String, byte[]> update : transaction.updates()) {
String key = update.key();
MapEntryValue previousValue = mapEntries.remove(key);
MapEntryValue newValue = null;
@@ -473,8 +473,10 @@
if (prepareCommit == null) {
return RollbackResult.UNKNOWN_TRANSACTION_ID;
} else {
- prepareCommit.operation().transactionUpdate().batch()
- .forEach(u -> preparedKeys.remove(u.key()));
+ prepareCommit.operation()
+ .transaction()
+ .updates()
+ .forEach(u -> preparedKeys.remove(u.key()));
prepareCommit.close();
return RollbackResult.OK;
}
@@ -608,9 +610,8 @@
@Override
public byte[] value() {
- TransactionalMapUpdate<String, byte[]> update = completer.object()
- .operation().transactionUpdate();
- return update.valueForKey(key);
+ Transaction transaction = completer.object().operation().transaction();
+ return valueForKey(key, transaction);
}
@Override
@@ -622,5 +623,14 @@
public void discard() {
completer.countDown();
}
+
+ private byte[] valueForKey(String key, Transaction transaction) {
+ MapUpdate<String, byte[]> update = transaction.updates()
+ .stream()
+ .filter(u -> u.key().equals(key))
+ .findFirst()
+ .orElse(null);
+ return update == null ? null : update.value();
+ }
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/CommitResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/CommitResult.java
index 2b43f83..ae1e1d9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/CommitResult.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/CommitResult.java
@@ -28,4 +28,14 @@
* Signifies a failure due to unrecognized transaction identifier.
*/
UNKNOWN_TRANSACTION_ID,
+
+ /**
+ * Signifies a failure to get participants to agree to commit (during prepare stage).
+ */
+ FAILURE_TO_PREPARE,
+
+ /**
+ * Failure during commit phase.
+ */
+ FAILURE_DURING_COMMIT
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java
index 7858595..bc6d645 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java
@@ -29,6 +29,8 @@
* Both old and new values are accessible along with a flag that indicates if the
* the value was updated. If flag is false, oldValue and newValue both
* point to the same unmodified value.
+ *
+ * @param <K> key type
* @param <V> result type
*/
public class MapEntryUpdateResult<K, V> {
@@ -123,6 +125,8 @@
* @param keyTransform transformer to use for transcoding keys
* @param valueMapper mapper to use for transcoding values
* @return new instance
+ * @param <K1> key type of returned {@code MapEntryUpdateResult}
+ * @param <V1> value type of returned {@code MapEntryUpdateResult}
*/
public <K1, V1> MapEntryUpdateResult<K1, V1> map(Function<K, K1> keyTransform, Function<V, V1> valueMapper) {
return new MapEntryUpdateResult<>(status,
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapUpdate.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapUpdate.java
index c105a08..dc80c56 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapUpdate.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapUpdate.java
@@ -70,6 +70,7 @@
REMOVE_IF_VALUE_MATCH,
}
+ private String mapName;
private Type type;
private K key;
private V value;
@@ -77,6 +78,15 @@
private long currentVersion = -1;
/**
+ * Returns the name of the map.
+ *
+ * @return map name
+ */
+ public String mapName() {
+ return mapName;
+ }
+
+ /**
* Returns the type of update operation.
* @return type of update.
*/
@@ -119,6 +129,7 @@
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
+ .add("mapName", mapName)
.add("type", type)
.add("key", key)
.add("value", value)
@@ -153,6 +164,11 @@
return update;
}
+ public Builder<K, V> withMapName(String name) {
+ update.mapName = checkNotNull(name, "name cannot be null");
+ return this;
+ }
+
public Builder<K, V> withType(Type type) {
update.type = checkNotNull(type, "type cannot be null");
return this;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/PrepareResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/PrepareResult.java
index 0465743..fa57f05 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/PrepareResult.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/PrepareResult.java
@@ -25,6 +25,11 @@
OK,
/**
+ * Signifies some participants in a distributed prepare operation failed.
+ */
+ PARTIAL_FAILURE,
+
+ /**
* Signifies a failure to another transaction locking the underlying state.
*/
CONCURRENT_TRANSACTION,
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionalMapUpdate.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionalMapUpdate.java
deleted file mode 100644
index 6e83b69..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionalMapUpdate.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright 2016 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.resources.impl;
-
-import java.util.Collection;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-
-/**
- * A batch updates to an {@code AsyncConsistentMap} be committed as a transaction.
- *
- * @param <K> key type
- * @param <V> value type
- */
-public class TransactionalMapUpdate<K, V> {
- private final TransactionId transactionId;
- private final Collection<MapUpdate<K, V>> updates;
- private boolean indexPopulated = false;
- private final Map<K, V> keyValueIndex = Maps.newHashMap();
-
- public TransactionalMapUpdate(TransactionId transactionId, Collection<MapUpdate<K, V>> updates) {
- this.transactionId = transactionId;
- this.updates = ImmutableList.copyOf(updates);
- populateIndex();
- }
-
- /**
- * Returns the transaction identifier.
- * @return transaction id
- */
- public TransactionId transactionId() {
- return transactionId;
- }
-
- /**
- * Returns the collection of map updates.
- * @return map updates
- */
- public Collection<MapUpdate<K, V>> batch() {
- return updates;
- }
-
- /**
- * Returns the value that will be associated with the key after this transaction commits.
- * @param key key
- * @return value that will be associated with the value once this transaction commits
- */
- public V valueForKey(K key) {
- if (!indexPopulated) {
- // We do not synchronize as we don't expect this called to be made from multiple threads.
- populateIndex();
- }
- return keyValueIndex.get(key);
- }
-
- /**
- * Populates the internal key -> value mapping.
- */
- private synchronized void populateIndex() {
- updates.forEach(mapUpdate -> {
- if (mapUpdate.value() != null) {
- keyValueIndex.put(mapUpdate.key(), mapUpdate.value());
- }
- });
- indexPopulated = true;
- }
-}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMapTest.java
index a630d1a..68e7ecb 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMapTest.java
@@ -41,7 +41,6 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import com.google.common.base.MoreObjects;
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index 5711a88..21d8edc 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -27,6 +27,8 @@
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.util.Tools;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.impl.Transaction;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Versioned;
@@ -351,10 +353,9 @@
.withValue(value1)
.build();
- TransactionalMapUpdate<String, byte[]> txMapUpdate =
- new TransactionalMapUpdate<>(TransactionId.from("tx1"), Arrays.asList(update1));
+ Transaction tx = new Transaction(TransactionId.from("tx1"), Arrays.asList(update1));
- map.prepare(txMapUpdate).thenAccept(result -> {
+ map.prepare(tx).thenAccept(result -> {
assertEquals(PrepareResult.OK, result);
}).join();
assertNull(listener.event());
@@ -376,7 +377,7 @@
assertNull(listener.event());
- map.commit(txMapUpdate.transactionId()).join();
+ map.commit(tx.id()).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.INSERT, listener.event().type());
assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
@@ -406,14 +407,13 @@
.withKey("foo")
.withValue(value1)
.build();
- TransactionalMapUpdate<String, byte[]> txMapUpdate =
- new TransactionalMapUpdate<>(TransactionId.from("tx1"), Arrays.asList(update1));
- map.prepare(txMapUpdate).thenAccept(result -> {
+ Transaction tx = new Transaction(TransactionId.from("tx1"), Arrays.asList(update1));
+ map.prepare(tx).thenAccept(result -> {
assertEquals(PrepareResult.OK, result);
}).join();
assertNull(listener.event());
- map.rollback(txMapUpdate.transactionId()).join();
+ map.rollback(tx.id()).join();
assertNull(listener.event());
map.get("foo").thenAccept(result -> {
diff --git a/core/api/src/test/java/org/onosproject/store/service/DatabaseUpdateTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/MapUpdateTest.java
similarity index 74%
rename from core/api/src/test/java/org/onosproject/store/service/DatabaseUpdateTest.java
rename to core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/MapUpdateTest.java
index ad439d6..6a0db93 100644
--- a/core/api/src/test/java/org/onosproject/store/service/DatabaseUpdateTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/MapUpdateTest.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.store.service;
+package org.onosproject.store.primitives.resources.impl;
import com.google.common.testing.EqualsTester;
import org.junit.Test;
@@ -22,63 +22,63 @@
import static org.hamcrest.Matchers.is;
/**
- * Unit Tests for DatabseUpdate class.
+ * Unit Tests for MapUpdate class.
*/
-public class DatabaseUpdateTest {
+public class MapUpdateTest {
- private final DatabaseUpdate stats1 = DatabaseUpdate.newBuilder()
+ private final MapUpdate<String, byte[]> stats1 = MapUpdate.<String, byte[]>newBuilder()
.withCurrentValue("1".getBytes())
.withValue("2".getBytes())
.withCurrentVersion(3)
.withKey("4")
.withMapName("5")
- .withType(DatabaseUpdate.Type.PUT)
+ .withType(MapUpdate.Type.PUT)
.build();
- private final DatabaseUpdate stats2 = DatabaseUpdate.newBuilder()
+ private final MapUpdate<String, byte[]> stats2 = MapUpdate.<String, byte[]>newBuilder()
.withCurrentValue("1".getBytes())
.withValue("2".getBytes())
.withCurrentVersion(3)
.withKey("4")
.withMapName("5")
- .withType(DatabaseUpdate.Type.REMOVE)
+ .withType(MapUpdate.Type.REMOVE)
.build();
- private final DatabaseUpdate stats3 = DatabaseUpdate.newBuilder()
+ private final MapUpdate<String, byte[]> stats3 = MapUpdate.<String, byte[]>newBuilder()
.withCurrentValue("1".getBytes())
.withValue("2".getBytes())
.withCurrentVersion(3)
.withKey("4")
.withMapName("5")
- .withType(DatabaseUpdate.Type.REMOVE_IF_VALUE_MATCH)
+ .withType(MapUpdate.Type.REMOVE_IF_VALUE_MATCH)
.build();
- private final DatabaseUpdate stats4 = DatabaseUpdate.newBuilder()
+ private final MapUpdate<String, byte[]> stats4 = MapUpdate.<String, byte[]>newBuilder()
.withCurrentValue("1".getBytes())
.withValue("2".getBytes())
.withCurrentVersion(3)
.withKey("4")
.withMapName("5")
- .withType(DatabaseUpdate.Type.REMOVE_IF_VERSION_MATCH)
+ .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.build();
- private final DatabaseUpdate stats5 = DatabaseUpdate.newBuilder()
+ private final MapUpdate<String, byte[]> stats5 = MapUpdate.<String, byte[]>newBuilder()
.withCurrentValue("1".getBytes())
.withValue("2".getBytes())
.withCurrentVersion(3)
.withKey("4")
.withMapName("5")
- .withType(DatabaseUpdate.Type.PUT_IF_VALUE_MATCH)
+ .withType(MapUpdate.Type.PUT_IF_VALUE_MATCH)
.build();
- private final DatabaseUpdate stats6 = DatabaseUpdate.newBuilder()
+ private final MapUpdate<String, byte[]> stats6 = MapUpdate.<String, byte[]>newBuilder()
.withCurrentValue("1".getBytes())
.withValue("2".getBytes())
.withCurrentVersion(3)
.withKey("4")
.withMapName("5")
- .withType(DatabaseUpdate.Type.PUT_IF_VERSION_MATCH)
+ .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
.build();
/**
@@ -91,7 +91,7 @@
assertThat(stats1.currentVersion(), is(3L));
assertThat(stats1.key(), is("4"));
assertThat(stats1.mapName(), is("5"));
- assertThat(stats1.type(), is(DatabaseUpdate.Type.PUT));
+ assertThat(stats1.type(), is(MapUpdate.Type.PUT));
}
/**
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index b92db06..13092fe 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -199,6 +199,7 @@
import org.onosproject.net.resource.link.MplsLabelResourceRequest;
import org.onosproject.security.Permission;
import org.onosproject.store.Timestamp;
+import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.Versioned;
@@ -483,6 +484,7 @@
.register(new ExtensionCriterionSerializer(), ExtensionCriterion.class)
.register(ExtensionSelectorType.class)
.register(ExtensionTreatmentType.class)
+ .register(TransactionId.class)
.register(Versioned.class)
.register(MapEvent.class)
.register(MapEvent.Type.class)