[ONOS-6345] Track tombstones within transactions for optimistic locking on null values
Change-Id: Ib4764721e512462ec1552124ff696b8f89687d8f
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index ac4c60d..284b57b 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -27,6 +27,7 @@
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
import java.util.Arrays;
@@ -41,7 +42,6 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
@@ -92,6 +92,14 @@
}
/**
+ * Tests map transaction prepare.
+ */
+ @Test
+ public void testTransactionPrepare() throws Throwable {
+ transactionPrepareTests();
+ }
+
+ /**
* Tests map transaction commit.
*/
@Test
@@ -112,22 +120,12 @@
final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
AtomixConsistentMap map = createAtomixClient().getResource("testBasicMapOperationMap",
- AtomixConsistentMap.class).join();
+ AtomixConsistentMap.class).join();
map.isEmpty().thenAccept(result -> {
assertTrue(result);
}).join();
- map.getOrDefault("nothing", null).thenAccept(result -> {
- assertEquals(0, result.version());
- assertNull(result.value());
- }).join();
-
- map.getOrDefault("foo", "bar".getBytes()).thenAccept(result -> {
- assertEquals(0, result.version());
- assertArrayEquals("bar".getBytes(), result.value());
- }).join();
-
map.put("foo", rawFooValue).thenAccept(result -> {
assertNull(result);
}).join();
@@ -175,11 +173,6 @@
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
}).join();
- map.getOrDefault("foo", "bar".getBytes()).thenAccept(result -> {
- assertNotEquals(0, result.version());
- assertArrayEquals(rawFooValue, result.value());
- }).join();
-
map.remove("foo").thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
}).join();
@@ -257,7 +250,7 @@
final byte[] value3 = Tools.getBytesUtf8("value3");
AtomixConsistentMap map = createAtomixClient().getResource("testMapComputeOperationsMap",
- AtomixConsistentMap.class).join();
+ AtomixConsistentMap.class).join();
map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
@@ -295,7 +288,7 @@
final byte[] value3 = Tools.getBytesUtf8("value3");
AtomixConsistentMap map = createAtomixClient().getResource("testMapListenerMap",
- AtomixConsistentMap.class).join();
+ AtomixConsistentMap.class).join();
TestMapEventListener listener = new TestMapEventListener();
// add listener; insert new value into map and verify an INSERT event is received.
@@ -349,27 +342,104 @@
map.removeListener(listener).join();
}
+ protected void transactionPrepareTests() throws Throwable {
+ AtomixConsistentMap map = createAtomixClient().getResource("testPrepareTestsMap",
+ AtomixConsistentMap.class).join();
+
+ TransactionId transactionId1 = TransactionId.from("tx1");
+ TransactionId transactionId2 = TransactionId.from("tx2");
+ TransactionId transactionId3 = TransactionId.from("tx3");
+ TransactionId transactionId4 = TransactionId.from("tx4");
+
+ Version lock1 = map.begin(transactionId1).join();
+
+ MapUpdate<String, byte[]> update1 =
+ MapUpdate.<String, byte[]>newBuilder()
+ .withType(MapUpdate.Type.LOCK)
+ .withKey("foo")
+ .withVersion(lock1.value())
+ .build();
+ MapUpdate<String, byte[]> update2 =
+ MapUpdate.<String, byte[]>newBuilder()
+ .withType(MapUpdate.Type.LOCK)
+ .withKey("bar")
+ .withVersion(lock1.value())
+ .build();
+
+ map.prepare(new TransactionLog<>(transactionId1, lock1.value(), Arrays.asList(update1, update2)))
+ .thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+
+ Version lock2 = map.begin(transactionId2).join();
+
+ MapUpdate<String, byte[]> update3 =
+ MapUpdate.<String, byte[]>newBuilder()
+ .withType(MapUpdate.Type.LOCK)
+ .withKey("foo")
+ .withVersion(lock2.value())
+ .build();
+
+ map.prepare(new TransactionLog<>(transactionId2, lock2.value(), Arrays.asList(update3)))
+ .thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+ map.rollback(transactionId2).join();
+
+ Version lock3 = map.begin(transactionId3).join();
+
+ MapUpdate<String, byte[]> update4 =
+ MapUpdate.<String, byte[]>newBuilder()
+ .withType(MapUpdate.Type.LOCK)
+ .withKey("baz")
+ .withVersion(0)
+ .build();
+
+ map.prepare(new TransactionLog<>(transactionId3, lock3.value(), Arrays.asList(update4)))
+ .thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+ map.rollback(transactionId3).join();
+
+ Version lock4 = map.begin(transactionId4).join();
+
+ MapUpdate<String, byte[]> update5 =
+ MapUpdate.<String, byte[]>newBuilder()
+ .withType(MapUpdate.Type.LOCK)
+ .withKey("baz")
+ .withVersion(lock4.value())
+ .build();
+
+ map.prepare(new TransactionLog<>(transactionId4, lock4.value(), Arrays.asList(update5)))
+ .thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+ }
+
protected void transactionCommitTests() throws Throwable {
final byte[] value1 = Tools.getBytesUtf8("value1");
final byte[] value2 = Tools.getBytesUtf8("value2");
AtomixConsistentMap map = createAtomixClient().getResource("testCommitTestsMap",
- AtomixConsistentMap.class).join();
+ AtomixConsistentMap.class).join();
TestMapEventListener listener = new TestMapEventListener();
map.addListener(listener).join();
- // PUT_IF_ABSENT
+ TransactionId transactionId = TransactionId.from("tx1");
+
+ // Begin the transaction.
+ Version lock = map.begin(transactionId).join();
+
+ // PUT_IF_VERSION_MATCH
MapUpdate<String, byte[]> update1 =
- MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
- .withKey("foo")
- .withValue(value1)
- .build();
+ MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+ .withKey("foo")
+ .withValue(value1)
+ .withVersion(lock.value())
+ .build();
- TransactionLog<MapUpdate<String, byte[]>> tx =
- new TransactionLog<>(TransactionId.from("tx1"), Arrays.asList(update1));
-
- map.prepare(tx).thenAccept(result -> {
+ map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(update1))).thenAccept(result -> {
assertEquals(true, result);
}).join();
// verify changes in Tx is not visible yet until commit
@@ -392,7 +462,7 @@
assertFalse(listener.eventReceived());
- map.commit(tx.transactionId()).join();
+ map.commit(transactionId).join();
MapEvent<String, byte[]> event = listener.event();
assertNotNull(event);
assertEquals(MapEvent.Type.INSERT, event.type());
@@ -407,19 +477,21 @@
assertEquals(MapEvent.Type.UPDATE, event.type());
assertTrue(Arrays.equals(value2, event.newValue().value()));
-
// REMOVE_IF_VERSION_MATCH
byte[] currFoo = map.get("foo").get().value();
long currFooVersion = map.get("foo").get().version();
MapUpdate<String, byte[]> remove1 =
MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
- .withKey("foo")
- .withCurrentVersion(currFooVersion)
- .build();
+ .withKey("foo")
+ .withVersion(currFooVersion)
+ .build();
- tx = new TransactionLog<>(TransactionId.from("tx2"), Arrays.asList(remove1));
+ transactionId = TransactionId.from("tx2");
- map.prepare(tx).thenAccept(result -> {
+ // Begin the transaction.
+ map.begin(transactionId).join();
+
+ map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(remove1))).thenAccept(result -> {
assertTrue("prepare should succeed", result);
}).join();
// verify changes in Tx is not visible yet until commit
@@ -433,7 +505,7 @@
assertThat(result.value(), is(currFoo));
}).join();
- map.commit(tx.transactionId()).join();
+ map.commit(transactionId).join();
event = listener.event();
assertNotNull(event);
assertEquals(MapEvent.Type.REMOVE, event.type());
@@ -450,24 +522,28 @@
final byte[] value2 = Tools.getBytesUtf8("value2");
AtomixConsistentMap map = createAtomixClient().getResource("testTransactionRollbackTestsMap",
- AtomixConsistentMap.class).join();
+ AtomixConsistentMap.class).join();
TestMapEventListener listener = new TestMapEventListener();
map.addListener(listener).join();
+ TransactionId transactionId = TransactionId.from("tx1");
+
+ Version lock = map.begin(transactionId).join();
+
MapUpdate<String, byte[]> update1 =
- MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
- .withKey("foo")
- .withValue(value1)
- .build();
- TransactionLog<MapUpdate<String, byte[]>> tx =
- new TransactionLog<>(TransactionId.from("tx1"), Arrays.asList(update1));
- map.prepare(tx).thenAccept(result -> {
+ MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+ .withKey("foo")
+ .withValue(value1)
+ .withVersion(lock.value())
+ .build();
+
+ map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(update1))).thenAccept(result -> {
assertEquals(true, result);
}).join();
assertFalse(listener.eventReceived());
- map.rollback(tx.transactionId()).join();
+ map.rollback(transactionId).join();
assertFalse(listener.eventReceived());
map.get("foo").thenAccept(result -> {