[ONOS-6345] Track tombstones within transactions for optimistic locking on null values
Change-Id: Ib4764721e512462ec1552124ff696b8f89687d8f
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
index 4429a1b..977312a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
@@ -16,7 +16,6 @@
package org.onosproject.store.primitives.impl;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -60,7 +59,6 @@
protected final Transaction<MapUpdate<K, V>> transaction;
protected final Map<K, V> writeCache = Maps.newConcurrentMap();
protected final Set<K> deleteSet = Sets.newConcurrentHashSet();
- protected final List<MapUpdate<K, V>> log = new ArrayList<>();
protected volatile Version lock;
protected TransactionalMapParticipant(
@@ -77,18 +75,22 @@
* synchronized prior to a read.
*/
private void beginTransaction() {
- if (!transaction.isOpen()) {
- try {
- lock = transaction.begin()
- .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new TransactionException.Interrupted();
- } catch (TimeoutException e) {
- throw new TransactionException.Timeout();
- } catch (ExecutionException e) {
- Throwables.propagateIfPossible(e.getCause());
- throw new TransactionException(e.getCause());
+ if (lock == null) {
+ synchronized (this) {
+ if (lock == null) {
+ try {
+ lock = transaction.begin()
+ .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new TransactionException.Interrupted();
+ } catch (TimeoutException e) {
+ throw new TransactionException.Timeout();
+ } catch (ExecutionException e) {
+ Throwables.propagateIfPossible(e.getCause());
+ throw new TransactionException(e.getCause());
+ }
+ }
}
}
}
@@ -183,13 +185,8 @@
}
@Override
- public boolean hasPendingUpdates() {
- return records().findAny().isPresent();
- }
-
- @Override
public CompletableFuture<Boolean> prepare() {
- return transaction.prepare(log());
+ return transaction.prepare(log(lock));
}
@Override
@@ -199,7 +196,7 @@
@Override
public CompletableFuture<Boolean> prepareAndCommit() {
- return transaction.prepareAndCommit(log());
+ return transaction.prepareAndCommit(log(lock));
}
@Override
@@ -210,24 +207,25 @@
/**
* Returns a list of updates performed within this map partition.
*
+ * @param lockVersion the global transaction lock version
* @return a list of map updates
*/
- protected List<MapUpdate<K, V>> log() {
- return records().collect(Collectors.toList());
+ protected List<MapUpdate<K, V>> log(Version lockVersion) {
+ return records(lockVersion).collect(Collectors.toList());
}
/**
* Returns a stream of updates performed within this map partition.
*
+ * @param lockVersion the global transaction lock version
* @return a stream of map updates
*/
- protected abstract Stream<MapUpdate<K, V>> records();
+ protected abstract Stream<MapUpdate<K, V>> records(Version lockVersion);
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("backingMap", backingMap)
- .add("updates", log())
.toString();
}
}
\ No newline at end of file