[ONOS-6345] Track tombstones within transactions for optimistic locking on null values

Change-Id: Ib4764721e512462ec1552124ff696b8f89687d8f
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
index 9acc377..19f6e0c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
@@ -22,6 +22,7 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 /**
@@ -37,13 +38,19 @@
 
     @Override
     protected V read(K key) {
-        Versioned<V> value = readCache.computeIfAbsent(key, backingMap::get);
-        return value != null ? value.value() : null;
+        Versioned<V> latest = backingMap.getOrDefault(key, null);
+        readCache.put(key, latest); // its version() is used later when building update records
+        return latest.value();
     }
 
     @Override
-    protected Stream<MapUpdate<K, V>> records() {
-        return Stream.concat(deleteStream(), writeStream());
+    public boolean hasPendingUpdates() {
+        return !(writeCache.isEmpty() && deleteSet.isEmpty());
+    }
+
+    @Override
+    protected Stream<MapUpdate<K, V>> records(Version lockVersion) {
+        return Stream.concat(deleteStream(), writeStream(lockVersion)); // removals first, then writes
     }
 
     /**
@@ -52,34 +59,26 @@
     private Stream<MapUpdate<K, V>> deleteStream() {
         return deleteSet.stream()
                 .map(key -> Pair.of(key, readCache.get(key)))
-                .filter(e -> e.getValue() != null)
                 .map(e -> MapUpdate.<K, V>newBuilder()
                         .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
                         .withKey(e.getKey())
-                        .withCurrentVersion(e.getValue().version())
+                        .withVersion(e.getValue().version()) // version held in readCache for this key
                         .build());
     }
 
     /**
      * Returns a transaction record stream for updated keys.
      */
-    private Stream<MapUpdate<K, V>> writeStream() {
-        return writeCache.entrySet().stream().map(entry -> {
-            Versioned<V> original = readCache.get(entry.getKey());
-            if (original == null) {
-                return MapUpdate.<K, V>newBuilder()
-                        .withType(MapUpdate.Type.PUT_IF_ABSENT)
-                        .withKey(entry.getKey())
-                        .withValue(entry.getValue())
-                        .build();
-            } else {
-                return MapUpdate.<K, V>newBuilder()
-                        .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
-                        .withKey(entry.getKey())
-                        .withCurrentVersion(original.version())
-                        .withValue(entry.getValue())
-                        .build();
-            }
-        });
+    private Stream<MapUpdate<K, V>> writeStream(Version lockVersion) {
+        // Every write is conditional on the greater of the cached read version and the lock version.
+        return writeCache.entrySet().stream().map(write -> {
+            Versioned<V> lastRead = readCache.get(write.getKey());
+            return MapUpdate.<K, V>newBuilder()
+                    .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+                    .withKey(write.getKey())
+                    .withValue(write.getValue())
+                    .withVersion(Math.max(lastRead.version(), lockVersion.value()))
+                    .build();
+        });
     }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
index 82b3919..8ad8cb8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
@@ -109,6 +109,7 @@
     protected final Transactional<T> transactionalObject;
     private final AtomicBoolean open = new AtomicBoolean();
     private volatile State state = State.ACTIVE;
+    private volatile Version lock; // assigned from the result of begin()
 
     public Transaction(TransactionId transactionId, Transactional<T> transactionalObject) {
         this.transactionId = transactionId;
@@ -191,7 +192,10 @@
      */
     public CompletableFuture<Version> begin() {
         open();
-        return transactionalObject.begin(transactionId);
+        return transactionalObject.begin(transactionId).thenApply(acquired -> {
+            this.lock = acquired; // checked later by prepare() and prepareAndCommit()
+            return acquired;
+        });
     }
 
     /**
@@ -208,8 +212,10 @@
     public CompletableFuture<Boolean> prepare(List<T> updates) {
         checkOpen();
         checkActive();
+        Version lock = this.lock;
+        checkState(lock != null, TX_INACTIVE_ERROR);
         setState(State.PREPARING);
-        return transactionalObject.prepare(new TransactionLog<T>(transactionId, updates))
+        return transactionalObject.prepare(new TransactionLog<T>(transactionId, lock.value(), updates))
                 .thenApply(succeeded -> {
                     setState(State.PREPARED);
                     return succeeded;
@@ -228,8 +234,10 @@
     public CompletableFuture<Boolean> prepareAndCommit(List<T> updates) {
         checkOpen();
         checkActive();
+        Version lock = this.lock;
+        checkState(lock != null, TX_INACTIVE_ERROR);
         setState(State.PREPARING);
-        return transactionalObject.prepareAndCommit(new TransactionLog<T>(transactionId, updates))
+        return transactionalObject.prepareAndCommit(new TransactionLog<T>(transactionId, lock.value(), updates))
                 .thenApply(succeeded -> {
                     setState(State.COMMITTED);
                     return succeeded;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
index 4429a1b..977312a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
@@ -16,7 +16,6 @@
 
 package org.onosproject.store.primitives.impl;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -60,7 +59,6 @@
     protected final Transaction<MapUpdate<K, V>> transaction;
     protected final Map<K, V> writeCache = Maps.newConcurrentMap();
     protected final Set<K> deleteSet = Sets.newConcurrentHashSet();
-    protected final List<MapUpdate<K, V>> log = new ArrayList<>();
     protected volatile Version lock;
 
     protected TransactionalMapParticipant(
@@ -77,18 +75,22 @@
      * synchronized prior to a read.
      */
     private void beginTransaction() {
-        if (!transaction.isOpen()) {
-            try {
-                lock = transaction.begin()
-                        .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new TransactionException.Interrupted();
-            } catch (TimeoutException e) {
-                throw new TransactionException.Timeout();
-            } catch (ExecutionException e) {
-                Throwables.propagateIfPossible(e.getCause());
-                throw new TransactionException(e.getCause());
+        if (lock == null) { // fast path: volatile read without taking the monitor
+            synchronized (this) {
+                if (lock == null) { // re-check while holding the monitor
+                    try {
+                        lock = transaction.begin()
+                                .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt(); // restore the interrupt flag before rethrowing
+                        throw new TransactionException.Interrupted();
+                    } catch (TimeoutException e) {
+                        throw new TransactionException.Timeout();
+                    } catch (ExecutionException e) {
+                        Throwables.propagateIfPossible(e.getCause()); // rethrows unchecked causes as-is
+                        throw new TransactionException(e.getCause());
+                    }
+                }
             }
         }
     }
@@ -183,13 +185,8 @@
     }
 
     @Override
-    public boolean hasPendingUpdates() {
-        return records().findAny().isPresent();
-    }
-
-    @Override
     public CompletableFuture<Boolean> prepare() {
-        return transaction.prepare(log());
+        return transaction.prepare(log(lock)); // lock is the version stored by beginTransaction()
     }
 
     @Override
@@ -199,7 +196,7 @@
 
     @Override
     public CompletableFuture<Boolean> prepareAndCommit() {
-        return transaction.prepareAndCommit(log());
+        return transaction.prepareAndCommit(log(lock)); // lock is the version stored by beginTransaction()
     }
 
     @Override
@@ -210,24 +207,25 @@
     /**
      * Returns a list of updates performed within this map partition.
      *
+     * @param lockVersion the global transaction lock version, forwarded to {@link #records(Version)}
      * @return a list of map updates
      */
-    protected List<MapUpdate<K, V>> log() {
-        return records().collect(Collectors.toList());
+    protected List<MapUpdate<K, V>> log(Version lockVersion) {
+        return records(lockVersion).collect(Collectors.toList());
     }
 
     /**
      * Returns a stream of updates performed within this map partition.
      *
+     * @param lockVersion the global transaction lock version that implementations may apply to emitted updates
      * @return a stream of map updates
      */
-    protected abstract Stream<MapUpdate<K, V>> records();
+    protected abstract Stream<MapUpdate<K, V>> records(Version lockVersion);
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
                 .add("backingMap", backingMap)
-                .add("updates", log())
                 .toString();
     }
 }
\ No newline at end of file