[ONOS-6345] Track tombstones within transactions for optimistic locking on null values
Change-Id: Ib4764721e512462ec1552124ff696b8f89687d8f
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java b/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
index e04f10e..f720ded 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
@@ -31,7 +31,6 @@
*
* @param <K> map key type
* @param <V> map value type
- *
*/
public final class MapUpdate<K, V> {
@@ -39,47 +38,38 @@
* Type of database update operation.
*/
public enum Type {
- /**
- * Insert/Update entry without any checks.
- */
- PUT,
/**
- * Insert an entry iff there is no existing entry for that key.
+ * Acquires a read lock on a key.
+ * <p>
+ * This record type will check to ensure that the lock version matches the current version for the key
+ * in the map and acquire a read lock on the key for the duration of the transaction.
*/
- PUT_IF_ABSENT,
+ LOCK,
/**
- * Update entry if the current version matches specified version.
+ * Checks the version of a key without locking the key.
+ * <p>
+ * This record type will perform a simple version check during the prepare phase of the two-phase commit
+ * protocol to ensure that the key has not changed during a transaction.
+ */
+ VERSION_MATCH,
+
+ /**
+ * Updates an entry if the current version matches specified version.
*/
PUT_IF_VERSION_MATCH,
/**
- * Update entry if the current value matches specified value.
- */
- PUT_IF_VALUE_MATCH,
-
- /**
- * Remove entry without any checks.
- */
- REMOVE,
-
- /**
- * Remove entry if the current version matches specified version.
+ * Removes an entry if the current version matches specified version.
*/
REMOVE_IF_VERSION_MATCH,
-
- /**
- * Remove entry if the current value matches specified value.
- */
- REMOVE_IF_VALUE_MATCH,
}
private Type type;
private K key;
private V value;
- private V currentValue;
- private long currentVersion = -1;
+ private long version = -1;
/**
* Returns the type of update operation.
@@ -106,19 +96,11 @@
}
/**
- * Returns the expected current value for the key.
- * @return current value in database.
- */
- public V currentValue() {
- return currentValue;
- }
-
- /**
* Returns the expected current version in the database for the key.
* @return expected version.
*/
- public long currentVersion() {
- return currentVersion;
+ public long version() {
+ return version;
}
/**
@@ -135,14 +117,13 @@
.withType(type)
.withKey(keyMapper.apply(key))
.withValue(value == null ? null : valueMapper.apply(value))
- .withCurrentValue(currentValue == null ? null : valueMapper.apply(currentValue))
- .withCurrentVersion(currentVersion)
+ .withVersion(version)
.build();
}
@Override
public int hashCode() {
- return Objects.hash(type, key, value, currentValue, currentVersion);
+ return Objects.hash(type, key, value, version);
}
@Override
@@ -152,8 +133,7 @@
return this.type == that.type
&& Objects.equals(this.key, that.key)
&& Objects.equals(this.value, that.value)
- && Objects.equals(this.currentValue, that.currentValue)
- && Objects.equals(this.currentVersion, that.currentVersion);
+ && Objects.equals(this.version, that.version);
}
return false;
}
@@ -164,8 +144,7 @@
.add("type", type)
.add("key", key)
.add("value", value instanceof byte[] ? new ByteArraySizeHashPrinter((byte[]) value) : value)
- .add("currentValue", currentValue)
- .add("currentVersion", currentVersion)
+ .add("version", version)
.toString();
}
@@ -205,48 +184,38 @@
return this;
}
- public Builder<K, V> withCurrentValue(V value) {
- update.currentValue = value;
- return this;
- }
-
public Builder<K, V> withValue(V value) {
update.value = value;
return this;
}
- public Builder<K, V> withCurrentVersion(long version) {
- update.currentVersion = version;
+ public Builder<K, V> withVersion(long version) {
+ update.version = version;
return this;
}
private void validateInputs() {
checkNotNull(update.type, "type must be specified");
- checkNotNull(update.key, "key must be specified");
switch (update.type) {
- case PUT:
- case PUT_IF_ABSENT:
- checkNotNull(update.value, "value must be specified.");
- break;
- case PUT_IF_VERSION_MATCH:
- checkNotNull(update.value, "value must be specified.");
- checkState(update.currentVersion >= 0, "current version must be specified");
- break;
- case PUT_IF_VALUE_MATCH:
- checkNotNull(update.value, "value must be specified.");
- checkNotNull(update.currentValue, "currentValue must be specified.");
- break;
- case REMOVE:
- break;
- case REMOVE_IF_VERSION_MATCH:
- checkState(update.currentVersion >= 0, "current version must be specified");
- break;
- case REMOVE_IF_VALUE_MATCH:
- checkNotNull(update.currentValue, "currentValue must be specified.");
- break;
- default:
- throw new IllegalStateException("Unknown operation type");
+ case VERSION_MATCH:
+ break;
+ case LOCK:
+ checkNotNull(update.key, "key must be specified");
+ checkState(update.version >= 0, "version must be specified");
+ break;
+ case PUT_IF_VERSION_MATCH:
+ checkNotNull(update.key, "key must be specified");
+ checkNotNull(update.value, "value must be specified.");
+ checkState(update.version >= 0, "version must be specified");
+ break;
+ case REMOVE_IF_VERSION_MATCH:
+ checkNotNull(update.key, "key must be specified");
+ checkState(update.version >= 0, "version must be specified");
+ break;
+ default:
+ throw new IllegalStateException("Unknown operation type");
}
+
}
}
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionLog.java b/core/api/src/main/java/org/onosproject/store/service/TransactionLog.java
index c367f5d..ab84c3a 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionLog.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionLog.java
@@ -31,10 +31,12 @@
*/
public class TransactionLog<T> {
private final TransactionId transactionId;
+ private final long version;
private final List<T> records;
- public TransactionLog(TransactionId transactionId, List<T> records) {
+ public TransactionLog(TransactionId transactionId, long version, List<T> records) {
this.transactionId = transactionId;
+ this.version = version;
this.records = ImmutableList.copyOf(records);
}
@@ -48,6 +50,15 @@
}
/**
+ * Returns the transaction lock version.
+ *
+ * @return the transaction lock version
+ */
+ public long version() {
+ return version;
+ }
+
+ /**
* Returns the list of transaction log records.
*
* @return a list of transaction log records
@@ -75,6 +86,7 @@
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("transactionId", transactionId)
+ .add("version", version)
.add("records", records)
.toString();
}
@@ -88,6 +100,6 @@
* @param <U> record type of returned instance
*/
public <U> TransactionLog<U> map(Function<T, U> mapper) {
- return new TransactionLog<>(transactionId, Lists.transform(records, mapper::apply));
+ return new TransactionLog<>(transactionId, version, Lists.transform(records, mapper::apply));
}
}
\ No newline at end of file
diff --git a/core/api/src/test/java/org/onosproject/store/primitives/MapUpdateTest.java b/core/api/src/test/java/org/onosproject/store/primitives/MapUpdateTest.java
index 53cc160..6f16e97 100644
--- a/core/api/src/test/java/org/onosproject/store/primitives/MapUpdateTest.java
+++ b/core/api/src/test/java/org/onosproject/store/primitives/MapUpdateTest.java
@@ -29,49 +29,27 @@
public class MapUpdateTest {
private final MapUpdate<String, byte[]> stats1 = MapUpdate.<String, byte[]>newBuilder()
- .withCurrentValue("1".getBytes())
.withValue("2".getBytes())
- .withCurrentVersion(3)
+ .withVersion(3)
.withKey("4")
- .withType(MapUpdate.Type.PUT)
+ .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
.build();
private final MapUpdate<String, byte[]> stats2 = MapUpdate.<String, byte[]>newBuilder()
- .withCurrentValue("1".getBytes())
- .withValue("2".getBytes())
- .withCurrentVersion(3)
- .withKey("4")
- .withType(MapUpdate.Type.REMOVE)
+ .withType(MapUpdate.Type.VERSION_MATCH)
+ .withVersion(10)
.build();
private final MapUpdate<String, byte[]> stats3 = MapUpdate.<String, byte[]>newBuilder()
- .withCurrentValue("1".getBytes())
.withValue("2".getBytes())
- .withCurrentVersion(3)
- .withKey("4")
- .withType(MapUpdate.Type.REMOVE_IF_VALUE_MATCH)
- .build();
-
- private final MapUpdate<String, byte[]> stats4 = MapUpdate.<String, byte[]>newBuilder()
- .withCurrentValue("1".getBytes())
- .withValue("2".getBytes())
- .withCurrentVersion(3)
+ .withVersion(3)
.withKey("4")
.withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.build();
- private final MapUpdate<String, byte[]> stats5 = MapUpdate.<String, byte[]>newBuilder()
- .withCurrentValue("1".getBytes())
+ private final MapUpdate<String, byte[]> stats4 = MapUpdate.<String, byte[]>newBuilder()
.withValue("2".getBytes())
- .withCurrentVersion(3)
- .withKey("4")
- .withType(MapUpdate.Type.PUT_IF_VALUE_MATCH)
- .build();
-
- private final MapUpdate<String, byte[]> stats6 = MapUpdate.<String, byte[]>newBuilder()
- .withCurrentValue("1".getBytes())
- .withValue("2".getBytes())
- .withCurrentVersion(3)
+ .withVersion(3)
.withKey("4")
.withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
.build();
@@ -81,11 +59,10 @@
*/
@Test
public void testConstruction() {
- assertThat(stats1.currentValue(), is("1".getBytes()));
assertThat(stats1.value(), is("2".getBytes()));
- assertThat(stats1.currentVersion(), is(3L));
+ assertThat(stats1.version(), is(3L));
assertThat(stats1.key(), is("4"));
- assertThat(stats1.type(), is(MapUpdate.Type.PUT));
+ assertThat(stats1.type(), is(MapUpdate.Type.PUT_IF_VERSION_MATCH));
}
/**
@@ -102,11 +79,6 @@
.addEqualityGroup(stats3, stats3)
.addEqualityGroup(stats4)
.testEquals();
-
- new EqualsTester()
- .addEqualityGroup(stats5, stats5)
- .addEqualityGroup(stats6)
- .testEquals();
}
/**
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
index 9acc377..19f6e0c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
@@ -22,6 +22,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
/**
@@ -37,13 +38,19 @@
@Override
protected V read(K key) {
- Versioned<V> value = readCache.computeIfAbsent(key, backingMap::get);
- return value != null ? value.value() : null;
+ Versioned<V> value = backingMap.getOrDefault(key, null);
+ readCache.put(key, value);
+ return value.value();
}
@Override
- protected Stream<MapUpdate<K, V>> records() {
- return Stream.concat(deleteStream(), writeStream());
+ public boolean hasPendingUpdates() {
+ return !writeCache.isEmpty() || !deleteSet.isEmpty();
+ }
+
+ @Override
+ protected Stream<MapUpdate<K, V>> records(Version lockVersion) {
+ return Stream.concat(deleteStream(), writeStream(lockVersion));
}
/**
@@ -52,34 +59,26 @@
private Stream<MapUpdate<K, V>> deleteStream() {
return deleteSet.stream()
.map(key -> Pair.of(key, readCache.get(key)))
- .filter(e -> e.getValue() != null)
.map(e -> MapUpdate.<K, V>newBuilder()
.withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.withKey(e.getKey())
- .withCurrentVersion(e.getValue().version())
+ .withVersion(e.getValue().version())
.build());
}
/**
* Returns a transaction record stream for updated keys.
*/
- private Stream<MapUpdate<K, V>> writeStream() {
- return writeCache.entrySet().stream().map(entry -> {
- Versioned<V> original = readCache.get(entry.getKey());
- if (original == null) {
- return MapUpdate.<K, V>newBuilder()
- .withType(MapUpdate.Type.PUT_IF_ABSENT)
- .withKey(entry.getKey())
- .withValue(entry.getValue())
- .build();
- } else {
- return MapUpdate.<K, V>newBuilder()
- .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
- .withKey(entry.getKey())
- .withCurrentVersion(original.version())
- .withValue(entry.getValue())
- .build();
- }
- });
+ private Stream<MapUpdate<K, V>> writeStream(Version lockVersion) {
+ return writeCache.entrySet().stream()
+ .map(entry -> {
+ Versioned<V> original = readCache.get(entry.getKey());
+ return MapUpdate.<K, V>newBuilder()
+ .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+ .withKey(entry.getKey())
+ .withValue(entry.getValue())
+ .withVersion(Math.max(original.version(), lockVersion.value()))
+ .build();
+ });
}
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
index 3ea2066..d74748b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
@@ -110,6 +110,7 @@
protected final Transactional<T> transactionalObject;
private final AtomicBoolean open = new AtomicBoolean();
private volatile State state = State.ACTIVE;
+ private volatile Version lock;
public Transaction(TransactionId transactionId, Transactional<T> transactionalObject) {
this.transactionId = transactionId;
@@ -192,7 +193,10 @@
*/
public CompletableFuture<Version> begin() {
open();
- return transactionalObject.begin(transactionId);
+ return transactionalObject.begin(transactionId).thenApply(lock -> {
+ this.lock = lock;
+ return lock;
+ });
}
/**
@@ -209,8 +213,10 @@
public CompletableFuture<Boolean> prepare(List<T> updates) {
checkOpen();
checkActive();
+ Version lock = this.lock;
+ checkState(lock != null, TX_INACTIVE_ERROR);
setState(State.PREPARING);
- return transactionalObject.prepare(new TransactionLog<T>(transactionId, updates))
+ return transactionalObject.prepare(new TransactionLog<T>(transactionId, lock.value(), updates))
.thenApply(succeeded -> {
setState(State.PREPARED);
return succeeded;
@@ -229,8 +235,10 @@
public CompletableFuture<Boolean> prepareAndCommit(List<T> updates) {
checkOpen();
checkActive();
+ Version lock = this.lock;
+ checkState(lock != null, TX_INACTIVE_ERROR);
setState(State.PREPARING);
- return transactionalObject.prepareAndCommit(new TransactionLog<T>(transactionId, updates))
+ return transactionalObject.prepareAndCommit(new TransactionLog<T>(transactionId, lock.value(), updates))
.thenApply(succeeded -> {
setState(State.COMMITTED);
return succeeded;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
index 4429a1b..977312a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
@@ -16,7 +16,6 @@
package org.onosproject.store.primitives.impl;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -60,7 +59,6 @@
protected final Transaction<MapUpdate<K, V>> transaction;
protected final Map<K, V> writeCache = Maps.newConcurrentMap();
protected final Set<K> deleteSet = Sets.newConcurrentHashSet();
- protected final List<MapUpdate<K, V>> log = new ArrayList<>();
protected volatile Version lock;
protected TransactionalMapParticipant(
@@ -77,18 +75,22 @@
* synchronized prior to a read.
*/
private void beginTransaction() {
- if (!transaction.isOpen()) {
- try {
- lock = transaction.begin()
- .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new TransactionException.Interrupted();
- } catch (TimeoutException e) {
- throw new TransactionException.Timeout();
- } catch (ExecutionException e) {
- Throwables.propagateIfPossible(e.getCause());
- throw new TransactionException(e.getCause());
+ if (lock == null) {
+ synchronized (this) {
+ if (lock == null) {
+ try {
+ lock = transaction.begin()
+ .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new TransactionException.Interrupted();
+ } catch (TimeoutException e) {
+ throw new TransactionException.Timeout();
+ } catch (ExecutionException e) {
+ Throwables.propagateIfPossible(e.getCause());
+ throw new TransactionException(e.getCause());
+ }
+ }
}
}
}
@@ -183,13 +185,8 @@
}
@Override
- public boolean hasPendingUpdates() {
- return records().findAny().isPresent();
- }
-
- @Override
public CompletableFuture<Boolean> prepare() {
- return transaction.prepare(log());
+ return transaction.prepare(log(lock));
}
@Override
@@ -199,7 +196,7 @@
@Override
public CompletableFuture<Boolean> prepareAndCommit() {
- return transaction.prepareAndCommit(log());
+ return transaction.prepareAndCommit(log(lock));
}
@Override
@@ -210,24 +207,25 @@
/**
* Returns a list of updates performed within this map partition.
*
+ * @param lockVersion the global transaction lock version
* @return a list of map updates
*/
- protected List<MapUpdate<K, V>> log() {
- return records().collect(Collectors.toList());
+ protected List<MapUpdate<K, V>> log(Version lockVersion) {
+ return records(lockVersion).collect(Collectors.toList());
}
/**
* Returns a stream of updates performed within this map partition.
*
+ * @param lockVersion the global transaction lock version
* @return a stream of map updates
*/
- protected abstract Stream<MapUpdate<K, V>> records();
+ protected abstract Stream<MapUpdate<K, V>> records(Version lockVersion);
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("backingMap", backingMap)
- .add("updates", log())
.toString();
}
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 8a13c01..7220c37 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -298,7 +298,7 @@
@Override
public CompletableFuture<Version> begin(TransactionId transactionId) {
- return client.submit(new TransactionBegin()).thenApply(Version::new);
+ return client.submit(new TransactionBegin(transactionId)).thenApply(Version::new);
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index 0c5dd76..8dd4096 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -15,6 +15,11 @@
*/
package org.onosproject.store.primitives.resources.impl;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.MoreObjects;
import io.atomix.catalyst.buffer.BufferInput;
import io.atomix.catalyst.buffer.BufferOutput;
import io.atomix.catalyst.serializer.CatalystSerializable;
@@ -24,19 +29,12 @@
import io.atomix.catalyst.util.Assert;
import io.atomix.copycat.Command;
import io.atomix.copycat.Query;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
import org.onlab.util.Match;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Versioned;
-import com.google.common.base.MoreObjects;
-
/**
* {@link AtomixConsistentMap} resource state machine operations.
*/
@@ -203,12 +201,32 @@
}
/**
- * Transaction begin query.
+ * Transaction begin command.
*/
- public static class TransactionBegin extends MapQuery<Long> {
+ public static class TransactionBegin extends MapCommand<Long> {
+ private TransactionId transactionId;
+
+ public TransactionBegin() {
+ }
+
+ public TransactionBegin(TransactionId transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ public TransactionId transactionId() {
+ return transactionId;
+ }
+
@Override
- public ConsistencyLevel consistency() {
- return ConsistencyLevel.LINEARIZABLE;
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ super.writeObject(buffer, serializer);
+ serializer.writeObject(transactionId, buffer);
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ super.readObject(buffer, serializer);
+ transactionId = serializer.readObject(buffer);
}
}
@@ -264,7 +282,7 @@
@Override
public CompactionMode compaction() {
- return CompactionMode.QUORUM;
+ return CompactionMode.TOMBSTONE;
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 433b3b3..2fc91c8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -15,11 +15,6 @@
*/
package org.onosproject.store.primitives.resources.impl;
-import static com.google.common.base.Preconditions.checkState;
-import static org.onosproject.store.service.MapEvent.Type.INSERT;
-import static org.onosproject.store.service.MapEvent.Type.REMOVE;
-import static org.onosproject.store.service.MapEvent.Type.UPDATE;
-import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
@@ -36,7 +31,6 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.onlab.util.CountDownCompleter;
@@ -71,6 +65,12 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import static com.google.common.base.Preconditions.checkState;
+import static org.onosproject.store.service.MapEvent.Type.INSERT;
+import static org.onosproject.store.service.MapEvent.Type.REMOVE;
+import static org.onosproject.store.service.MapEvent.Type.UPDATE;
+import static org.slf4j.LoggerFactory.getLogger;
+
/**
* State Machine for {@link AtomixConsistentMap} resource.
*/
@@ -80,8 +80,8 @@
private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
private final Set<String> preparedKeys = Sets.newHashSet();
- private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
- private AtomicLong versionCounter = new AtomicLong(0);
+ private final Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
+ private long currentVersion;
public AtomixConsistentMapState(Properties properties) {
super(properties);
@@ -89,12 +89,10 @@
@Override
public void snapshot(SnapshotWriter writer) {
- writer.writeLong(versionCounter.get());
}
@Override
public void install(SnapshotReader reader) {
- versionCounter = new AtomicLong(reader.readLong());
}
@Override
@@ -141,7 +139,8 @@
*/
protected boolean containsKey(Commit<? extends ContainsKey> commit) {
try {
- return toVersioned(mapEntries.get(commit.operation().key())) != null;
+ MapEntryValue value = mapEntries.get(commit.operation().key());
+ return value != null && value.type() != MapEntryValue.Type.TOMBSTONE;
} finally {
commit.close();
}
@@ -155,9 +154,9 @@
*/
protected boolean containsValue(Commit<? extends ContainsValue> commit) {
try {
- Match<byte[]> valueMatch = Match
- .ifValue(commit.operation().value());
+ Match<byte[]> valueMatch = Match.ifValue(commit.operation().value());
return mapEntries.values().stream()
+ .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
.anyMatch(value -> valueMatch.matches(value.value()));
} finally {
commit.close();
@@ -186,8 +185,14 @@
*/
protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
try {
- Versioned<byte[]> value = toVersioned(mapEntries.get(commit.operation().key()));
- return value != null ? value : new Versioned<>(commit.operation().defaultValue(), 0);
+ MapEntryValue value = mapEntries.get(commit.operation().key());
+ if (value == null) {
+ return new Versioned<>(commit.operation().defaultValue(), 0);
+ } else if (value.type() == MapEntryValue.Type.TOMBSTONE) {
+ return new Versioned<>(commit.operation().defaultValue(), value.version);
+ } else {
+ return new Versioned<>(value.value(), value.version);
+ }
} finally {
commit.close();
}
@@ -201,7 +206,9 @@
*/
protected int size(Commit<? extends Size> commit) {
try {
- return mapEntries.size();
+ return (int) mapEntries.values().stream()
+ .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
+ .count();
} finally {
commit.close();
}
@@ -215,7 +222,8 @@
*/
protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
try {
- return mapEntries.isEmpty();
+ return mapEntries.values().stream()
+ .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
} finally {
commit.close();
}
@@ -229,7 +237,10 @@
*/
protected Set<String> keySet(Commit<? extends KeySet> commit) {
try {
- return mapEntries.keySet().stream().collect(Collectors.toSet());
+ return mapEntries.entrySet().stream()
+ .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
} finally {
commit.close();
}
@@ -243,7 +254,10 @@
*/
protected Collection<Versioned<byte[]>> values(Commit<? extends Values> commit) {
try {
- return mapEntries.values().stream().map(this::toVersioned).collect(Collectors.toList());
+ return mapEntries.entrySet().stream()
+ .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+ .map(entry -> toVersioned(entry.getValue()))
+ .collect(Collectors.toList());
} finally {
commit.close();
}
@@ -258,11 +272,9 @@
*/
protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<? extends EntrySet> commit) {
try {
- return mapEntries
- .entrySet()
- .stream()
- .map(e -> Maps.immutableEntry(e.getKey(),
- toVersioned(e.getValue())))
+ return mapEntries.entrySet().stream()
+ .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+ .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
.collect(Collectors.toSet());
} finally {
commit.close();
@@ -289,21 +301,34 @@
}
byte[] newValue = commit.operation().value();
- long newVersion = versionCounter.incrementAndGet();
+ currentVersion = commit.index();
Versioned<byte[]> newMapValue = newValue == null ? null
- : new Versioned<>(newValue, newVersion);
+ : new Versioned<>(newValue, currentVersion);
MapEvent.Type updateType = newValue == null ? REMOVE
: oldCommitValue == null ? INSERT : UPDATE;
+
+ // If a value existed in the map, remove and discard the value to ensure disk can be freed.
if (updateType == REMOVE || updateType == UPDATE) {
mapEntries.remove(key);
oldCommitValue.discard();
}
+
+ // If this is an insert/update commit, add the commit to the map entries.
if (updateType == INSERT || updateType == UPDATE) {
- mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
+ mapEntries.put(key, new NonTransactionalCommit(commit));
+ } else if (!activeTransactions.isEmpty()) {
+ // If this is a delete but transactions are currently running, ensure tombstones are retained
+ // for version checks.
+ TombstoneCommit tombstone = new TombstoneCommit(
+ commit.index(),
+ new CountDownCompleter<>(commit, 1, Commit::close));
+ mapEntries.put(key, tombstone);
} else {
+ // If no transactions are in progress, we can safely delete the key from memory.
commit.close();
}
+
publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
newMapValue);
@@ -387,7 +412,9 @@
*/
protected long begin(Commit<? extends TransactionBegin> commit) {
try {
- return commit.index();
+ long version = commit.index();
+ activeTransactions.put(commit.operation().transactionId(), new TransactionScope(version));
+ return version;
} finally {
commit.close();
}
@@ -401,11 +428,18 @@
*/
protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
PrepareResult prepareResult = prepare(commit);
+ TransactionScope transactionScope =
+ activeTransactions.remove(commit.operation().transactionLog().transactionId());
+ this.currentVersion = commit.index();
if (prepareResult == PrepareResult.OK) {
-
- commitInternal(commit.operation().transactionLog().transactionId()/*, commit.index()*/);
+ transactionScope = transactionScope.prepared(commit);
+ commit(transactionScope);
+ } else {
+ transactionScope.close();
}
+ discardTombstones();
return prepareResult;
+
}
/**
@@ -416,30 +450,72 @@
*/
protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
boolean ok = false;
+
try {
- TransactionLog<MapUpdate<String, byte[]>> transaction = commit.operation().transactionLog();
- for (MapUpdate<String, byte[]> update : transaction.records()) {
- String key = update.key();
+ TransactionLog<MapUpdate<String, byte[]>> transactionLog = commit.operation().transactionLog();
+
+ // Iterate through records in the transaction log and perform isolation checks.
+ for (MapUpdate<String, byte[]> record : transactionLog.records()) {
+ String key = record.key();
+
+ // If the record is a VERSION_MATCH then check that the record's version matches the current
+ // version of the state machine.
+ if (record.type() == MapUpdate.Type.VERSION_MATCH && key == null) {
+ if (record.version() > currentVersion) {
+ return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
+ } else {
+ continue;
+ }
+ }
+
+ // If the prepared keys already contains the key contained within the record, that indicates a
+ // conflict with a concurrent transaction.
if (preparedKeys.contains(key)) {
return PrepareResult.CONCURRENT_TRANSACTION;
}
+
+ // Read the existing value from the map.
MapEntryValue existingValue = mapEntries.get(key);
+
+ // Note: if the existing value is null, that means the key has not changed during the transaction,
+ // otherwise a tombstone would have been retained.
if (existingValue == null) {
- if (update.type() != MapUpdate.Type.PUT_IF_ABSENT) {
+ // If the value is null, ensure the version is equal to the transaction version.
+ if (record.version() != transactionLog.version()) {
return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
}
} else {
- if (existingValue.version() != update.currentVersion()) {
+ // If the value is non-null, compare the current version with the record version.
+ if (existingValue.version() > record.version()) {
return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
}
}
}
- // No violations detected. Add to pendingTransactions and mark
- // modified keys as locked for updates.
- pendingTransactions.put(transaction.transactionId(), commit);
- transaction.records().forEach(u -> preparedKeys.add(u.key()));
+
+ // No violations detected. Mark modified keys locked for transactions.
+ transactionLog.records().forEach(record -> {
+ if (record.type() != MapUpdate.Type.VERSION_MATCH) {
+ preparedKeys.add(record.key());
+ }
+ });
+
ok = true;
- return PrepareResult.OK;
+
+ // Update the transaction scope. If the transaction scope is not set on this node, that indicates the
+ // coordinator is communicating with another node. Transactions assume that the client is communicating
+ // with a single leader in order to limit the overhead of retaining tombstones.
+ TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
+ if (transactionScope == null) {
+ activeTransactions.put(
+ transactionLog.transactionId(),
+ new TransactionScope(transactionLog.version(), commit));
+ return PrepareResult.PARTIAL_FAILURE;
+ } else {
+ activeTransactions.put(
+ transactionLog.transactionId(),
+ transactionScope.prepared(commit));
+ return PrepareResult.OK;
+ }
} catch (Exception e) {
log.warn("Failure applying {}", commit, e);
throw Throwables.propagate(e);
@@ -458,43 +534,72 @@
*/
protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
TransactionId transactionId = commit.operation().transactionId();
+ TransactionScope transactionScope = activeTransactions.remove(transactionId);
+ if (transactionScope == null) {
+ return CommitResult.UNKNOWN_TRANSACTION_ID;
+ }
+
try {
- return commitInternal(transactionId);
+ this.currentVersion = commit.index();
+ return commit(transactionScope.committed(commit));
} catch (Exception e) {
log.warn("Failure applying {}", commit, e);
throw Throwables.propagate(e);
} finally {
- commit.close();
+ discardTombstones();
}
}
- private CommitResult commitInternal(TransactionId transactionId) {
- Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
- .remove(transactionId);
- if (prepareCommit == null) {
- return CommitResult.UNKNOWN_TRANSACTION_ID;
- }
- TransactionLog<MapUpdate<String, byte[]>> transaction = prepareCommit.operation().transactionLog();
- long totalReferencesToCommit = transaction
- .records()
- .stream()
- .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+ /**
+ * Applies committed operations to the state machine.
+ */
+ private CommitResult commit(TransactionScope transactionScope) {
+ TransactionLog<MapUpdate<String, byte[]>> transactionLog = transactionScope.transactionLog();
+ boolean retainTombstones = !activeTransactions.isEmpty();
+
+ // Count the total number of keys that will be set by this transaction. This is necessary to do reference
+ // counting for garbage collection.
+ long totalReferencesToCommit = transactionLog.records().stream()
+ // No keys are set for version checks. For deletes, references are only retained of tombstones
+ // need to be retained for concurrent transactions.
+ .filter(record -> record.type() != MapUpdate.Type.VERSION_MATCH && record.type() != MapUpdate.Type.LOCK
+ && (record.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH || retainTombstones))
.count();
- CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
- new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
+
+ // Create a count down completer that counts references to the transaction commit for garbage collection.
+ CountDownCompleter<TransactionScope> completer = new CountDownCompleter<>(
+ transactionScope, totalReferencesToCommit, TransactionScope::close);
+
List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
- for (MapUpdate<String, byte[]> update : transaction.records()) {
- String key = update.key();
+ for (MapUpdate<String, byte[]> record : transactionLog.records()) {
+ if (record.type() == MapUpdate.Type.VERSION_MATCH) {
+ continue;
+ }
+
+ String key = record.key();
checkState(preparedKeys.remove(key), "key is not prepared");
+
+ if (record.type() == MapUpdate.Type.LOCK) {
+ continue;
+ }
+
MapEntryValue previousValue = mapEntries.remove(key);
MapEntryValue newValue = null;
- if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
- newValue = new TransactionalCommit(key, versionCounter.incrementAndGet(), completer);
+
+ // If the record is not a delete, create a transactional commit.
+ if (record.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
+ newValue = new TransactionalCommit(currentVersion, record.value(), completer);
+ } else if (retainTombstones) {
+ // For deletes, if tombstones need to be retained then create and store a tombstone commit.
+ newValue = new TombstoneCommit(currentVersion, completer);
}
+
eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
+
if (newValue != null) {
mapEntries.put(key, newValue);
}
+
if (previousValue != null) {
previousValue.discard();
}
@@ -511,20 +616,57 @@
*/
protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
TransactionId transactionId = commit.operation().transactionId();
- try {
- Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions.remove(transactionId);
- if (prepareCommit == null) {
- return RollbackResult.UNKNOWN_TRANSACTION_ID;
- } else {
- prepareCommit.operation()
- .transactionLog()
- .records()
- .forEach(u -> preparedKeys.remove(u.key()));
- prepareCommit.close();
- return RollbackResult.OK;
- }
- } finally {
+ TransactionScope transactionScope = activeTransactions.remove(transactionId);
+ if (transactionScope == null) {
+ return RollbackResult.UNKNOWN_TRANSACTION_ID;
+ } else if (!transactionScope.isPrepared()) {
+ discardTombstones();
+ transactionScope.close();
commit.close();
+ return RollbackResult.OK;
+ } else {
+ try {
+ transactionScope.transactionLog().records()
+ .forEach(record -> {
+ if (record.type() != MapUpdate.Type.VERSION_MATCH) {
+ preparedKeys.remove(record.key());
+ }
+ });
+ return RollbackResult.OK;
+ } finally {
+ discardTombstones();
+ transactionScope.close();
+ commit.close();
+ }
+ }
+
+ }
+
+ /**
+ * Discards tombstones no longer needed by active transactions.
+ */
+ private void discardTombstones() {
+ if (activeTransactions.isEmpty()) {
+ Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
+ while (iterator.hasNext()) {
+ MapEntryValue value = iterator.next().getValue();
+ if (value.type() == MapEntryValue.Type.TOMBSTONE) {
+ iterator.remove();
+ value.discard();
+ }
+ }
+ } else {
+ long lowWaterMark = activeTransactions.values().stream()
+ .mapToLong(TransactionScope::version)
+ .min().getAsLong();
+ Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
+ while (iterator.hasNext()) {
+ MapEntryValue value = iterator.next().getValue();
+ if (value.type() == MapEntryValue.Type.TOMBSTONE && value.version < lowWaterMark) {
+ iterator.remove();
+ value.discard();
+ }
+ }
}
}
@@ -558,7 +700,8 @@
* @return versioned instance
*/
private Versioned<byte[]> toVersioned(MapEntryValue value) {
- return value == null ? null : new Versioned<>(value.value(), value.version());
+ return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
+ ? new Versioned<>(value.value(), value.version()) : null;
}
/**
@@ -599,52 +742,73 @@
/**
* Interface implemented by map values.
*/
- private interface MapEntryValue {
+ private abstract static class MapEntryValue {
+ protected final Type type;
+ protected final long version;
+
+ MapEntryValue(Type type, long version) {
+ this.type = type;
+ this.version = version;
+ }
+
/**
- * Returns the raw {@code byte[]}.
+ * Returns the value type.
*
- * @return raw value
+ * @return the value type
*/
- byte[] value();
+ Type type() {
+ return type;
+ }
/**
* Returns the version of the value.
*
* @return version
*/
- long version();
+ long version() {
+ return version;
+ }
+
+ /**
+ * Returns the raw {@code byte[]}.
+ *
+ * @return raw value
+ */
+ abstract byte[] value();
/**
* Discards the value by invoke appropriate clean up actions.
*/
- void discard();
+ abstract void discard();
+
+ /**
+ * Value type.
+ */
+ enum Type {
+ VALUE,
+ TOMBSTONE,
+ }
}
/**
* A {@code MapEntryValue} that is derived from a non-transactional update
* i.e. via any standard map update operation.
*/
- private class NonTransactionalCommit implements MapEntryValue {
- private final long version;
+ private static class NonTransactionalCommit extends MapEntryValue {
private final Commit<? extends UpdateAndGet> commit;
- public NonTransactionalCommit(long version, Commit<? extends UpdateAndGet> commit) {
- this.version = version;
+ NonTransactionalCommit(Commit<? extends UpdateAndGet> commit) {
+ super(Type.VALUE, commit.index());
this.commit = commit;
}
@Override
- public byte[] value() {
+ byte[] value() {
return commit.operation().value();
}
@Override
- public long version() {
- return version;
- }
-
- @Override
- public void discard() {
+ void discard() {
commit.close();
}
}
@@ -653,43 +817,135 @@
* A {@code MapEntryValue} that is derived from updates submitted via a
* transaction.
*/
- private class TransactionalCommit implements MapEntryValue {
- private final String key;
+ private static class TransactionalCommit extends MapEntryValue {
+ private final byte[] value;
+ private final CountDownCompleter<?> completer;
+
+ TransactionalCommit(long version, byte[] value, CountDownCompleter<?> completer) {
+ super(Type.VALUE, version);
+ this.value = value;
+ this.completer = completer;
+ }
+
+ @Override
+ byte[] value() {
+ return value;
+ }
+
+ @Override
+ void discard() {
+ completer.countDown();
+ }
+ }
+
+ /**
+ * A {@code MapEntryValue} that represents a deleted entry.
+ */
+ private static class TombstoneCommit extends MapEntryValue {
+ private final CountDownCompleter<?> completer;
+
+ public TombstoneCommit(long version, CountDownCompleter<?> completer) {
+ super(Type.TOMBSTONE, version);
+ this.completer = completer;
+ }
+
+ @Override
+ byte[] value() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ void discard() {
+ completer.countDown();
+ }
+ }
+
+ /**
+ * Map transaction scope.
+ */
+ private static final class TransactionScope {
private final long version;
- private final CountDownCompleter<Commit<? extends TransactionPrepare>> completer;
+ private final Commit<? extends TransactionPrepare> prepareCommit;
+ private final Commit<? extends TransactionCommit> commitCommit;
- public TransactionalCommit(
- String key,
+ private TransactionScope(long version) {
+ this(version, null, null);
+ }
+
+ private TransactionScope(
long version,
- CountDownCompleter<Commit<? extends TransactionPrepare>> commit) {
- this.key = key;
+ Commit<? extends TransactionPrepare> prepareCommit) {
+ this(version, prepareCommit, null);
+ }
+
+ private TransactionScope(
+ long version,
+ Commit<? extends TransactionPrepare> prepareCommit,
+ Commit<? extends TransactionCommit> commitCommit) {
this.version = version;
- this.completer = commit;
+ this.prepareCommit = prepareCommit;
+ this.commitCommit = commitCommit;
}
- @Override
- public byte[] value() {
- TransactionLog<MapUpdate<String, byte[]>> transaction = completer.object().operation().transactionLog();
- return valueForKey(key, transaction);
- }
-
- @Override
- public long version() {
+ /**
+ * Returns the transaction version.
+ *
+ * @return the transaction version
+ */
+ long version() {
return version;
}
- @Override
- public void discard() {
- completer.countDown();
+ /**
+ * Returns whether this is a prepared transaction scope.
+ *
+ * @return whether this is a prepared transaction scope
+ */
+ boolean isPrepared() {
+ return prepareCommit != null;
}
- private byte[] valueForKey(String key, TransactionLog<MapUpdate<String, byte[]>> transaction) {
- MapUpdate<String, byte[]> update = transaction.records()
- .stream()
- .filter(u -> u.key().equals(key))
- .findFirst()
- .orElse(null);
- return update == null ? null : update.value();
+ /**
+ * Returns the transaction commit log.
+ *
+ * @return the transaction commit log
+ */
+ TransactionLog<MapUpdate<String, byte[]>> transactionLog() {
+ checkState(isPrepared());
+ return prepareCommit.operation().transactionLog();
+ }
+
+ /**
+ * Returns a new transaction scope with a prepare commit.
+ *
+ * @param commit the prepare commit
+ * @return new transaction scope updated with the prepare commit
+ */
+ TransactionScope prepared(Commit<? extends TransactionPrepare> commit) {
+ return new TransactionScope(version, commit);
+ }
+
+ /**
+ * Returns a new transaction scope with a commit commit.
+ *
+ * @param commit the commit commit ;-)
+ * @return new transaction scope updated with the commit commit
+ */
+ TransactionScope committed(Commit<? extends TransactionCommit> commit) {
+ checkState(isPrepared());
+ return new TransactionScope(version, prepareCommit, commit);
+ }
+
+ /**
+ * Closes the transaction and all associated commits.
+ */
+ void close() {
+ if (prepareCommit != null) {
+ prepareCommit.close();
+ }
+ if (commitCommit != null) {
+ commitCommit.close();
+ }
}
}
}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
index 533ea97..ef03935 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
@@ -74,7 +74,7 @@
expect(asyncMap.begin(transactionId))
.andReturn(CompletableFuture.completedFuture(new Version(1)));
- expect(asyncMap.prepare(new TransactionLog<>(transactionId, updates)))
+ expect(asyncMap.prepare(new TransactionLog<>(transactionId, 1, updates)))
.andReturn(CompletableFuture.completedFuture(true));
expect(asyncMap.commit(transactionId))
.andReturn(CompletableFuture.completedFuture(null));
@@ -117,7 +117,7 @@
expect(asyncMap.begin(transactionId))
.andReturn(CompletableFuture.completedFuture(new Version(1)));
- expect(asyncMap.prepare(new TransactionLog<>(transactionId, updates)))
+ expect(asyncMap.prepare(new TransactionLog<>(transactionId, 1, updates)))
.andReturn(CompletableFuture.completedFuture(true));
replay(asyncMap);
@@ -160,16 +160,16 @@
.andReturn(CompletableFuture.completedFuture(new Version(1)));
expect(participants.get(PartitionId.from(1)).transaction.transactionalObject.prepare(
- new TransactionLog<>(transactionId, Arrays.asList(
+ new TransactionLog<>(transactionId, 1, Arrays.asList(
MapUpdate.<String, String>newBuilder()
.withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.withKey("foo")
- .withCurrentVersion(1)
+ .withVersion(1)
.build(),
MapUpdate.<String, String>newBuilder()
.withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.withKey("baz")
- .withCurrentVersion(2)
+ .withVersion(2)
.build()
)))).andReturn(CompletableFuture.completedFuture(true));
@@ -181,11 +181,12 @@
.andReturn(CompletableFuture.completedFuture(new Version(1)));
expect(participants.get(PartitionId.from(3)).transaction.transactionalObject.prepare(
- new TransactionLog<>(transactionId, Arrays.asList(
+ new TransactionLog<>(transactionId, 1, Arrays.asList(
MapUpdate.<String, String>newBuilder()
- .withType(MapUpdate.Type.PUT_IF_ABSENT)
+ .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
.withKey("bar")
.withValue("baz")
+ .withVersion(1)
.build()
)))).andReturn(CompletableFuture.completedFuture(true));
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index ac4c60d..284b57b 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -27,6 +27,7 @@
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
import java.util.Arrays;
@@ -41,7 +42,6 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
@@ -92,6 +92,14 @@
}
/**
+ * Tests map transaction prepare.
+ */
+ @Test
+ public void testTransactionPrepare() throws Throwable {
+ transactionPrepareTests();
+ }
+
+ /**
* Tests map transaction commit.
*/
@Test
@@ -112,22 +120,12 @@
final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
AtomixConsistentMap map = createAtomixClient().getResource("testBasicMapOperationMap",
- AtomixConsistentMap.class).join();
+ AtomixConsistentMap.class).join();
map.isEmpty().thenAccept(result -> {
assertTrue(result);
}).join();
- map.getOrDefault("nothing", null).thenAccept(result -> {
- assertEquals(0, result.version());
- assertNull(result.value());
- }).join();
-
- map.getOrDefault("foo", "bar".getBytes()).thenAccept(result -> {
- assertEquals(0, result.version());
- assertArrayEquals("bar".getBytes(), result.value());
- }).join();
-
map.put("foo", rawFooValue).thenAccept(result -> {
assertNull(result);
}).join();
@@ -175,11 +173,6 @@
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
}).join();
- map.getOrDefault("foo", "bar".getBytes()).thenAccept(result -> {
- assertNotEquals(0, result.version());
- assertArrayEquals(rawFooValue, result.value());
- }).join();
-
map.remove("foo").thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
}).join();
@@ -257,7 +250,7 @@
final byte[] value3 = Tools.getBytesUtf8("value3");
AtomixConsistentMap map = createAtomixClient().getResource("testMapComputeOperationsMap",
- AtomixConsistentMap.class).join();
+ AtomixConsistentMap.class).join();
map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
@@ -295,7 +288,7 @@
final byte[] value3 = Tools.getBytesUtf8("value3");
AtomixConsistentMap map = createAtomixClient().getResource("testMapListenerMap",
- AtomixConsistentMap.class).join();
+ AtomixConsistentMap.class).join();
TestMapEventListener listener = new TestMapEventListener();
// add listener; insert new value into map and verify an INSERT event is received.
@@ -349,27 +342,104 @@
map.removeListener(listener).join();
}
+ protected void transactionPrepareTests() throws Throwable {
+ AtomixConsistentMap map = createAtomixClient().getResource("testPrepareTestsMap",
+ AtomixConsistentMap.class).join();
+
+ TransactionId transactionId1 = TransactionId.from("tx1");
+ TransactionId transactionId2 = TransactionId.from("tx2");
+ TransactionId transactionId3 = TransactionId.from("tx3");
+ TransactionId transactionId4 = TransactionId.from("tx4");
+
+ Version lock1 = map.begin(transactionId1).join();
+
+ MapUpdate<String, byte[]> update1 =
+ MapUpdate.<String, byte[]>newBuilder()
+ .withType(MapUpdate.Type.LOCK)
+ .withKey("foo")
+ .withVersion(lock1.value())
+ .build();
+ MapUpdate<String, byte[]> update2 =
+ MapUpdate.<String, byte[]>newBuilder()
+ .withType(MapUpdate.Type.LOCK)
+ .withKey("bar")
+ .withVersion(lock1.value())
+ .build();
+
+ map.prepare(new TransactionLog<>(transactionId1, lock1.value(), Arrays.asList(update1, update2)))
+ .thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+
+ Version lock2 = map.begin(transactionId2).join();
+
+ MapUpdate<String, byte[]> update3 =
+ MapUpdate.<String, byte[]>newBuilder()
+ .withType(MapUpdate.Type.LOCK)
+ .withKey("foo")
+ .withVersion(lock2.value())
+ .build();
+
+ map.prepare(new TransactionLog<>(transactionId2, lock2.value(), Arrays.asList(update3)))
+ .thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+ map.rollback(transactionId2).join();
+
+ Version lock3 = map.begin(transactionId3).join();
+
+ MapUpdate<String, byte[]> update4 =
+ MapUpdate.<String, byte[]>newBuilder()
+ .withType(MapUpdate.Type.LOCK)
+ .withKey("baz")
+ .withVersion(0)
+ .build();
+
+ map.prepare(new TransactionLog<>(transactionId3, lock3.value(), Arrays.asList(update4)))
+ .thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+ map.rollback(transactionId3).join();
+
+ Version lock4 = map.begin(transactionId4).join();
+
+ MapUpdate<String, byte[]> update5 =
+ MapUpdate.<String, byte[]>newBuilder()
+ .withType(MapUpdate.Type.LOCK)
+ .withKey("baz")
+ .withVersion(lock4.value())
+ .build();
+
+ map.prepare(new TransactionLog<>(transactionId4, lock4.value(), Arrays.asList(update5)))
+ .thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+ }
+
protected void transactionCommitTests() throws Throwable {
final byte[] value1 = Tools.getBytesUtf8("value1");
final byte[] value2 = Tools.getBytesUtf8("value2");
AtomixConsistentMap map = createAtomixClient().getResource("testCommitTestsMap",
- AtomixConsistentMap.class).join();
+ AtomixConsistentMap.class).join();
TestMapEventListener listener = new TestMapEventListener();
map.addListener(listener).join();
- // PUT_IF_ABSENT
+ TransactionId transactionId = TransactionId.from("tx1");
+
+ // Begin the transaction.
+ Version lock = map.begin(transactionId).join();
+
+ // PUT_IF_VERSION_MATCH
MapUpdate<String, byte[]> update1 =
- MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
- .withKey("foo")
- .withValue(value1)
- .build();
+ MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+ .withKey("foo")
+ .withValue(value1)
+ .withVersion(lock.value())
+ .build();
- TransactionLog<MapUpdate<String, byte[]>> tx =
- new TransactionLog<>(TransactionId.from("tx1"), Arrays.asList(update1));
-
- map.prepare(tx).thenAccept(result -> {
+ map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(update1))).thenAccept(result -> {
assertEquals(true, result);
}).join();
// verify changes in Tx is not visible yet until commit
@@ -392,7 +462,7 @@
assertFalse(listener.eventReceived());
- map.commit(tx.transactionId()).join();
+ map.commit(transactionId).join();
MapEvent<String, byte[]> event = listener.event();
assertNotNull(event);
assertEquals(MapEvent.Type.INSERT, event.type());
@@ -407,19 +477,21 @@
assertEquals(MapEvent.Type.UPDATE, event.type());
assertTrue(Arrays.equals(value2, event.newValue().value()));
-
// REMOVE_IF_VERSION_MATCH
byte[] currFoo = map.get("foo").get().value();
long currFooVersion = map.get("foo").get().version();
MapUpdate<String, byte[]> remove1 =
MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
- .withKey("foo")
- .withCurrentVersion(currFooVersion)
- .build();
+ .withKey("foo")
+ .withVersion(currFooVersion)
+ .build();
- tx = new TransactionLog<>(TransactionId.from("tx2"), Arrays.asList(remove1));
+ transactionId = TransactionId.from("tx2");
- map.prepare(tx).thenAccept(result -> {
+ // Begin the transaction.
+ map.begin(transactionId).join();
+
+ map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(remove1))).thenAccept(result -> {
assertTrue("prepare should succeed", result);
}).join();
// verify changes in Tx is not visible yet until commit
@@ -433,7 +505,7 @@
assertThat(result.value(), is(currFoo));
}).join();
- map.commit(tx.transactionId()).join();
+ map.commit(transactionId).join();
event = listener.event();
assertNotNull(event);
assertEquals(MapEvent.Type.REMOVE, event.type());
@@ -450,24 +522,28 @@
final byte[] value2 = Tools.getBytesUtf8("value2");
AtomixConsistentMap map = createAtomixClient().getResource("testTransactionRollbackTestsMap",
- AtomixConsistentMap.class).join();
+ AtomixConsistentMap.class).join();
TestMapEventListener listener = new TestMapEventListener();
map.addListener(listener).join();
+ TransactionId transactionId = TransactionId.from("tx1");
+
+ Version lock = map.begin(transactionId).join();
+
MapUpdate<String, byte[]> update1 =
- MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
- .withKey("foo")
- .withValue(value1)
- .build();
- TransactionLog<MapUpdate<String, byte[]>> tx =
- new TransactionLog<>(TransactionId.from("tx1"), Arrays.asList(update1));
- map.prepare(tx).thenAccept(result -> {
+ MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+ .withKey("foo")
+ .withValue(value1)
+ .withVersion(lock.value())
+ .build();
+
+ map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(update1))).thenAccept(result -> {
assertEquals(true, result);
}).join();
assertFalse(listener.eventReceived());
- map.rollback(tx.transactionId()).join();
+ map.rollback(transactionId).join();
assertFalse(listener.eventReceived());
map.get("foo").thenAccept(result -> {