AsyncConsistentMap methods for supporting transactional updates
Change-Id: Iaeb0aa0abf9f52d514a2c040598599a5b8a55ee8
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index cb78e27..92f6870 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -32,17 +32,18 @@
import org.onlab.util.Match;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
+import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapState;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
-import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.primitives.resources.impl.PrepareResult;
import org.onosproject.store.primitives.resources.impl.RollbackResult;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.base.Throwables;
@@ -65,7 +66,7 @@
MapEntryUpdateResult.Status.class,
MapUpdate.class,
MapUpdate.Type.class,
- Transaction.class,
+ MapTransaction.class,
Transaction.State.class,
TransactionId.class,
PrepareResult.class,
@@ -99,7 +100,7 @@
serializer.register(Match.class, factory);
serializer.register(MapEntryUpdateResult.class, factory);
serializer.register(MapEntryUpdateResult.Status.class, factory);
- serializer.register(Transaction.class, factory);
+ serializer.register(MapTransaction.class, factory);
serializer.register(Transaction.State.class, factory);
serializer.register(PrepareResult.class, factory);
serializer.register(CommitResult.class, factory);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java
index d5cce9b2..b6d5cd2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java
@@ -65,8 +65,8 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AtomicCounterBuilder;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseSerializer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseSerializer.java
index b91f4c9..ec9b926 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseSerializer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseSerializer.java
@@ -21,8 +21,8 @@
import org.onlab.util.KryoNamespace;
import org.onlab.util.Match;
import org.onosproject.cluster.NodeId;
+import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.Versioned;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMap.java
index 958734c..97cd23b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMap.java
@@ -40,11 +40,13 @@
import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
import org.onosproject.core.ApplicationId;
+import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.ConsistentMapException.ConcurrentModification;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
import org.onosproject.utils.MeteringAgent;
@@ -491,6 +493,21 @@
return CompletableFuture.completedFuture(null);
}
+ @Override
+ public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
+ return Tools.exceptionalFuture(new UnsupportedOperationException());
+ }
+
+ @Override
+ public CompletableFuture<Void> commit(TransactionId transactionId) {
+ return Tools.exceptionalFuture(new UnsupportedOperationException());
+ }
+
+ @Override
+ public CompletableFuture<Void> rollback(TransactionId transactionId) {
+ return Tools.exceptionalFuture(new UnsupportedOperationException());
+ }
+
protected void notifyListeners(MapEvent<K, V> event) {
if (event == null) {
return;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java
index d6d4ab4..027341e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java
@@ -26,8 +26,8 @@
import net.kuujo.copycat.state.StateContext;
import org.onlab.util.Match;
+import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.service.Versioned;
import java.util.Arrays;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
index d165bc1..b71902b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
@@ -24,9 +24,9 @@
import static com.google.common.base.Preconditions.*;
+import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.CommitResult;
-import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
@@ -84,7 +84,10 @@
checkNotNull(serializer);
return txMaps.computeIfAbsent(mapName, name -> new DefaultTransactionalMap<>(
name,
- mapBuilderSupplier.get().withName(name).withSerializer(serializer).build(),
+ mapBuilderSupplier.get()
+ .withName(name)
+ .withSerializer(serializer)
+ .buildAsyncMap(),
this,
serializer));
}
@@ -113,7 +116,7 @@
public void abort() {
if (isOpen) {
try {
- txMaps.values().forEach(m -> m.rollback());
+ txMaps.values().forEach(m -> m.abort());
} finally {
isOpen = false;
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
index 32b3057..1aaf297 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
@@ -19,10 +19,13 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.onlab.util.HexString;
-import org.onosproject.store.primitives.resources.impl.MapUpdate;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionalMap;
@@ -46,11 +49,12 @@
* @param <K> key type
* @param <V> value type.
*/
-public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
+public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V>, TransactionParticipant {
private final TransactionContext txContext;
private static final String TX_CLOSED_ERROR = "Transaction is closed";
- private final ConsistentMap<K, V> backingMap;
+ private final AsyncConsistentMap<K, V> backingMap;
+ private final ConsistentMap<K, V> backingConsitentMap;
private final String name;
private final Serializer serializer;
private final Map<K, Versioned<V>> readCache = Maps.newConcurrentMap();
@@ -76,11 +80,12 @@
public DefaultTransactionalMap(
String name,
- ConsistentMap<K, V> backingMap,
+ AsyncConsistentMap<K, V> backingMap,
TransactionContext txContext,
Serializer serializer) {
this.name = name;
this.backingMap = backingMap;
+ this.backingConsitentMap = backingMap.asConsistentMap();
this.txContext = txContext;
this.serializer = serializer;
}
@@ -96,7 +101,7 @@
if (latest != null) {
return latest;
} else {
- Versioned<V> v = readCache.computeIfAbsent(key, k -> backingMap.get(k));
+ Versioned<V> v = readCache.computeIfAbsent(key, k -> backingConsitentMap.get(k));
return v != null ? v.value() : null;
}
}
@@ -159,6 +164,62 @@
return latest;
}
+ @Override
+ public CompletableFuture<Boolean> prepare() {
+ return backingMap.prepare(new MapTransaction<>(txContext.transactionId(), updates()));
+ }
+
+ @Override
+ public CompletableFuture<Void> commit() {
+ return backingMap.commit(txContext.transactionId());
+ }
+
+ @Override
+ public CompletableFuture<Void> rollback() {
+ return backingMap.rollback(txContext.transactionId());
+ }
+
+ @Override
+ public boolean hasPendingUpdates() {
+ return updates().size() > 0;
+ }
+
+ protected List<MapUpdate<K, V>> updates() {
+ List<MapUpdate<K, V>> updates = Lists.newLinkedList();
+ deleteSet.forEach(key -> {
+ Versioned<V> original = readCache.get(key);
+ if (original != null) {
+ updates.add(MapUpdate.<K, V>newBuilder()
+ .withMapName(name)
+ .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+ .withKey(key)
+ .withCurrentVersion(original.version())
+ .build());
+ }
+ });
+ writeCache.forEach((key, value) -> {
+ Versioned<V> original = readCache.get(key);
+ if (original == null) {
+ updates.add(MapUpdate.<K, V>newBuilder()
+ .withMapName(name)
+ .withType(MapUpdate.Type.PUT_IF_ABSENT)
+ .withKey(key)
+ .withValue(value)
+ .build());
+ } else {
+ updates.add(MapUpdate.<K, V>newBuilder()
+ .withMapName(name)
+ .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+ .withKey(key)
+ .withCurrentVersion(original.version())
+ .withValue(value)
+ .build());
+ }
+ });
+ return updates;
+ }
+
+
protected List<MapUpdate<String, byte[]>> toMapUpdates() {
List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
deleteSet.forEach(key -> {
@@ -194,19 +255,18 @@
return updates;
}
- // TODO: build expected result Map processing DB updates?
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("backingMap", backingMap)
- .add("updates", toMapUpdates())
+ .add("updates", updates())
.toString();
}
/**
* Discards all changes made to this transactional map.
*/
- protected void rollback() {
+ protected void abort() {
readCache.clear();
writeCache.clear();
deleteSet.clear();
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index 76d7488..f1095d9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -27,8 +27,10 @@
import java.util.function.Predicate;
import org.onosproject.core.ApplicationId;
+import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.base.MoreObjects;
@@ -161,6 +163,21 @@
}
@Override
+ public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
+ return delegateMap.prepare(transaction);
+ }
+
+ @Override
+ public CompletableFuture<Void> commit(TransactionId transactionId) {
+ return delegateMap.commit(transactionId);
+ }
+
+ @Override
+ public CompletableFuture<Void> rollback(TransactionId transactionId) {
+ return delegateMap.rollback(transactionId);
+ }
+
+ @Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("delegateMap", delegateMap)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index 3090436..6715ba7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -28,10 +28,15 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Lists;
@@ -198,6 +203,39 @@
.toArray(CompletableFuture[]::new));
}
+ @Override
+ public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
+
+ Map<AsyncConsistentMap<K, V>, List<MapUpdate<K, V>>> updatesGroupedByMap = Maps.newIdentityHashMap();
+ transaction.updates().forEach(update -> {
+ AsyncConsistentMap<K, V> map = getMap(update.key());
+ updatesGroupedByMap.computeIfAbsent(map, k -> Lists.newLinkedList()).add(update);
+ });
+ Map<AsyncConsistentMap<K, V>, MapTransaction<K, V>> transactionsByMap =
+ Maps.transformValues(updatesGroupedByMap,
+ list -> new MapTransaction<>(transaction.transactionId(), list));
+
+ return Tools.allOf(transactionsByMap.entrySet()
+ .stream()
+ .map(e -> e.getKey().prepare(e.getValue()))
+ .collect(Collectors.toList()))
+ .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
+ }
+
+ @Override
+ public CompletableFuture<Void> commit(TransactionId transactionId) {
+ return CompletableFuture.allOf(getMaps().stream()
+ .map(e -> e.commit(transactionId))
+ .toArray(CompletableFuture[]::new));
+ }
+
+ @Override
+ public CompletableFuture<Void> rollback(TransactionId transactionId) {
+ return CompletableFuture.allOf(getMaps().stream()
+ .map(e -> e.rollback(transactionId))
+ .toArray(CompletableFuture[]::new));
+ }
+
/**
* Returns the map (partition) to which the specified key maps.
* @param key key
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedDatabase.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedDatabase.java
index 24674f8..31e22a8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedDatabase.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedDatabase.java
@@ -26,8 +26,8 @@
import net.kuujo.copycat.resource.ResourceState;
import org.onlab.util.Match;
+import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.resources.impl.CommitResult;
-import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
index ac5238a..4c8b240 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
@@ -17,8 +17,8 @@
import java.util.List;
+import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.resources.impl.MapUpdate;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
index 58957a2..66e1f3a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
@@ -17,34 +17,32 @@
import java.util.concurrent.CompletableFuture;
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.resources.impl.CommitResult;
-import org.onosproject.store.primitives.resources.impl.PrepareResult;
-import org.onosproject.store.primitives.resources.impl.RollbackResult;
-
/**
* Participant in a two-phase commit protocol.
*/
public interface TransactionParticipant {
/**
- * Attempts to execute the prepare phase for the specified {@link Transaction transaction}.
- * @param transaction transaction
- * @return future for prepare result
+ * Returns whether this participant has updates that need to be committed.
+ * @return {@code true} if there are pending updates; {@code false} otherwise
*/
- CompletableFuture<PrepareResult> prepare(Transaction transaction);
+ boolean hasPendingUpdates();
+
+ /**
+ * Executes the prepare phase.
+ * @return future that is completed with {@code true} if successful; {@code false} otherwise
+ */
+ CompletableFuture<Boolean> prepare();
/**
* Attempts to execute the commit phase for previously prepared transaction.
- * @param transactionId transaction identifier
- * @return future for commit result
+ * @return future that is completed when the operation completes
*/
- CompletableFuture<CommitResult> commit(TransactionId transactionId);
+ CompletableFuture<Void> commit();
/**
* Attempts to execute the rollback phase for previously prepared transaction.
- * @param transactionId transaction identifier
- * @return future for rollback result
+ * @return future that is completed when the operation completes
*/
- CompletableFuture<RollbackResult> rollback(TransactionId transactionId);
+ CompletableFuture<Void> rollback();
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index 438d9b3..a2989ac 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -26,9 +26,11 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Maps;
@@ -196,6 +198,21 @@
}
}
+ @Override
+ public CompletableFuture<Boolean> prepare(MapTransaction<K1, V1> transaction) {
+ return backingMap.prepare(transaction.map(keyEncoder, valueEncoder));
+ }
+
+ @Override
+ public CompletableFuture<Void> commit(TransactionId transactionId) {
+ return backingMap.commit(transactionId);
+ }
+
+ @Override
+ public CompletableFuture<Void> rollback(TransactionId transactionId) {
+ return backingMap.rollback(transactionId);
+ }
+
private class InternalBackingMapEventListener implements MapEventListener<K2, V2> {
private final MapEventListener<K1, V1> listener;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 5342d74..bd9690c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -32,11 +32,10 @@
import org.onlab.util.Match;
import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.impl.Transaction;
-import org.onosproject.store.primitives.impl.TransactionParticipant;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Sets;
@@ -46,7 +45,7 @@
*/
@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
- implements AsyncConsistentMap<String, byte[]>, TransactionParticipant {
+ implements AsyncConsistentMap<String, byte[]> {
private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
@@ -266,18 +265,21 @@
}
@Override
- public CompletableFuture<PrepareResult> prepare(Transaction transaction) {
- return submit(new AtomixConsistentMapCommands.TransactionPrepare(transaction));
+ public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
+ return submit(new AtomixConsistentMapCommands.TransactionPrepare(transaction))
+ .thenApply(v -> v == PrepareResult.OK);
}
@Override
- public CompletableFuture<CommitResult> commit(TransactionId transactionId) {
- return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId));
+ public CompletableFuture<Void> commit(TransactionId transactionId) {
+ return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId))
+ .thenApply(v -> null);
}
@Override
- public CompletableFuture<RollbackResult> rollback(TransactionId transactionId) {
- return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId));
+ public CompletableFuture<Void> rollback(TransactionId transactionId) {
+ return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId))
+ .thenApply(v -> null);
}
/**
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index a5dd232..458e5fb 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -29,7 +29,7 @@
import org.onlab.util.Match;
import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.impl.Transaction;
+import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.base.MoreObjects;
@@ -209,35 +209,35 @@
*/
@SuppressWarnings("serial")
public static class TransactionPrepare extends MapCommand<PrepareResult> {
- private Transaction transaction;
+ private MapTransaction<String, byte[]> mapTransaction;
public TransactionPrepare() {
}
- public TransactionPrepare(Transaction transaction) {
- this.transaction = transaction;
+ public TransactionPrepare(MapTransaction<String, byte[]> mapTransaction) {
+ this.mapTransaction = mapTransaction;
}
- public Transaction transaction() {
- return transaction;
+ public MapTransaction<String, byte[]> transaction() {
+ return mapTransaction;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
- serializer.writeObject(transaction, buffer);
+ serializer.writeObject(mapTransaction, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
- transaction = serializer.readObject(buffer);
+ mapTransaction = serializer.readObject(buffer);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("transaction", transaction)
+ .add("mapTransaction", mapTransaction)
.toString();
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index de22a75..e580fed 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -37,10 +37,11 @@
import org.onlab.util.CountDownCompleter;
import org.onlab.util.Match;
+import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.impl.Transaction;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Maps;
@@ -384,7 +385,7 @@
Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> commit) {
boolean ok = false;
try {
- Transaction transaction = commit.operation().transaction();
+ MapTransaction<String, byte[]> transaction = commit.operation().transaction();
for (MapUpdate<String, byte[]> update : transaction.updates()) {
String key = update.key();
if (preparedKeys.contains(key)) {
@@ -404,7 +405,7 @@
// No violations detected. Add to pendingTranctions and mark
// modified keys as
// currently locked to updates.
- pendingTransactions.put(transaction.id(), commit);
+ pendingTransactions.put(transaction.transactionId(), commit);
transaction.updates().forEach(u -> preparedKeys.add(u.key()));
ok = true;
return PrepareResult.OK;
@@ -430,7 +431,7 @@
if (prepareCommit == null) {
return CommitResult.UNKNOWN_TRANSACTION_ID;
}
- Transaction transaction = prepareCommit.operation().transaction();
+ MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
long totalReferencesToCommit = transaction
.updates()
.stream()
@@ -610,7 +611,7 @@
@Override
public byte[] value() {
- Transaction transaction = completer.object().operation().transaction();
+ MapTransaction<String, byte[]> transaction = completer.object().operation().transaction();
return valueForKey(key, transaction);
}
@@ -624,7 +625,7 @@
completer.countDown();
}
- private byte[] valueForKey(String key, Transaction transaction) {
+ private byte[] valueForKey(String key, MapTransaction<String, byte[]> transaction) {
MapUpdate<String, byte[]> update = transaction.updates()
.stream()
.filter(u -> u.key().equals(key))
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapUpdate.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapUpdate.java
deleted file mode 100644
index dc80c56..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapUpdate.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.primitives.resources.impl;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * Map update operation.
- *
- * @param <K> map key type
- * @param <V> map value type
- *
- */
-public final class MapUpdate<K, V> {
-
- /**
- * Type of database update operation.
- */
- public enum Type {
- /**
- * Insert/Update entry without any checks.
- */
- PUT,
- /**
- * Insert an entry iff there is no existing entry for that key.
- */
- PUT_IF_ABSENT,
-
- /**
- * Update entry if the current version matches specified version.
- */
- PUT_IF_VERSION_MATCH,
-
- /**
- * Update entry if the current value matches specified value.
- */
- PUT_IF_VALUE_MATCH,
-
- /**
- * Remove entry without any checks.
- */
- REMOVE,
-
- /**
- * Remove entry if the current version matches specified version.
- */
- REMOVE_IF_VERSION_MATCH,
-
- /**
- * Remove entry if the current value matches specified value.
- */
- REMOVE_IF_VALUE_MATCH,
- }
-
- private String mapName;
- private Type type;
- private K key;
- private V value;
- private V currentValue;
- private long currentVersion = -1;
-
- /**
- * Returns the name of the map.
- *
- * @return map name
- */
- public String mapName() {
- return mapName;
- }
-
- /**
- * Returns the type of update operation.
- * @return type of update.
- */
- public Type type() {
- return type;
- }
-
- /**
- * Returns the item key being updated.
- * @return item key
- */
- public K key() {
- return key;
- }
-
- /**
- * Returns the new value.
- * @return item's target value.
- */
- public V value() {
- return value;
- }
-
- /**
- * Returns the expected current value for the key.
- * @return current value in database.
- */
- public V currentValue() {
- return currentValue;
- }
-
- /**
- * Returns the expected current version in the database for the key.
- * @return expected version.
- */
- public long currentVersion() {
- return currentVersion;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("mapName", mapName)
- .add("type", type)
- .add("key", key)
- .add("value", value)
- .add("currentValue", currentValue)
- .add("currentVersion", currentVersion)
- .toString();
- }
-
- /**
- * Creates a new builder instance.
- *
- * @param <K> key type
- * @param <V> value type
- * @return builder.
- */
- public static <K, V> Builder<K, V> newBuilder() {
- return new Builder<>();
- }
-
- /**
- * MapUpdate builder.
- *
- * @param <K> key type
- * @param <V> value type
- */
- public static final class Builder<K, V> {
-
- private MapUpdate<K, V> update = new MapUpdate<>();
-
- public MapUpdate<K, V> build() {
- validateInputs();
- return update;
- }
-
- public Builder<K, V> withMapName(String name) {
- update.mapName = checkNotNull(name, "name cannot be null");
- return this;
- }
-
- public Builder<K, V> withType(Type type) {
- update.type = checkNotNull(type, "type cannot be null");
- return this;
- }
-
- public Builder<K, V> withKey(K key) {
- update.key = checkNotNull(key, "key cannot be null");
- return this;
- }
-
- public Builder<K, V> withCurrentValue(V value) {
- update.currentValue = checkNotNull(value, "currentValue cannot be null");
- return this;
- }
-
- public Builder<K, V> withValue(V value) {
- update.value = checkNotNull(value, "value cannot be null");
- return this;
- }
-
- public Builder<K, V> withCurrentVersion(long version) {
- checkArgument(version >= 0, "version cannot be negative");
- update.currentVersion = version;
- return this;
- }
-
- private void validateInputs() {
- checkNotNull(update.type, "type must be specified");
- checkNotNull(update.key, "key must be specified");
- switch (update.type) {
- case PUT:
- case PUT_IF_ABSENT:
- checkNotNull(update.value, "value must be specified.");
- break;
- case PUT_IF_VERSION_MATCH:
- checkNotNull(update.value, "value must be specified.");
- checkState(update.currentVersion >= 0, "current version must be specified");
- break;
- case PUT_IF_VALUE_MATCH:
- checkNotNull(update.value, "value must be specified.");
- checkNotNull(update.currentValue, "currentValue must be specified.");
- break;
- case REMOVE:
- break;
- case REMOVE_IF_VERSION_MATCH:
- checkState(update.currentVersion >= 0, "current version must be specified");
- break;
- case REMOVE_IF_VALUE_MATCH:
- checkNotNull(update.currentValue, "currentValue must be specified.");
- break;
- default:
- throw new IllegalStateException("Unknown operation type");
- }
- }
- }
-}