[ONOS-6345] Track tombstones within transactions for optimistic locking on null values

Change-Id: Ib4764721e512462ec1552124ff696b8f89687d8f
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index ac4c60d..284b57b 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -27,6 +27,7 @@
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
 import java.util.Arrays;
@@ -41,7 +42,6 @@
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -92,6 +92,14 @@
     }
 
     /**
+     * Tests map transaction prepare.
+     */
+    @Test
+    public void testTransactionPrepare() throws Throwable {
+        transactionPrepareTests();
+    }
+
+    /**
      * Tests map transaction commit.
      */
     @Test
@@ -112,22 +120,12 @@
         final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
 
         AtomixConsistentMap map = createAtomixClient().getResource("testBasicMapOperationMap",
-                                                                   AtomixConsistentMap.class).join();
+                AtomixConsistentMap.class).join();
 
         map.isEmpty().thenAccept(result -> {
             assertTrue(result);
         }).join();
 
-        map.getOrDefault("nothing", null).thenAccept(result -> {
-            assertEquals(0, result.version());
-            assertNull(result.value());
-        }).join();
-
-        map.getOrDefault("foo", "bar".getBytes()).thenAccept(result -> {
-            assertEquals(0, result.version());
-            assertArrayEquals("bar".getBytes(), result.value());
-        }).join();
-
         map.put("foo", rawFooValue).thenAccept(result -> {
             assertNull(result);
         }).join();
@@ -175,11 +173,6 @@
             assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
         }).join();
 
-        map.getOrDefault("foo", "bar".getBytes()).thenAccept(result -> {
-            assertNotEquals(0, result.version());
-            assertArrayEquals(rawFooValue, result.value());
-        }).join();
-
         map.remove("foo").thenAccept(result -> {
             assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
         }).join();
@@ -257,7 +250,7 @@
         final byte[] value3 = Tools.getBytesUtf8("value3");
 
         AtomixConsistentMap map = createAtomixClient().getResource("testMapComputeOperationsMap",
-                                                                   AtomixConsistentMap.class).join();
+                AtomixConsistentMap.class).join();
 
         map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
             assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
@@ -295,7 +288,7 @@
         final byte[] value3 = Tools.getBytesUtf8("value3");
 
         AtomixConsistentMap map = createAtomixClient().getResource("testMapListenerMap",
-                                                                   AtomixConsistentMap.class).join();
+                AtomixConsistentMap.class).join();
         TestMapEventListener listener = new TestMapEventListener();
 
         // add listener; insert new value into map and verify an INSERT event is received.
@@ -349,27 +342,104 @@
         map.removeListener(listener).join();
     }
 
+    protected void transactionPrepareTests() throws Throwable {
+        AtomixConsistentMap map = createAtomixClient().getResource("testPrepareTestsMap",
+                AtomixConsistentMap.class).join();
+
+        TransactionId transactionId1 = TransactionId.from("tx1");
+        TransactionId transactionId2 = TransactionId.from("tx2");
+        TransactionId transactionId3 = TransactionId.from("tx3");
+        TransactionId transactionId4 = TransactionId.from("tx4");
+
+        Version lock1 = map.begin(transactionId1).join();
+
+        MapUpdate<String, byte[]> update1 =
+                MapUpdate.<String, byte[]>newBuilder()
+                        .withType(MapUpdate.Type.LOCK)
+                        .withKey("foo")
+                        .withVersion(lock1.value())
+                        .build();
+        MapUpdate<String, byte[]> update2 =
+                MapUpdate.<String, byte[]>newBuilder()
+                        .withType(MapUpdate.Type.LOCK)
+                        .withKey("bar")
+                        .withVersion(lock1.value())
+                        .build();
+
+        map.prepare(new TransactionLog<>(transactionId1, lock1.value(), Arrays.asList(update1, update2)))
+                .thenAccept(result -> {
+                    assertTrue(result);
+                }).join();
+
+        Version lock2 = map.begin(transactionId2).join();
+
+        MapUpdate<String, byte[]> update3 =
+                MapUpdate.<String, byte[]>newBuilder()
+                        .withType(MapUpdate.Type.LOCK)
+                        .withKey("foo")
+                        .withVersion(lock2.value())
+                        .build();
+
+        map.prepare(new TransactionLog<>(transactionId2, lock2.value(), Arrays.asList(update3)))
+                .thenAccept(result -> {
+                    assertFalse(result);
+                }).join();
+        map.rollback(transactionId2).join();
+
+        Version lock3 = map.begin(transactionId3).join();
+
+        MapUpdate<String, byte[]> update4 =
+                MapUpdate.<String, byte[]>newBuilder()
+                        .withType(MapUpdate.Type.LOCK)
+                        .withKey("baz")
+                        .withVersion(0)
+                        .build();
+
+        map.prepare(new TransactionLog<>(transactionId3, lock3.value(), Arrays.asList(update4)))
+                .thenAccept(result -> {
+                    assertFalse(result);
+                }).join();
+        map.rollback(transactionId3).join();
+
+        Version lock4 = map.begin(transactionId4).join();
+
+        MapUpdate<String, byte[]> update5 =
+                MapUpdate.<String, byte[]>newBuilder()
+                        .withType(MapUpdate.Type.LOCK)
+                        .withKey("baz")
+                        .withVersion(lock4.value())
+                        .build();
+
+        map.prepare(new TransactionLog<>(transactionId4, lock4.value(), Arrays.asList(update5)))
+                .thenAccept(result -> {
+                    assertTrue(result);
+                }).join();
+    }
+
     protected void transactionCommitTests() throws Throwable {
         final byte[] value1 = Tools.getBytesUtf8("value1");
         final byte[] value2 = Tools.getBytesUtf8("value2");
 
         AtomixConsistentMap map = createAtomixClient().getResource("testCommitTestsMap",
-                                                                   AtomixConsistentMap.class).join();
+                AtomixConsistentMap.class).join();
         TestMapEventListener listener = new TestMapEventListener();
 
         map.addListener(listener).join();
 
-        // PUT_IF_ABSENT
+        TransactionId transactionId = TransactionId.from("tx1");
+
+        // Begin the transaction.
+        Version lock = map.begin(transactionId).join();
+
+        // PUT_IF_VERSION_MATCH
         MapUpdate<String, byte[]> update1 =
-                MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
-                .withKey("foo")
-                .withValue(value1)
-                .build();
+                MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+                        .withKey("foo")
+                        .withValue(value1)
+                        .withVersion(lock.value())
+                        .build();
 
-        TransactionLog<MapUpdate<String, byte[]>> tx =
-                new TransactionLog<>(TransactionId.from("tx1"), Arrays.asList(update1));
-
-        map.prepare(tx).thenAccept(result -> {
+        map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(update1))).thenAccept(result -> {
             assertEquals(true, result);
         }).join();
         // verify changes in Tx is not visible yet until commit
@@ -392,7 +462,7 @@
 
         assertFalse(listener.eventReceived());
 
-        map.commit(tx.transactionId()).join();
+        map.commit(transactionId).join();
         MapEvent<String, byte[]> event = listener.event();
         assertNotNull(event);
         assertEquals(MapEvent.Type.INSERT, event.type());
@@ -407,19 +477,21 @@
         assertEquals(MapEvent.Type.UPDATE, event.type());
         assertTrue(Arrays.equals(value2, event.newValue().value()));
 
-
         // REMOVE_IF_VERSION_MATCH
         byte[] currFoo = map.get("foo").get().value();
         long currFooVersion = map.get("foo").get().version();
         MapUpdate<String, byte[]> remove1 =
                 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
-                .withKey("foo")
-                .withCurrentVersion(currFooVersion)
-                .build();
+                        .withKey("foo")
+                        .withVersion(currFooVersion)
+                        .build();
 
-        tx = new TransactionLog<>(TransactionId.from("tx2"), Arrays.asList(remove1));
+        transactionId = TransactionId.from("tx2");
 
-        map.prepare(tx).thenAccept(result -> {
+        // Begin the transaction.
+        map.begin(transactionId).join();
+
+        map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(remove1))).thenAccept(result -> {
             assertTrue("prepare should succeed", result);
         }).join();
         // verify changes in Tx is not visible yet until commit
@@ -433,7 +505,7 @@
             assertThat(result.value(), is(currFoo));
         }).join();
 
-        map.commit(tx.transactionId()).join();
+        map.commit(transactionId).join();
         event = listener.event();
         assertNotNull(event);
         assertEquals(MapEvent.Type.REMOVE, event.type());
@@ -450,24 +522,28 @@
         final byte[] value2 = Tools.getBytesUtf8("value2");
 
         AtomixConsistentMap map = createAtomixClient().getResource("testTransactionRollbackTestsMap",
-                                                                   AtomixConsistentMap.class).join();
+                AtomixConsistentMap.class).join();
         TestMapEventListener listener = new TestMapEventListener();
 
         map.addListener(listener).join();
 
+        TransactionId transactionId = TransactionId.from("tx1");
+
+        Version lock = map.begin(transactionId).join();
+
         MapUpdate<String, byte[]> update1 =
-                MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
-                .withKey("foo")
-                .withValue(value1)
-                .build();
-        TransactionLog<MapUpdate<String, byte[]>> tx =
-                new TransactionLog<>(TransactionId.from("tx1"), Arrays.asList(update1));
-        map.prepare(tx).thenAccept(result -> {
+                MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+                        .withKey("foo")
+                        .withValue(value1)
+                        .withVersion(lock.value())
+                        .build();
+
+        map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(update1))).thenAccept(result -> {
             assertEquals(true, result);
         }).join();
         assertFalse(listener.eventReceived());
 
-        map.rollback(tx.transactionId()).join();
+        map.rollback(transactionId).join();
         assertFalse(listener.eventReceived());
 
         map.get("foo").thenAccept(result -> {