blob: e858b3f32d858535017c384ec8ad85141698369c [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Jordan Halterman2bf177c2017-06-29 01:49:08 -070018import java.util.Arrays;
19import java.util.ConcurrentModificationException;
20import java.util.List;
21import java.util.concurrent.ArrayBlockingQueue;
22import java.util.concurrent.BlockingQueue;
23import java.util.concurrent.CompletionException;
24import java.util.stream.Collectors;
25
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -070026import com.google.common.base.Throwables;
27import com.google.common.collect.Sets;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070028import io.atomix.protocols.raft.proxy.RaftProxy;
29import io.atomix.protocols.raft.service.RaftService;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080030import org.junit.Test;
31import org.onlab.util.Tools;
Madan Jampani74da78b2016-02-09 21:18:36 -080032import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080033import org.onosproject.store.primitives.TransactionId;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080034import org.onosproject.store.service.MapEvent;
35import org.onosproject.store.service.MapEventListener;
Jordan Halterman948d6592017-04-20 17:18:24 -070036import org.onosproject.store.service.TransactionLog;
Jordan Halterman5f97a302017-04-26 23:41:31 -070037import org.onosproject.store.service.Version;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080038import org.onosproject.store.service.Versioned;
39
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -070040import static org.hamcrest.Matchers.is;
41import static org.junit.Assert.assertArrayEquals;
42import static org.junit.Assert.assertEquals;
43import static org.junit.Assert.assertFalse;
44import static org.junit.Assert.assertNotNull;
45import static org.junit.Assert.assertNull;
46import static org.junit.Assert.assertThat;
47import static org.junit.Assert.assertTrue;
48import static org.junit.Assert.fail;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080049
50/**
51 * Unit tests for {@link AtomixConsistentMap}.
52 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070053public class AtomixConsistentMapTest extends AtomixTestBase<AtomixConsistentMap> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080054
55 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -070056 protected RaftService createService() {
57 return new AtomixConsistentMapService();
58 }
59
60 @Override
61 protected AtomixConsistentMap createPrimitive(RaftProxy proxy) {
62 return new AtomixConsistentMap(proxy);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080063 }
64
65 /**
66 * Tests various basic map operations.
67 */
68 @Test
69 public void testBasicMapOperations() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070070 basicMapOperationTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080071 }
72
73 /**
74 * Tests various map compute* operations on different cluster sizes.
75 */
76 @Test
77 public void testMapComputeOperations() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070078 mapComputeOperationTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080079 }
80
81 /**
82 * Tests map event notifications.
83 */
84 @Test
85 public void testMapListeners() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070086 mapListenerTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080087 }
88
89 /**
Jordan Halterman5f97a302017-04-26 23:41:31 -070090 * Tests map transaction prepare.
91 */
92 @Test
93 public void testTransactionPrepare() throws Throwable {
94 transactionPrepareTests();
95 }
96
97 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -080098 * Tests map transaction commit.
99 */
100 @Test
101 public void testTransactionCommit() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700102 transactionCommitTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800103 }
104
105 /**
106 * Tests map transaction rollback.
107 */
108 @Test
109 public void testTransactionRollback() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700110 transactionRollbackTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800111 }
112
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700113 protected void basicMapOperationTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800114 final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
115 final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
116
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700117 AtomixConsistentMap map = newPrimitive("testBasicMapOperationMap");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800118
119 map.isEmpty().thenAccept(result -> {
120 assertTrue(result);
121 }).join();
122
123 map.put("foo", rawFooValue).thenAccept(result -> {
124 assertNull(result);
125 }).join();
126
127 map.size().thenAccept(result -> {
128 assertTrue(result == 1);
129 }).join();
130
131 map.isEmpty().thenAccept(result -> {
132 assertFalse(result);
133 }).join();
134
135 map.putIfAbsent("foo", "Hello foo again!".getBytes()).thenAccept(result -> {
136 assertNotNull(result);
137 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
138 }).join();
139
140 map.putIfAbsent("bar", rawBarValue).thenAccept(result -> {
141 assertNull(result);
142 }).join();
143
144 map.size().thenAccept(result -> {
145 assertTrue(result == 2);
146 }).join();
147
148 map.keySet().thenAccept(result -> {
149 assertTrue(result.size() == 2);
150 assertTrue(result.containsAll(Sets.newHashSet("foo", "bar")));
151 }).join();
152
153 map.values().thenAccept(result -> {
154 assertTrue(result.size() == 2);
155 List<String> rawValues =
156 result.stream().map(v -> Tools.toStringUtf8(v.value())).collect(Collectors.toList());
157 assertTrue(rawValues.contains("Hello foo!"));
158 assertTrue(rawValues.contains("Hello bar!"));
159 }).join();
160
161 map.entrySet().thenAccept(result -> {
162 assertTrue(result.size() == 2);
163 // TODO: check entries
164 }).join();
165
166 map.get("foo").thenAccept(result -> {
167 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
168 }).join();
169
170 map.remove("foo").thenAccept(result -> {
171 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
172 }).join();
173
174 map.containsKey("foo").thenAccept(result -> {
175 assertFalse(result);
176 }).join();
177
178 map.get("foo").thenAccept(result -> {
179 assertNull(result);
180 }).join();
181
182 map.get("bar").thenAccept(result -> {
183 assertNotNull(result);
184 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
185 }).join();
186
187 map.containsKey("bar").thenAccept(result -> {
188 assertTrue(result);
189 }).join();
190
191 map.size().thenAccept(result -> {
192 assertTrue(result == 1);
193 }).join();
194
195 map.containsValue(rawBarValue).thenAccept(result -> {
196 assertTrue(result);
197 }).join();
198
199 map.containsValue(rawFooValue).thenAccept(result -> {
200 assertFalse(result);
201 }).join();
202
203 map.replace("bar", "Goodbye bar!".getBytes()).thenAccept(result -> {
204 assertNotNull(result);
205 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
206 }).join();
207
208 map.replace("foo", "Goodbye foo!".getBytes()).thenAccept(result -> {
209 assertNull(result);
210 }).join();
211
212 // try replace_if_value_match for a non-existent key
213 map.replace("foo", "Goodbye foo!".getBytes(), rawFooValue).thenAccept(result -> {
214 assertFalse(result);
215 }).join();
216
217 map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
218 assertTrue(result);
219 }).join();
220
221 map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
222 assertFalse(result);
223 }).join();
224
225 Versioned<byte[]> barValue = map.get("bar").join();
226 map.replace("bar", barValue.version(), "Goodbye bar!".getBytes()).thenAccept(result -> {
227 assertTrue(result);
228 }).join();
229
230 map.replace("bar", barValue.version(), rawBarValue).thenAccept(result -> {
231 assertFalse(result);
232 }).join();
233
234 map.clear().join();
235
236 map.size().thenAccept(result -> {
237 assertTrue(result == 0);
238 }).join();
239 }
240
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700241 public void mapComputeOperationTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800242 final byte[] value1 = Tools.getBytesUtf8("value1");
243 final byte[] value2 = Tools.getBytesUtf8("value2");
244 final byte[] value3 = Tools.getBytesUtf8("value3");
245
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700246 AtomixConsistentMap map = newPrimitive("testMapComputeOperationsMap");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800247
248 map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
249 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
250 }).join();
251
252 map.computeIfAbsent("foo", k -> value2).thenAccept(result -> {
253 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
254 }).join();
255
256 map.computeIfPresent("bar", (k, v) -> value2).thenAccept(result -> {
257 assertNull(result);
258 });
259
260 map.computeIfPresent("foo", (k, v) -> value3).thenAccept(result -> {
261 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value3));
262 }).join();
263
264 map.computeIfPresent("foo", (k, v) -> null).thenAccept(result -> {
265 assertNull(result);
266 }).join();
267
268 map.computeIf("foo", v -> v == null, (k, v) -> value1).thenAccept(result -> {
269 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
270 }).join();
271
272 map.compute("foo", (k, v) -> value2).thenAccept(result -> {
273 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value2));
274 }).join();
275 }
276
277
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700278 protected void mapListenerTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800279 final byte[] value1 = Tools.getBytesUtf8("value1");
280 final byte[] value2 = Tools.getBytesUtf8("value2");
281 final byte[] value3 = Tools.getBytesUtf8("value3");
282
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700283 AtomixConsistentMap map = newPrimitive("testMapListenerMap");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800284 TestMapEventListener listener = new TestMapEventListener();
285
286 // add listener; insert new value into map and verify an INSERT event is received.
Madan Jampani40f022e2016-03-02 21:35:14 -0800287 map.addListener(listener).thenCompose(v -> map.put("foo", value1)).join();
288 MapEvent<String, byte[]> event = listener.event();
289 assertNotNull(event);
290 assertEquals(MapEvent.Type.INSERT, event.type());
291 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800292
293 // remove listener and verify listener is not notified.
Madan Jampani40f022e2016-03-02 21:35:14 -0800294 map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join();
295 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800296
297 // add the listener back and verify UPDATE events are received correctly
Madan Jampani40f022e2016-03-02 21:35:14 -0800298 map.addListener(listener).thenCompose(v -> map.put("foo", value3)).join();
299 event = listener.event();
300 assertNotNull(event);
301 assertEquals(MapEvent.Type.UPDATE, event.type());
302 assertTrue(Arrays.equals(value3, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800303
304 // perform a non-state changing operation and verify no events are received.
305 map.putIfAbsent("foo", value1).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800306 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800307
308 // verify REMOVE events are received correctly.
309 map.remove("foo").join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800310 event = listener.event();
311 assertNotNull(event);
312 assertEquals(MapEvent.Type.REMOVE, event.type());
313 assertTrue(Arrays.equals(value3, event.oldValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800314
315 // verify compute methods also generate events.
316 map.computeIf("foo", v -> v == null, (k, v) -> value1).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800317 event = listener.event();
318 assertNotNull(event);
319 assertEquals(MapEvent.Type.INSERT, event.type());
320 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800321
322 map.compute("foo", (k, v) -> value2).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800323 event = listener.event();
324 assertNotNull(event);
325 assertEquals(MapEvent.Type.UPDATE, event.type());
326 assertTrue(Arrays.equals(value2, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800327
328 map.computeIf("foo", v -> Arrays.equals(v, value2), (k, v) -> null).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800329 event = listener.event();
330 assertNotNull(event);
331 assertEquals(MapEvent.Type.REMOVE, event.type());
332 assertTrue(Arrays.equals(value2, event.oldValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800333
334 map.removeListener(listener).join();
335 }
336
Jordan Halterman5f97a302017-04-26 23:41:31 -0700337 protected void transactionPrepareTests() throws Throwable {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700338 AtomixConsistentMap map = newPrimitive("testPrepareTestsMap");
Jordan Halterman5f97a302017-04-26 23:41:31 -0700339
340 TransactionId transactionId1 = TransactionId.from("tx1");
341 TransactionId transactionId2 = TransactionId.from("tx2");
342 TransactionId transactionId3 = TransactionId.from("tx3");
343 TransactionId transactionId4 = TransactionId.from("tx4");
344
345 Version lock1 = map.begin(transactionId1).join();
346
347 MapUpdate<String, byte[]> update1 =
348 MapUpdate.<String, byte[]>newBuilder()
349 .withType(MapUpdate.Type.LOCK)
350 .withKey("foo")
351 .withVersion(lock1.value())
352 .build();
353 MapUpdate<String, byte[]> update2 =
354 MapUpdate.<String, byte[]>newBuilder()
355 .withType(MapUpdate.Type.LOCK)
356 .withKey("bar")
357 .withVersion(lock1.value())
358 .build();
359
360 map.prepare(new TransactionLog<>(transactionId1, lock1.value(), Arrays.asList(update1, update2)))
361 .thenAccept(result -> {
362 assertTrue(result);
363 }).join();
364
365 Version lock2 = map.begin(transactionId2).join();
366
367 MapUpdate<String, byte[]> update3 =
368 MapUpdate.<String, byte[]>newBuilder()
369 .withType(MapUpdate.Type.LOCK)
370 .withKey("foo")
371 .withVersion(lock2.value())
372 .build();
373
374 map.prepare(new TransactionLog<>(transactionId2, lock2.value(), Arrays.asList(update3)))
375 .thenAccept(result -> {
376 assertFalse(result);
377 }).join();
378 map.rollback(transactionId2).join();
379
380 Version lock3 = map.begin(transactionId3).join();
381
382 MapUpdate<String, byte[]> update4 =
383 MapUpdate.<String, byte[]>newBuilder()
384 .withType(MapUpdate.Type.LOCK)
385 .withKey("baz")
386 .withVersion(0)
387 .build();
388
389 map.prepare(new TransactionLog<>(transactionId3, lock3.value(), Arrays.asList(update4)))
390 .thenAccept(result -> {
391 assertFalse(result);
392 }).join();
393 map.rollback(transactionId3).join();
394
395 Version lock4 = map.begin(transactionId4).join();
396
397 MapUpdate<String, byte[]> update5 =
398 MapUpdate.<String, byte[]>newBuilder()
399 .withType(MapUpdate.Type.LOCK)
400 .withKey("baz")
401 .withVersion(lock4.value())
402 .build();
403
404 map.prepare(new TransactionLog<>(transactionId4, lock4.value(), Arrays.asList(update5)))
405 .thenAccept(result -> {
406 assertTrue(result);
407 }).join();
408 }
409
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700410 protected void transactionCommitTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800411 final byte[] value1 = Tools.getBytesUtf8("value1");
412 final byte[] value2 = Tools.getBytesUtf8("value2");
413
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700414 AtomixConsistentMap map = newPrimitive("testCommitTestsMap");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800415 TestMapEventListener listener = new TestMapEventListener();
416
417 map.addListener(listener).join();
418
Jordan Halterman5f97a302017-04-26 23:41:31 -0700419 TransactionId transactionId = TransactionId.from("tx1");
420
421 // Begin the transaction.
422 Version lock = map.begin(transactionId).join();
423
424 // PUT_IF_VERSION_MATCH
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800425 MapUpdate<String, byte[]> update1 =
Jordan Halterman5f97a302017-04-26 23:41:31 -0700426 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
427 .withKey("foo")
428 .withValue(value1)
429 .withVersion(lock.value())
430 .build();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800431
Jordan Halterman5f97a302017-04-26 23:41:31 -0700432 map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(update1))).thenAccept(result -> {
Madan Jampani74da78b2016-02-09 21:18:36 -0800433 assertEquals(true, result);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800434 }).join();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700435 // verify changes in Tx is not visible yet until commit
Madan Jampani40f022e2016-03-02 21:35:14 -0800436 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800437
438 map.size().thenAccept(result -> {
439 assertTrue(result == 0);
440 }).join();
441
442 map.get("foo").thenAccept(result -> {
443 assertNull(result);
444 }).join();
445
446 try {
447 map.put("foo", value2).join();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700448 fail("update to map entry in open tx should fail with Exception");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800449 } catch (CompletionException e) {
450 assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
451 }
452
Madan Jampani40f022e2016-03-02 21:35:14 -0800453 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800454
Jordan Halterman5f97a302017-04-26 23:41:31 -0700455 map.commit(transactionId).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800456 MapEvent<String, byte[]> event = listener.event();
457 assertNotNull(event);
458 assertEquals(MapEvent.Type.INSERT, event.type());
459 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800460
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700461 // map should be update-able after commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800462 map.put("foo", value2).thenAccept(result -> {
463 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
464 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800465 event = listener.event();
466 assertNotNull(event);
467 assertEquals(MapEvent.Type.UPDATE, event.type());
468 assertTrue(Arrays.equals(value2, event.newValue().value()));
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700469
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700470 // REMOVE_IF_VERSION_MATCH
471 byte[] currFoo = map.get("foo").get().value();
472 long currFooVersion = map.get("foo").get().version();
473 MapUpdate<String, byte[]> remove1 =
474 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
Jordan Halterman5f97a302017-04-26 23:41:31 -0700475 .withKey("foo")
476 .withVersion(currFooVersion)
477 .build();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700478
Jordan Halterman5f97a302017-04-26 23:41:31 -0700479 transactionId = TransactionId.from("tx2");
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700480
Jordan Halterman5f97a302017-04-26 23:41:31 -0700481 // Begin the transaction.
482 map.begin(transactionId).join();
483
484 map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(remove1))).thenAccept(result -> {
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700485 assertTrue("prepare should succeed", result);
486 }).join();
487 // verify changes in Tx is not visible yet until commit
488 assertFalse(listener.eventReceived());
489
490 map.size().thenAccept(size -> {
491 assertThat(size, is(1));
492 }).join();
493
494 map.get("foo").thenAccept(result -> {
495 assertThat(result.value(), is(currFoo));
496 }).join();
497
Jordan Halterman5f97a302017-04-26 23:41:31 -0700498 map.commit(transactionId).join();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700499 event = listener.event();
500 assertNotNull(event);
501 assertEquals(MapEvent.Type.REMOVE, event.type());
502 assertArrayEquals(currFoo, event.oldValue().value());
503
504 map.size().thenAccept(size -> {
505 assertThat(size, is(0));
506 }).join();
507
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800508 }
509
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700510 protected void transactionRollbackTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800511 final byte[] value1 = Tools.getBytesUtf8("value1");
512 final byte[] value2 = Tools.getBytesUtf8("value2");
513
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700514 AtomixConsistentMap map = newPrimitive("testTransactionRollbackTestsMap");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800515 TestMapEventListener listener = new TestMapEventListener();
516
517 map.addListener(listener).join();
518
Jordan Halterman5f97a302017-04-26 23:41:31 -0700519 TransactionId transactionId = TransactionId.from("tx1");
520
521 Version lock = map.begin(transactionId).join();
522
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800523 MapUpdate<String, byte[]> update1 =
Jordan Halterman5f97a302017-04-26 23:41:31 -0700524 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
525 .withKey("foo")
526 .withValue(value1)
527 .withVersion(lock.value())
528 .build();
529
530 map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(update1))).thenAccept(result -> {
Madan Jampani74da78b2016-02-09 21:18:36 -0800531 assertEquals(true, result);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800532 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800533 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800534
Jordan Halterman5f97a302017-04-26 23:41:31 -0700535 map.rollback(transactionId).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800536 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800537
538 map.get("foo").thenAccept(result -> {
539 assertNull(result);
540 }).join();
541
542 map.put("foo", value2).thenAccept(result -> {
543 assertNull(result);
544 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800545 MapEvent<String, byte[]> event = listener.event();
546 assertNotNull(event);
547 assertEquals(MapEvent.Type.INSERT, event.type());
548 assertTrue(Arrays.equals(value2, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800549 }
550
551 private static class TestMapEventListener implements MapEventListener<String, byte[]> {
552
Madan Jampani40f022e2016-03-02 21:35:14 -0800553 private final BlockingQueue<MapEvent<String, byte[]>> queue = new ArrayBlockingQueue<>(1);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800554
555 @Override
556 public void event(MapEvent<String, byte[]> event) {
Madan Jampani40f022e2016-03-02 21:35:14 -0800557 try {
558 queue.put(event);
559 } catch (InterruptedException e) {
560 Throwables.propagate(e);
561 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800562 }
563
Madan Jampani40f022e2016-03-02 21:35:14 -0800564 public boolean eventReceived() {
565 return !queue.isEmpty();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800566 }
567
Madan Jampani40f022e2016-03-02 21:35:14 -0800568 public MapEvent<String, byte[]> event() throws InterruptedException {
569 return queue.take();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800570 }
571 }
572}