blob: 3e7a8726c52e9942ecb81dc112749d221eae38c0 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -070018import com.google.common.collect.Sets;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070019import io.atomix.protocols.raft.proxy.RaftProxy;
20import io.atomix.protocols.raft.service.RaftService;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080021import org.junit.Test;
Jordan Haltermanca7660a2018-04-04 23:43:23 -070022import org.onlab.util.HexString;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080023import org.onlab.util.Tools;
Madan Jampani74da78b2016-02-09 21:18:36 -080024import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080025import org.onosproject.store.primitives.TransactionId;
Jordan Haltermanca7660a2018-04-04 23:43:23 -070026import org.onosproject.store.primitives.impl.CompatibleValue;
27import org.onosproject.store.primitives.impl.DistributedPrimitives;
28import org.onosproject.store.serializers.KryoNamespaces;
29import org.onosproject.store.service.AsyncConsistentMap;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080030import org.onosproject.store.service.MapEvent;
31import org.onosproject.store.service.MapEventListener;
Jordan Haltermanca7660a2018-04-04 23:43:23 -070032import org.onosproject.store.service.Serializer;
Jordan Halterman948d6592017-04-20 17:18:24 -070033import org.onosproject.store.service.TransactionLog;
Jordan Halterman5f97a302017-04-26 23:41:31 -070034import org.onosproject.store.service.Version;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080035import org.onosproject.store.service.Versioned;
36
Ray Milkey6a51cb92018-03-06 09:03:03 -080037import java.util.Arrays;
38import java.util.ConcurrentModificationException;
39import java.util.List;
40import java.util.concurrent.ArrayBlockingQueue;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.CompletionException;
43import java.util.stream.Collectors;
44
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -070045import static org.hamcrest.Matchers.is;
46import static org.junit.Assert.assertArrayEquals;
47import static org.junit.Assert.assertEquals;
48import static org.junit.Assert.assertFalse;
49import static org.junit.Assert.assertNotNull;
50import static org.junit.Assert.assertNull;
51import static org.junit.Assert.assertThat;
52import static org.junit.Assert.assertTrue;
53import static org.junit.Assert.fail;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080054
55/**
56 * Unit tests for {@link AtomixConsistentMap}.
57 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070058public class AtomixConsistentMapTest extends AtomixTestBase<AtomixConsistentMap> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080059
60 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -070061 protected RaftService createService() {
62 return new AtomixConsistentMapService();
63 }
64
65 @Override
66 protected AtomixConsistentMap createPrimitive(RaftProxy proxy) {
67 return new AtomixConsistentMap(proxy);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080068 }
69
70 /**
71 * Tests various basic map operations.
72 */
73 @Test
74 public void testBasicMapOperations() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070075 basicMapOperationTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080076 }
77
78 /**
79 * Tests various map compute* operations on different cluster sizes.
80 */
81 @Test
82 public void testMapComputeOperations() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070083 mapComputeOperationTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080084 }
85
86 /**
Jordan Halterman4922a062017-07-31 15:55:36 -070087 * Tests null values.
88 */
89 @Test
90 public void testNullValues() throws Throwable {
91 final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
92 final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
93
94 AtomixConsistentMap map = newPrimitive("testNullValues");
95
96 map.get("foo")
97 .thenAccept(v -> assertNull(v)).join();
98 map.put("foo", null)
99 .thenAccept(v -> assertNull(v)).join();
100 map.put("foo", rawFooValue).thenAccept(v -> {
101 assertNotNull(v);
102 assertNull(v.value());
103 }).join();
104 map.get("foo").thenAccept(v -> {
105 assertNotNull(v);
106 assertTrue(Arrays.equals(v.value(), rawFooValue));
107 }).join();
108 map.replace("foo", rawFooValue, null)
109 .thenAccept(replaced -> assertTrue(replaced)).join();
110 map.get("foo").thenAccept(v -> {
111 assertNotNull(v);
112 assertNull(v.value());
113 }).join();
114 map.replace("foo", rawFooValue, rawBarValue)
115 .thenAccept(replaced -> assertFalse(replaced)).join();
116 map.replace("foo", null, rawBarValue)
117 .thenAccept(replaced -> assertTrue(replaced)).join();
118 map.get("foo").thenAccept(v -> {
119 assertNotNull(v);
120 assertTrue(Arrays.equals(v.value(), rawBarValue));
121 }).join();
122 }
123
124 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800125 * Tests map event notifications.
126 */
127 @Test
128 public void testMapListeners() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700129 mapListenerTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800130 }
131
132 /**
Jordan Halterman5f97a302017-04-26 23:41:31 -0700133 * Tests map transaction prepare.
134 */
135 @Test
136 public void testTransactionPrepare() throws Throwable {
137 transactionPrepareTests();
138 }
139
140 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800141 * Tests map transaction commit.
142 */
143 @Test
144 public void testTransactionCommit() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700145 transactionCommitTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800146 }
147
148 /**
149 * Tests map transaction rollback.
150 */
151 @Test
152 public void testTransactionRollback() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700153 transactionRollbackTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800154 }
155
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700156 protected void basicMapOperationTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800157 final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
158 final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
159
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700160 AtomixConsistentMap map = newPrimitive("testBasicMapOperationMap");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800161
162 map.isEmpty().thenAccept(result -> {
163 assertTrue(result);
164 }).join();
165
166 map.put("foo", rawFooValue).thenAccept(result -> {
167 assertNull(result);
168 }).join();
169
170 map.size().thenAccept(result -> {
171 assertTrue(result == 1);
172 }).join();
173
174 map.isEmpty().thenAccept(result -> {
175 assertFalse(result);
176 }).join();
177
178 map.putIfAbsent("foo", "Hello foo again!".getBytes()).thenAccept(result -> {
179 assertNotNull(result);
180 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
181 }).join();
182
183 map.putIfAbsent("bar", rawBarValue).thenAccept(result -> {
184 assertNull(result);
185 }).join();
186
187 map.size().thenAccept(result -> {
188 assertTrue(result == 2);
189 }).join();
190
191 map.keySet().thenAccept(result -> {
192 assertTrue(result.size() == 2);
193 assertTrue(result.containsAll(Sets.newHashSet("foo", "bar")));
194 }).join();
195
196 map.values().thenAccept(result -> {
197 assertTrue(result.size() == 2);
198 List<String> rawValues =
199 result.stream().map(v -> Tools.toStringUtf8(v.value())).collect(Collectors.toList());
200 assertTrue(rawValues.contains("Hello foo!"));
201 assertTrue(rawValues.contains("Hello bar!"));
202 }).join();
203
204 map.entrySet().thenAccept(result -> {
205 assertTrue(result.size() == 2);
206 // TODO: check entries
207 }).join();
208
209 map.get("foo").thenAccept(result -> {
210 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
211 }).join();
212
213 map.remove("foo").thenAccept(result -> {
214 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
215 }).join();
216
217 map.containsKey("foo").thenAccept(result -> {
218 assertFalse(result);
219 }).join();
220
221 map.get("foo").thenAccept(result -> {
222 assertNull(result);
223 }).join();
224
225 map.get("bar").thenAccept(result -> {
226 assertNotNull(result);
227 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
228 }).join();
229
230 map.containsKey("bar").thenAccept(result -> {
231 assertTrue(result);
232 }).join();
233
234 map.size().thenAccept(result -> {
235 assertTrue(result == 1);
236 }).join();
237
238 map.containsValue(rawBarValue).thenAccept(result -> {
239 assertTrue(result);
240 }).join();
241
242 map.containsValue(rawFooValue).thenAccept(result -> {
243 assertFalse(result);
244 }).join();
245
246 map.replace("bar", "Goodbye bar!".getBytes()).thenAccept(result -> {
247 assertNotNull(result);
248 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
249 }).join();
250
251 map.replace("foo", "Goodbye foo!".getBytes()).thenAccept(result -> {
252 assertNull(result);
253 }).join();
254
255 // try replace_if_value_match for a non-existent key
256 map.replace("foo", "Goodbye foo!".getBytes(), rawFooValue).thenAccept(result -> {
257 assertFalse(result);
258 }).join();
259
260 map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
261 assertTrue(result);
262 }).join();
263
264 map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
265 assertFalse(result);
266 }).join();
267
268 Versioned<byte[]> barValue = map.get("bar").join();
269 map.replace("bar", barValue.version(), "Goodbye bar!".getBytes()).thenAccept(result -> {
270 assertTrue(result);
271 }).join();
272
273 map.replace("bar", barValue.version(), rawBarValue).thenAccept(result -> {
274 assertFalse(result);
275 }).join();
276
277 map.clear().join();
278
279 map.size().thenAccept(result -> {
280 assertTrue(result == 0);
281 }).join();
282 }
283
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700284 public void mapComputeOperationTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800285 final byte[] value1 = Tools.getBytesUtf8("value1");
286 final byte[] value2 = Tools.getBytesUtf8("value2");
287 final byte[] value3 = Tools.getBytesUtf8("value3");
288
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700289 AtomixConsistentMap map = newPrimitive("testMapComputeOperationsMap");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800290
291 map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
292 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
293 }).join();
294
295 map.computeIfAbsent("foo", k -> value2).thenAccept(result -> {
296 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
297 }).join();
298
299 map.computeIfPresent("bar", (k, v) -> value2).thenAccept(result -> {
300 assertNull(result);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700301 }).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800302
303 map.computeIfPresent("foo", (k, v) -> value3).thenAccept(result -> {
304 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value3));
305 }).join();
306
307 map.computeIfPresent("foo", (k, v) -> null).thenAccept(result -> {
308 assertNull(result);
309 }).join();
310
311 map.computeIf("foo", v -> v == null, (k, v) -> value1).thenAccept(result -> {
312 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
313 }).join();
314
315 map.compute("foo", (k, v) -> value2).thenAccept(result -> {
316 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value2));
317 }).join();
318 }
319
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700320 protected void mapListenerTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800321 final byte[] value1 = Tools.getBytesUtf8("value1");
322 final byte[] value2 = Tools.getBytesUtf8("value2");
323 final byte[] value3 = Tools.getBytesUtf8("value3");
324
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700325 AtomixConsistentMap map = newPrimitive("testMapListenerMap");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800326 TestMapEventListener listener = new TestMapEventListener();
327
328 // add listener; insert new value into map and verify an INSERT event is received.
Madan Jampani40f022e2016-03-02 21:35:14 -0800329 map.addListener(listener).thenCompose(v -> map.put("foo", value1)).join();
330 MapEvent<String, byte[]> event = listener.event();
331 assertNotNull(event);
332 assertEquals(MapEvent.Type.INSERT, event.type());
333 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800334
335 // remove listener and verify listener is not notified.
Madan Jampani40f022e2016-03-02 21:35:14 -0800336 map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join();
337 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800338
339 // add the listener back and verify UPDATE events are received correctly
Madan Jampani40f022e2016-03-02 21:35:14 -0800340 map.addListener(listener).thenCompose(v -> map.put("foo", value3)).join();
341 event = listener.event();
342 assertNotNull(event);
343 assertEquals(MapEvent.Type.UPDATE, event.type());
344 assertTrue(Arrays.equals(value3, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800345
346 // perform a non-state changing operation and verify no events are received.
347 map.putIfAbsent("foo", value1).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800348 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800349
350 // verify REMOVE events are received correctly.
351 map.remove("foo").join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800352 event = listener.event();
353 assertNotNull(event);
354 assertEquals(MapEvent.Type.REMOVE, event.type());
355 assertTrue(Arrays.equals(value3, event.oldValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800356
357 // verify compute methods also generate events.
358 map.computeIf("foo", v -> v == null, (k, v) -> value1).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800359 event = listener.event();
360 assertNotNull(event);
361 assertEquals(MapEvent.Type.INSERT, event.type());
362 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800363
364 map.compute("foo", (k, v) -> value2).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800365 event = listener.event();
366 assertNotNull(event);
367 assertEquals(MapEvent.Type.UPDATE, event.type());
368 assertTrue(Arrays.equals(value2, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800369
370 map.computeIf("foo", v -> Arrays.equals(v, value2), (k, v) -> null).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800371 event = listener.event();
372 assertNotNull(event);
373 assertEquals(MapEvent.Type.REMOVE, event.type());
374 assertTrue(Arrays.equals(value2, event.oldValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800375
376 map.removeListener(listener).join();
377 }
378
Jordan Halterman5f97a302017-04-26 23:41:31 -0700379 protected void transactionPrepareTests() throws Throwable {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700380 AtomixConsistentMap map = newPrimitive("testPrepareTestsMap");
Jordan Halterman5f97a302017-04-26 23:41:31 -0700381
382 TransactionId transactionId1 = TransactionId.from("tx1");
383 TransactionId transactionId2 = TransactionId.from("tx2");
384 TransactionId transactionId3 = TransactionId.from("tx3");
385 TransactionId transactionId4 = TransactionId.from("tx4");
386
387 Version lock1 = map.begin(transactionId1).join();
388
389 MapUpdate<String, byte[]> update1 =
390 MapUpdate.<String, byte[]>newBuilder()
391 .withType(MapUpdate.Type.LOCK)
392 .withKey("foo")
393 .withVersion(lock1.value())
394 .build();
395 MapUpdate<String, byte[]> update2 =
396 MapUpdate.<String, byte[]>newBuilder()
397 .withType(MapUpdate.Type.LOCK)
398 .withKey("bar")
399 .withVersion(lock1.value())
400 .build();
401
402 map.prepare(new TransactionLog<>(transactionId1, lock1.value(), Arrays.asList(update1, update2)))
403 .thenAccept(result -> {
404 assertTrue(result);
405 }).join();
406
407 Version lock2 = map.begin(transactionId2).join();
408
409 MapUpdate<String, byte[]> update3 =
410 MapUpdate.<String, byte[]>newBuilder()
411 .withType(MapUpdate.Type.LOCK)
412 .withKey("foo")
413 .withVersion(lock2.value())
414 .build();
415
416 map.prepare(new TransactionLog<>(transactionId2, lock2.value(), Arrays.asList(update3)))
417 .thenAccept(result -> {
418 assertFalse(result);
419 }).join();
420 map.rollback(transactionId2).join();
421
422 Version lock3 = map.begin(transactionId3).join();
423
424 MapUpdate<String, byte[]> update4 =
425 MapUpdate.<String, byte[]>newBuilder()
426 .withType(MapUpdate.Type.LOCK)
427 .withKey("baz")
428 .withVersion(0)
429 .build();
430
431 map.prepare(new TransactionLog<>(transactionId3, lock3.value(), Arrays.asList(update4)))
432 .thenAccept(result -> {
433 assertFalse(result);
434 }).join();
435 map.rollback(transactionId3).join();
436
437 Version lock4 = map.begin(transactionId4).join();
438
439 MapUpdate<String, byte[]> update5 =
440 MapUpdate.<String, byte[]>newBuilder()
441 .withType(MapUpdate.Type.LOCK)
442 .withKey("baz")
443 .withVersion(lock4.value())
444 .build();
445
446 map.prepare(new TransactionLog<>(transactionId4, lock4.value(), Arrays.asList(update5)))
447 .thenAccept(result -> {
448 assertTrue(result);
449 }).join();
450 }
451
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700452 protected void transactionCommitTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800453 final byte[] value1 = Tools.getBytesUtf8("value1");
454 final byte[] value2 = Tools.getBytesUtf8("value2");
455
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700456 AtomixConsistentMap map = newPrimitive("testCommitTestsMap");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800457 TestMapEventListener listener = new TestMapEventListener();
458
459 map.addListener(listener).join();
460
Jordan Halterman5f97a302017-04-26 23:41:31 -0700461 TransactionId transactionId = TransactionId.from("tx1");
462
463 // Begin the transaction.
464 Version lock = map.begin(transactionId).join();
465
466 // PUT_IF_VERSION_MATCH
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800467 MapUpdate<String, byte[]> update1 =
Jordan Halterman5f97a302017-04-26 23:41:31 -0700468 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
469 .withKey("foo")
470 .withValue(value1)
471 .withVersion(lock.value())
472 .build();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800473
Jordan Halterman5f97a302017-04-26 23:41:31 -0700474 map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(update1))).thenAccept(result -> {
Madan Jampani74da78b2016-02-09 21:18:36 -0800475 assertEquals(true, result);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800476 }).join();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700477 // verify changes in Tx is not visible yet until commit
Madan Jampani40f022e2016-03-02 21:35:14 -0800478 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800479
480 map.size().thenAccept(result -> {
481 assertTrue(result == 0);
482 }).join();
483
484 map.get("foo").thenAccept(result -> {
485 assertNull(result);
486 }).join();
487
488 try {
489 map.put("foo", value2).join();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700490 fail("update to map entry in open tx should fail with Exception");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800491 } catch (CompletionException e) {
492 assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
493 }
494
Madan Jampani40f022e2016-03-02 21:35:14 -0800495 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800496
Jordan Halterman5f97a302017-04-26 23:41:31 -0700497 map.commit(transactionId).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800498 MapEvent<String, byte[]> event = listener.event();
499 assertNotNull(event);
500 assertEquals(MapEvent.Type.INSERT, event.type());
501 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800502
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700503 // map should be update-able after commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800504 map.put("foo", value2).thenAccept(result -> {
505 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
506 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800507 event = listener.event();
508 assertNotNull(event);
509 assertEquals(MapEvent.Type.UPDATE, event.type());
510 assertTrue(Arrays.equals(value2, event.newValue().value()));
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700511
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700512 // REMOVE_IF_VERSION_MATCH
513 byte[] currFoo = map.get("foo").get().value();
514 long currFooVersion = map.get("foo").get().version();
515 MapUpdate<String, byte[]> remove1 =
516 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
Jordan Halterman5f97a302017-04-26 23:41:31 -0700517 .withKey("foo")
518 .withVersion(currFooVersion)
519 .build();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700520
Jordan Halterman5f97a302017-04-26 23:41:31 -0700521 transactionId = TransactionId.from("tx2");
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700522
Jordan Halterman5f97a302017-04-26 23:41:31 -0700523 // Begin the transaction.
524 map.begin(transactionId).join();
525
526 map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(remove1))).thenAccept(result -> {
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700527 assertTrue("prepare should succeed", result);
528 }).join();
529 // verify changes in Tx is not visible yet until commit
530 assertFalse(listener.eventReceived());
531
532 map.size().thenAccept(size -> {
533 assertThat(size, is(1));
534 }).join();
535
536 map.get("foo").thenAccept(result -> {
537 assertThat(result.value(), is(currFoo));
538 }).join();
539
Jordan Halterman5f97a302017-04-26 23:41:31 -0700540 map.commit(transactionId).join();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700541 event = listener.event();
542 assertNotNull(event);
543 assertEquals(MapEvent.Type.REMOVE, event.type());
544 assertArrayEquals(currFoo, event.oldValue().value());
545
546 map.size().thenAccept(size -> {
547 assertThat(size, is(0));
548 }).join();
549
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800550 }
551
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700552 protected void transactionRollbackTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800553 final byte[] value1 = Tools.getBytesUtf8("value1");
554 final byte[] value2 = Tools.getBytesUtf8("value2");
555
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700556 AtomixConsistentMap map = newPrimitive("testTransactionRollbackTestsMap");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800557 TestMapEventListener listener = new TestMapEventListener();
558
559 map.addListener(listener).join();
560
Jordan Halterman5f97a302017-04-26 23:41:31 -0700561 TransactionId transactionId = TransactionId.from("tx1");
562
563 Version lock = map.begin(transactionId).join();
564
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800565 MapUpdate<String, byte[]> update1 =
Jordan Halterman5f97a302017-04-26 23:41:31 -0700566 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
567 .withKey("foo")
568 .withValue(value1)
569 .withVersion(lock.value())
570 .build();
571
572 map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(update1))).thenAccept(result -> {
Madan Jampani74da78b2016-02-09 21:18:36 -0800573 assertEquals(true, result);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800574 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800575 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800576
Jordan Halterman5f97a302017-04-26 23:41:31 -0700577 map.rollback(transactionId).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800578 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800579
580 map.get("foo").thenAccept(result -> {
581 assertNull(result);
582 }).join();
583
584 map.put("foo", value2).thenAccept(result -> {
585 assertNull(result);
586 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800587 MapEvent<String, byte[]> event = listener.event();
588 assertNotNull(event);
589 assertEquals(MapEvent.Type.INSERT, event.type());
590 assertTrue(Arrays.equals(value2, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800591 }
592
Jordan Haltermanca7660a2018-04-04 23:43:23 -0700593 @Test
594 public void testCompatibilityFunction() throws Throwable {
595 AtomixConsistentMap atomixMap = newPrimitive("testCompatibilityFunction");
596
597 Serializer rawSerializer = Serializer.using(KryoNamespaces.API, CompatibleValue.class);
598 Serializer valueSerializer = Serializer.using(KryoNamespaces.BASIC);
599
600 // Convert the byte[] value to CompatibleValue<byte[]>
601 AsyncConsistentMap<String, CompatibleValue<byte[]>> rawMap = DistributedPrimitives.newTranscodingMap(
602 atomixMap,
603 key -> HexString.toHexString(rawSerializer.encode(key)),
604 string -> rawSerializer.decode(HexString.fromHexString(string)),
605 value -> value == null ? null : rawSerializer.encode(value),
606 bytes -> rawSerializer.decode(bytes));
607
608 // Convert the CompatibleValue<byte[]> value to CompatibleValue<V> using the user-provided serializer.
609 AsyncConsistentMap<String, CompatibleValue<String>> compatibleMap =
610 DistributedPrimitives.newTranscodingMap(
611 rawMap,
612 key -> key,
613 key -> key,
614 value -> value == null ? null :
615 new CompatibleValue<byte[]>(valueSerializer.encode(value.value()), value.version()),
616 value -> value == null ? null :
617 new CompatibleValue<String>(valueSerializer.decode(value.value()), value.version()));
618
619 AsyncConsistentMap<String, String> map1 = DistributedPrimitives.newCompatibleMap(
620 compatibleMap,
621 (value, version) -> version + ":" + value,
622 org.onosproject.core.Version.version("1.0.0"));
623 AsyncConsistentMap<String, String> map2 = DistributedPrimitives.newCompatibleMap(
624 compatibleMap,
625 (value, version) -> version + ":" + value,
626 org.onosproject.core.Version.version("1.0.1"));
627
628 map1.put("foo", "Hello world!").join();
629 assertEquals("Hello world!", map1.get("foo").join().value());
630 assertEquals("1.0.0:Hello world!", map2.get("foo").join().value());
631
632 map2.put("bar", "Hello world again!").join();
633 assertEquals("Hello world again!", map2.get("bar").join().value());
634 assertEquals("1.0.1:Hello world again!", map1.get("bar").join().value());
635 }
636
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800637 private static class TestMapEventListener implements MapEventListener<String, byte[]> {
638
Madan Jampani40f022e2016-03-02 21:35:14 -0800639 private final BlockingQueue<MapEvent<String, byte[]>> queue = new ArrayBlockingQueue<>(1);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800640
641 @Override
642 public void event(MapEvent<String, byte[]> event) {
Madan Jampani40f022e2016-03-02 21:35:14 -0800643 try {
644 queue.put(event);
645 } catch (InterruptedException e) {
Ray Milkey6a51cb92018-03-06 09:03:03 -0800646 Thread.currentThread().interrupt();
647 throw new IllegalStateException(e);
Madan Jampani40f022e2016-03-02 21:35:14 -0800648 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800649 }
650
Madan Jampani40f022e2016-03-02 21:35:14 -0800651 public boolean eventReceived() {
652 return !queue.isEmpty();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800653 }
654
Madan Jampani40f022e2016-03-02 21:35:14 -0800655 public MapEvent<String, byte[]> event() throws InterruptedException {
656 return queue.take();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800657 }
658 }
659}