[ONOS-6345] Track tombstones within transactions for optimistic locking on null values

Change-Id: Ib4764721e512462ec1552124ff696b8f89687d8f
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
index 4429a1b..977312a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
@@ -16,7 +16,6 @@
 
 package org.onosproject.store.primitives.impl;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -60,7 +59,6 @@
     protected final Transaction<MapUpdate<K, V>> transaction;
     protected final Map<K, V> writeCache = Maps.newConcurrentMap();
     protected final Set<K> deleteSet = Sets.newConcurrentHashSet();
-    protected final List<MapUpdate<K, V>> log = new ArrayList<>();
     protected volatile Version lock;
 
     protected TransactionalMapParticipant(
@@ -77,18 +75,22 @@
      * synchronized prior to a read.
      */
     private void beginTransaction() {
-        if (!transaction.isOpen()) {
-            try {
-                lock = transaction.begin()
-                        .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new TransactionException.Interrupted();
-            } catch (TimeoutException e) {
-                throw new TransactionException.Timeout();
-            } catch (ExecutionException e) {
-                Throwables.propagateIfPossible(e.getCause());
-                throw new TransactionException(e.getCause());
+        if (lock == null) {
+            synchronized (this) {
+                if (lock == null) {
+                    try {
+                        lock = transaction.begin()
+                                .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new TransactionException.Interrupted();
+                    } catch (TimeoutException e) {
+                        throw new TransactionException.Timeout();
+                    } catch (ExecutionException e) {
+                        Throwables.propagateIfPossible(e.getCause());
+                        throw new TransactionException(e.getCause());
+                    }
+                }
             }
         }
     }
@@ -183,13 +185,8 @@
     }
 
     @Override
-    public boolean hasPendingUpdates() {
-        return records().findAny().isPresent();
-    }
-
-    @Override
     public CompletableFuture<Boolean> prepare() {
-        return transaction.prepare(log());
+        return transaction.prepare(log(lock));
     }
 
     @Override
@@ -199,7 +196,7 @@
 
     @Override
     public CompletableFuture<Boolean> prepareAndCommit() {
-        return transaction.prepareAndCommit(log());
+        return transaction.prepareAndCommit(log(lock));
     }
 
     @Override
@@ -210,24 +207,25 @@
     /**
      * Returns a list of updates performed within this map partition.
      *
+     * @param lockVersion the global transaction lock version
      * @return a list of map updates
      */
-    protected List<MapUpdate<K, V>> log() {
-        return records().collect(Collectors.toList());
+    protected List<MapUpdate<K, V>> log(Version lockVersion) {
+        return records(lockVersion).collect(Collectors.toList());
     }
 
     /**
      * Returns a stream of updates performed within this map partition.
      *
+     * @param lockVersion the global transaction lock version
      * @return a stream of map updates
      */
-    protected abstract Stream<MapUpdate<K, V>> records();
+    protected abstract Stream<MapUpdate<K, V>> records(Version lockVersion);
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
                 .add("backingMap", backingMap)
-                .add("updates", log())
                 .toString();
     }
 }
\ No newline at end of file