[ONOS-6297] - Use commit indexes for entry versioning in ConsistentMap state machine.

Change-Id: I6941888461ecb0016996ea0a01b017b8cba77ee1
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 84cd53a..a6473ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -36,7 +36,6 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 import org.onlab.util.CountDownCompleter;
@@ -79,7 +78,6 @@
     private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
     private final Set<String> preparedKeys = Sets.newHashSet();
     private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
-    private AtomicLong versionCounter = new AtomicLong(0);
 
     public AtomixConsistentMapState(Properties properties) {
         super(properties);
@@ -87,12 +85,10 @@
 
     @Override
     public void snapshot(SnapshotWriter writer) {
-        writer.writeLong(versionCounter.get());
     }
 
     @Override
     public void install(SnapshotReader reader) {
-        versionCounter = new AtomicLong(reader.readLong());
     }
 
     @Override
@@ -271,7 +267,7 @@
             }
 
             byte[] newValue = commit.operation().value();
-            long newVersion = versionCounter.incrementAndGet();
+            long newVersion = commit.index();
             Versioned<byte[]> newMapValue = newValue == null ? null
                     : new Versioned<>(newValue, newVersion);
 
@@ -370,7 +366,7 @@
     protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
         PrepareResult prepareResult = prepare(commit);
         if (prepareResult == PrepareResult.OK) {
-            commitInternal(commit.operation().transaction().transactionId());
+            commitInternal(commit.operation().transaction().transactionId(), commit.index());
         }
         return prepareResult;
     }
@@ -426,7 +422,7 @@
     protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
         TransactionId transactionId = commit.operation().transactionId();
         try {
-            return commitInternal(transactionId);
+            return commitInternal(transactionId, commit.index());
         } catch (Exception e) {
             log.warn("Failure applying {}", commit, e);
             throw Throwables.propagate(e);
@@ -435,7 +431,7 @@
         }
     }
 
-    private CommitResult commitInternal(TransactionId transactionId) {
+    private CommitResult commitInternal(TransactionId transactionId, long version) {
         Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
                 .remove(transactionId);
         if (prepareCommit == null) {
@@ -456,7 +452,7 @@
             MapEntryValue previousValue = mapEntries.remove(key);
             MapEntryValue newValue = null;
             if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
-                newValue = new TransactionalCommit(key, versionCounter.incrementAndGet(), completer);
+                newValue = new TransactionalCommit(key, version, completer);
             }
             eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
             if (newValue != null) {