[ONOS-6297] - Use commit indexes for entry versioning in ConsistentMap state machine.
Change-Id: I6941888461ecb0016996ea0a01b017b8cba77ee1
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 84cd53a..a6473ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -36,7 +36,6 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.onlab.util.CountDownCompleter;
@@ -79,7 +78,6 @@
private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
private final Set<String> preparedKeys = Sets.newHashSet();
private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
- private AtomicLong versionCounter = new AtomicLong(0);
public AtomixConsistentMapState(Properties properties) {
super(properties);
@@ -87,12 +85,10 @@
@Override
public void snapshot(SnapshotWriter writer) {
- writer.writeLong(versionCounter.get());
}
@Override
public void install(SnapshotReader reader) {
- versionCounter = new AtomicLong(reader.readLong());
}
@Override
@@ -271,7 +267,7 @@
}
byte[] newValue = commit.operation().value();
- long newVersion = versionCounter.incrementAndGet();
+ long newVersion = commit.index();
Versioned<byte[]> newMapValue = newValue == null ? null
: new Versioned<>(newValue, newVersion);
@@ -370,7 +366,7 @@
protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
PrepareResult prepareResult = prepare(commit);
if (prepareResult == PrepareResult.OK) {
- commitInternal(commit.operation().transaction().transactionId());
+ commitInternal(commit.operation().transaction().transactionId(), commit.index());
}
return prepareResult;
}
@@ -426,7 +422,7 @@
protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
TransactionId transactionId = commit.operation().transactionId();
try {
- return commitInternal(transactionId);
+ return commitInternal(transactionId, commit.index());
} catch (Exception e) {
log.warn("Failure applying {}", commit, e);
throw Throwables.propagate(e);
@@ -435,7 +431,7 @@
}
}
- private CommitResult commitInternal(TransactionId transactionId) {
+ private CommitResult commitInternal(TransactionId transactionId, long version) {
Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
.remove(transactionId);
if (prepareCommit == null) {
@@ -456,7 +452,7 @@
MapEntryValue previousValue = mapEntries.remove(key);
MapEntryValue newValue = null;
if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
- newValue = new TransactionalCommit(key, versionCounter.incrementAndGet(), completer);
+ newValue = new TransactionalCommit(key, version, completer);
}
eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
if (newValue != null) {