[ONOS-6345] Track tombstones within transactions for optimistic locking on null values
Change-Id: Ib4764721e512462ec1552124ff696b8f89687d8f
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
index 9acc377..19f6e0c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
@@ -22,6 +22,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
/**
@@ -37,13 +38,19 @@
@Override
protected V read(K key) {
- Versioned<V> value = readCache.computeIfAbsent(key, backingMap::get);
- return value != null ? value.value() : null;
+ Versioned<V> value = backingMap.getOrDefault(key, null);
+ readCache.put(key, value);
+ return value.value();
}
@Override
- protected Stream<MapUpdate<K, V>> records() {
- return Stream.concat(deleteStream(), writeStream());
+ public boolean hasPendingUpdates() {
+ return !writeCache.isEmpty() || !deleteSet.isEmpty();
+ }
+
+ @Override
+ protected Stream<MapUpdate<K, V>> records(Version lockVersion) {
+ return Stream.concat(deleteStream(), writeStream(lockVersion));
}
/**
@@ -52,34 +59,26 @@
private Stream<MapUpdate<K, V>> deleteStream() {
return deleteSet.stream()
.map(key -> Pair.of(key, readCache.get(key)))
- .filter(e -> e.getValue() != null)
.map(e -> MapUpdate.<K, V>newBuilder()
.withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.withKey(e.getKey())
- .withCurrentVersion(e.getValue().version())
+ .withVersion(e.getValue().version())
.build());
}
/**
* Returns a transaction record stream for updated keys.
*/
- private Stream<MapUpdate<K, V>> writeStream() {
- return writeCache.entrySet().stream().map(entry -> {
- Versioned<V> original = readCache.get(entry.getKey());
- if (original == null) {
- return MapUpdate.<K, V>newBuilder()
- .withType(MapUpdate.Type.PUT_IF_ABSENT)
- .withKey(entry.getKey())
- .withValue(entry.getValue())
- .build();
- } else {
- return MapUpdate.<K, V>newBuilder()
- .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
- .withKey(entry.getKey())
- .withCurrentVersion(original.version())
- .withValue(entry.getValue())
- .build();
- }
- });
+ private Stream<MapUpdate<K, V>> writeStream(Version lockVersion) {
+ return writeCache.entrySet().stream()
+ .map(entry -> {
+ Versioned<V> original = readCache.get(entry.getKey());
+ return MapUpdate.<K, V>newBuilder()
+ .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+ .withKey(entry.getKey())
+ .withValue(entry.getValue())
+ .withVersion(Math.max(original.version(), lockVersion.value()))
+ .build();
+ });
}
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
index 82b3919..8ad8cb8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
@@ -109,6 +109,7 @@
protected final Transactional<T> transactionalObject;
private final AtomicBoolean open = new AtomicBoolean();
private volatile State state = State.ACTIVE;
+ private volatile Version lock;
public Transaction(TransactionId transactionId, Transactional<T> transactionalObject) {
this.transactionId = transactionId;
@@ -191,7 +192,10 @@
*/
public CompletableFuture<Version> begin() {
open();
- return transactionalObject.begin(transactionId);
+ return transactionalObject.begin(transactionId).thenApply(lock -> {
+ this.lock = lock;
+ return lock;
+ });
}
/**
@@ -208,8 +212,10 @@
public CompletableFuture<Boolean> prepare(List<T> updates) {
checkOpen();
checkActive();
+ Version lock = this.lock;
+ checkState(lock != null, TX_INACTIVE_ERROR);
setState(State.PREPARING);
- return transactionalObject.prepare(new TransactionLog<T>(transactionId, updates))
+ return transactionalObject.prepare(new TransactionLog<T>(transactionId, lock.value(), updates))
.thenApply(succeeded -> {
setState(State.PREPARED);
return succeeded;
@@ -228,8 +234,10 @@
public CompletableFuture<Boolean> prepareAndCommit(List<T> updates) {
checkOpen();
checkActive();
+ Version lock = this.lock;
+ checkState(lock != null, TX_INACTIVE_ERROR);
setState(State.PREPARING);
- return transactionalObject.prepareAndCommit(new TransactionLog<T>(transactionId, updates))
+ return transactionalObject.prepareAndCommit(new TransactionLog<T>(transactionId, lock.value(), updates))
.thenApply(succeeded -> {
setState(State.COMMITTED);
return succeeded;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
index 4429a1b..977312a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
@@ -16,7 +16,6 @@
package org.onosproject.store.primitives.impl;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -60,7 +59,6 @@
protected final Transaction<MapUpdate<K, V>> transaction;
protected final Map<K, V> writeCache = Maps.newConcurrentMap();
protected final Set<K> deleteSet = Sets.newConcurrentHashSet();
- protected final List<MapUpdate<K, V>> log = new ArrayList<>();
protected volatile Version lock;
protected TransactionalMapParticipant(
@@ -77,18 +75,22 @@
* synchronized prior to a read.
*/
private void beginTransaction() {
- if (!transaction.isOpen()) {
- try {
- lock = transaction.begin()
- .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new TransactionException.Interrupted();
- } catch (TimeoutException e) {
- throw new TransactionException.Timeout();
- } catch (ExecutionException e) {
- Throwables.propagateIfPossible(e.getCause());
- throw new TransactionException(e.getCause());
+ if (lock == null) {
+ synchronized (this) {
+ if (lock == null) {
+ try {
+ lock = transaction.begin()
+ .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new TransactionException.Interrupted();
+ } catch (TimeoutException e) {
+ throw new TransactionException.Timeout();
+ } catch (ExecutionException e) {
+ Throwables.propagateIfPossible(e.getCause());
+ throw new TransactionException(e.getCause());
+ }
+ }
}
}
}
@@ -183,13 +185,8 @@
}
@Override
- public boolean hasPendingUpdates() {
- return records().findAny().isPresent();
- }
-
- @Override
public CompletableFuture<Boolean> prepare() {
- return transaction.prepare(log());
+ return transaction.prepare(log(lock));
}
@Override
@@ -199,7 +196,7 @@
@Override
public CompletableFuture<Boolean> prepareAndCommit() {
- return transaction.prepareAndCommit(log());
+ return transaction.prepareAndCommit(log(lock));
}
@Override
@@ -210,24 +207,25 @@
/**
* Returns a list of updates performed within this map partition.
*
+ * @param lockVersion the global transaction lock version
* @return a list of map updates
*/
- protected List<MapUpdate<K, V>> log() {
- return records().collect(Collectors.toList());
+ protected List<MapUpdate<K, V>> log(Version lockVersion) {
+ return records(lockVersion).collect(Collectors.toList());
}
/**
* Returns a stream of updates performed within this map partition.
*
+ * @param lockVersion the global transaction lock version
* @return a stream of map updates
*/
- protected abstract Stream<MapUpdate<K, V>> records();
+ protected abstract Stream<MapUpdate<K, V>> records(Version lockVersion);
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("backingMap", backingMap)
- .add("updates", log())
.toString();
}
}
\ No newline at end of file