blob: 8753783b6c562a7268aba284b23fdec7854323e8 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -070018import com.google.common.collect.Sets;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070019import io.atomix.protocols.raft.proxy.RaftProxy;
20import io.atomix.protocols.raft.service.RaftService;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080021import org.junit.Test;
Jordan Haltermanca7660a2018-04-04 23:43:23 -070022import org.onlab.util.HexString;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080023import org.onlab.util.Tools;
Madan Jampani74da78b2016-02-09 21:18:36 -080024import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080025import org.onosproject.store.primitives.TransactionId;
Jordan Haltermanca7660a2018-04-04 23:43:23 -070026import org.onosproject.store.primitives.impl.CompatibleValue;
27import org.onosproject.store.primitives.impl.DistributedPrimitives;
28import org.onosproject.store.serializers.KryoNamespaces;
29import org.onosproject.store.service.AsyncConsistentMap;
Jordan Haltermandae11602018-07-03 00:00:47 -070030import org.onosproject.store.service.AsyncIterator;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080031import org.onosproject.store.service.MapEvent;
32import org.onosproject.store.service.MapEventListener;
Jordan Haltermanca7660a2018-04-04 23:43:23 -070033import org.onosproject.store.service.Serializer;
Jordan Halterman948d6592017-04-20 17:18:24 -070034import org.onosproject.store.service.TransactionLog;
Jordan Halterman5f97a302017-04-26 23:41:31 -070035import org.onosproject.store.service.Version;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080036import org.onosproject.store.service.Versioned;
37
Jordan Haltermandae11602018-07-03 00:00:47 -070038import java.util.ArrayList;
Ray Milkey6a51cb92018-03-06 09:03:03 -080039import java.util.Arrays;
40import java.util.ConcurrentModificationException;
41import java.util.List;
Jordan Haltermandae11602018-07-03 00:00:47 -070042import java.util.Map;
43import java.util.UUID;
Ray Milkey6a51cb92018-03-06 09:03:03 -080044import java.util.concurrent.ArrayBlockingQueue;
45import java.util.concurrent.BlockingQueue;
46import java.util.concurrent.CompletionException;
Jordan Haltermandae11602018-07-03 00:00:47 -070047import java.util.concurrent.TimeUnit;
Ray Milkey6a51cb92018-03-06 09:03:03 -080048import java.util.stream.Collectors;
49
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -070050import static org.hamcrest.Matchers.is;
51import static org.junit.Assert.assertArrayEquals;
52import static org.junit.Assert.assertEquals;
53import static org.junit.Assert.assertFalse;
54import static org.junit.Assert.assertNotNull;
55import static org.junit.Assert.assertNull;
56import static org.junit.Assert.assertThat;
57import static org.junit.Assert.assertTrue;
58import static org.junit.Assert.fail;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080059
60/**
61 * Unit tests for {@link AtomixConsistentMap}.
62 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070063public class AtomixConsistentMapTest extends AtomixTestBase<AtomixConsistentMap> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080064
65 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -070066 protected RaftService createService() {
67 return new AtomixConsistentMapService();
68 }
69
70 @Override
71 protected AtomixConsistentMap createPrimitive(RaftProxy proxy) {
72 return new AtomixConsistentMap(proxy);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080073 }
74
75 /**
76 * Tests various basic map operations.
77 */
78 @Test
79 public void testBasicMapOperations() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070080 basicMapOperationTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080081 }
82
83 /**
84 * Tests various map compute* operations on different cluster sizes.
85 */
86 @Test
87 public void testMapComputeOperations() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070088 mapComputeOperationTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080089 }
90
91 /**
Jordan Halterman4922a062017-07-31 15:55:36 -070092 * Tests null values.
93 */
94 @Test
95 public void testNullValues() throws Throwable {
96 final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
97 final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
98
99 AtomixConsistentMap map = newPrimitive("testNullValues");
100
101 map.get("foo")
102 .thenAccept(v -> assertNull(v)).join();
103 map.put("foo", null)
104 .thenAccept(v -> assertNull(v)).join();
105 map.put("foo", rawFooValue).thenAccept(v -> {
106 assertNotNull(v);
107 assertNull(v.value());
108 }).join();
109 map.get("foo").thenAccept(v -> {
110 assertNotNull(v);
111 assertTrue(Arrays.equals(v.value(), rawFooValue));
112 }).join();
113 map.replace("foo", rawFooValue, null)
114 .thenAccept(replaced -> assertTrue(replaced)).join();
115 map.get("foo").thenAccept(v -> {
116 assertNotNull(v);
117 assertNull(v.value());
118 }).join();
119 map.replace("foo", rawFooValue, rawBarValue)
120 .thenAccept(replaced -> assertFalse(replaced)).join();
121 map.replace("foo", null, rawBarValue)
122 .thenAccept(replaced -> assertTrue(replaced)).join();
123 map.get("foo").thenAccept(v -> {
124 assertNotNull(v);
125 assertTrue(Arrays.equals(v.value(), rawBarValue));
126 }).join();
127 }
128
129 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800130 * Tests map event notifications.
131 */
132 @Test
133 public void testMapListeners() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700134 mapListenerTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800135 }
136
137 /**
Jordan Halterman5f97a302017-04-26 23:41:31 -0700138 * Tests map transaction prepare.
139 */
140 @Test
141 public void testTransactionPrepare() throws Throwable {
142 transactionPrepareTests();
143 }
144
145 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800146 * Tests map transaction commit.
147 */
148 @Test
149 public void testTransactionCommit() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700150 transactionCommitTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800151 }
152
153 /**
154 * Tests map transaction rollback.
155 */
156 @Test
157 public void testTransactionRollback() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700158 transactionRollbackTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800159 }
160
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700161 protected void basicMapOperationTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800162 final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
163 final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
164
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700165 AtomixConsistentMap map = newPrimitive("testBasicMapOperationMap");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800166
167 map.isEmpty().thenAccept(result -> {
168 assertTrue(result);
169 }).join();
170
171 map.put("foo", rawFooValue).thenAccept(result -> {
172 assertNull(result);
173 }).join();
174
175 map.size().thenAccept(result -> {
176 assertTrue(result == 1);
177 }).join();
178
179 map.isEmpty().thenAccept(result -> {
180 assertFalse(result);
181 }).join();
182
183 map.putIfAbsent("foo", "Hello foo again!".getBytes()).thenAccept(result -> {
184 assertNotNull(result);
185 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
186 }).join();
187
188 map.putIfAbsent("bar", rawBarValue).thenAccept(result -> {
189 assertNull(result);
190 }).join();
191
192 map.size().thenAccept(result -> {
193 assertTrue(result == 2);
194 }).join();
195
196 map.keySet().thenAccept(result -> {
197 assertTrue(result.size() == 2);
198 assertTrue(result.containsAll(Sets.newHashSet("foo", "bar")));
199 }).join();
200
201 map.values().thenAccept(result -> {
202 assertTrue(result.size() == 2);
203 List<String> rawValues =
204 result.stream().map(v -> Tools.toStringUtf8(v.value())).collect(Collectors.toList());
205 assertTrue(rawValues.contains("Hello foo!"));
206 assertTrue(rawValues.contains("Hello bar!"));
207 }).join();
208
209 map.entrySet().thenAccept(result -> {
210 assertTrue(result.size() == 2);
211 // TODO: check entries
212 }).join();
213
214 map.get("foo").thenAccept(result -> {
215 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
216 }).join();
217
218 map.remove("foo").thenAccept(result -> {
219 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
220 }).join();
221
222 map.containsKey("foo").thenAccept(result -> {
223 assertFalse(result);
224 }).join();
225
226 map.get("foo").thenAccept(result -> {
227 assertNull(result);
228 }).join();
229
230 map.get("bar").thenAccept(result -> {
231 assertNotNull(result);
232 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
233 }).join();
234
235 map.containsKey("bar").thenAccept(result -> {
236 assertTrue(result);
237 }).join();
238
239 map.size().thenAccept(result -> {
240 assertTrue(result == 1);
241 }).join();
242
243 map.containsValue(rawBarValue).thenAccept(result -> {
244 assertTrue(result);
245 }).join();
246
247 map.containsValue(rawFooValue).thenAccept(result -> {
248 assertFalse(result);
249 }).join();
250
251 map.replace("bar", "Goodbye bar!".getBytes()).thenAccept(result -> {
252 assertNotNull(result);
253 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
254 }).join();
255
256 map.replace("foo", "Goodbye foo!".getBytes()).thenAccept(result -> {
257 assertNull(result);
258 }).join();
259
260 // try replace_if_value_match for a non-existent key
261 map.replace("foo", "Goodbye foo!".getBytes(), rawFooValue).thenAccept(result -> {
262 assertFalse(result);
263 }).join();
264
265 map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
266 assertTrue(result);
267 }).join();
268
269 map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
270 assertFalse(result);
271 }).join();
272
273 Versioned<byte[]> barValue = map.get("bar").join();
274 map.replace("bar", barValue.version(), "Goodbye bar!".getBytes()).thenAccept(result -> {
275 assertTrue(result);
276 }).join();
277
278 map.replace("bar", barValue.version(), rawBarValue).thenAccept(result -> {
279 assertFalse(result);
280 }).join();
281
282 map.clear().join();
283
284 map.size().thenAccept(result -> {
285 assertTrue(result == 0);
286 }).join();
287 }
288
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700289 public void mapComputeOperationTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800290 final byte[] value1 = Tools.getBytesUtf8("value1");
291 final byte[] value2 = Tools.getBytesUtf8("value2");
292 final byte[] value3 = Tools.getBytesUtf8("value3");
293
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700294 AtomixConsistentMap map = newPrimitive("testMapComputeOperationsMap");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800295
296 map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
297 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
298 }).join();
299
300 map.computeIfAbsent("foo", k -> value2).thenAccept(result -> {
301 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
302 }).join();
303
304 map.computeIfPresent("bar", (k, v) -> value2).thenAccept(result -> {
305 assertNull(result);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700306 }).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800307
308 map.computeIfPresent("foo", (k, v) -> value3).thenAccept(result -> {
309 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value3));
310 }).join();
311
312 map.computeIfPresent("foo", (k, v) -> null).thenAccept(result -> {
313 assertNull(result);
314 }).join();
315
316 map.computeIf("foo", v -> v == null, (k, v) -> value1).thenAccept(result -> {
317 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
318 }).join();
319
320 map.compute("foo", (k, v) -> value2).thenAccept(result -> {
321 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value2));
322 }).join();
323 }
324
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700325 protected void mapListenerTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800326 final byte[] value1 = Tools.getBytesUtf8("value1");
327 final byte[] value2 = Tools.getBytesUtf8("value2");
328 final byte[] value3 = Tools.getBytesUtf8("value3");
329
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700330 AtomixConsistentMap map = newPrimitive("testMapListenerMap");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800331 TestMapEventListener listener = new TestMapEventListener();
332
333 // add listener; insert new value into map and verify an INSERT event is received.
Madan Jampani40f022e2016-03-02 21:35:14 -0800334 map.addListener(listener).thenCompose(v -> map.put("foo", value1)).join();
335 MapEvent<String, byte[]> event = listener.event();
336 assertNotNull(event);
337 assertEquals(MapEvent.Type.INSERT, event.type());
338 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800339
340 // remove listener and verify listener is not notified.
Madan Jampani40f022e2016-03-02 21:35:14 -0800341 map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join();
342 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800343
344 // add the listener back and verify UPDATE events are received correctly
Madan Jampani40f022e2016-03-02 21:35:14 -0800345 map.addListener(listener).thenCompose(v -> map.put("foo", value3)).join();
346 event = listener.event();
347 assertNotNull(event);
348 assertEquals(MapEvent.Type.UPDATE, event.type());
349 assertTrue(Arrays.equals(value3, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800350
351 // perform a non-state changing operation and verify no events are received.
352 map.putIfAbsent("foo", value1).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800353 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800354
355 // verify REMOVE events are received correctly.
356 map.remove("foo").join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800357 event = listener.event();
358 assertNotNull(event);
359 assertEquals(MapEvent.Type.REMOVE, event.type());
360 assertTrue(Arrays.equals(value3, event.oldValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800361
362 // verify compute methods also generate events.
363 map.computeIf("foo", v -> v == null, (k, v) -> value1).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800364 event = listener.event();
365 assertNotNull(event);
366 assertEquals(MapEvent.Type.INSERT, event.type());
367 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800368
369 map.compute("foo", (k, v) -> value2).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800370 event = listener.event();
371 assertNotNull(event);
372 assertEquals(MapEvent.Type.UPDATE, event.type());
373 assertTrue(Arrays.equals(value2, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800374
375 map.computeIf("foo", v -> Arrays.equals(v, value2), (k, v) -> null).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800376 event = listener.event();
377 assertNotNull(event);
378 assertEquals(MapEvent.Type.REMOVE, event.type());
379 assertTrue(Arrays.equals(value2, event.oldValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800380
381 map.removeListener(listener).join();
382 }
383
Jordan Halterman5f97a302017-04-26 23:41:31 -0700384 protected void transactionPrepareTests() throws Throwable {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700385 AtomixConsistentMap map = newPrimitive("testPrepareTestsMap");
Jordan Halterman5f97a302017-04-26 23:41:31 -0700386
387 TransactionId transactionId1 = TransactionId.from("tx1");
388 TransactionId transactionId2 = TransactionId.from("tx2");
389 TransactionId transactionId3 = TransactionId.from("tx3");
390 TransactionId transactionId4 = TransactionId.from("tx4");
391
392 Version lock1 = map.begin(transactionId1).join();
393
394 MapUpdate<String, byte[]> update1 =
395 MapUpdate.<String, byte[]>newBuilder()
396 .withType(MapUpdate.Type.LOCK)
397 .withKey("foo")
398 .withVersion(lock1.value())
399 .build();
400 MapUpdate<String, byte[]> update2 =
401 MapUpdate.<String, byte[]>newBuilder()
402 .withType(MapUpdate.Type.LOCK)
403 .withKey("bar")
404 .withVersion(lock1.value())
405 .build();
406
407 map.prepare(new TransactionLog<>(transactionId1, lock1.value(), Arrays.asList(update1, update2)))
408 .thenAccept(result -> {
409 assertTrue(result);
410 }).join();
411
412 Version lock2 = map.begin(transactionId2).join();
413
414 MapUpdate<String, byte[]> update3 =
415 MapUpdate.<String, byte[]>newBuilder()
416 .withType(MapUpdate.Type.LOCK)
417 .withKey("foo")
418 .withVersion(lock2.value())
419 .build();
420
421 map.prepare(new TransactionLog<>(transactionId2, lock2.value(), Arrays.asList(update3)))
422 .thenAccept(result -> {
423 assertFalse(result);
424 }).join();
425 map.rollback(transactionId2).join();
426
427 Version lock3 = map.begin(transactionId3).join();
428
429 MapUpdate<String, byte[]> update4 =
430 MapUpdate.<String, byte[]>newBuilder()
431 .withType(MapUpdate.Type.LOCK)
432 .withKey("baz")
433 .withVersion(0)
434 .build();
435
436 map.prepare(new TransactionLog<>(transactionId3, lock3.value(), Arrays.asList(update4)))
437 .thenAccept(result -> {
438 assertFalse(result);
439 }).join();
440 map.rollback(transactionId3).join();
441
442 Version lock4 = map.begin(transactionId4).join();
443
444 MapUpdate<String, byte[]> update5 =
445 MapUpdate.<String, byte[]>newBuilder()
446 .withType(MapUpdate.Type.LOCK)
447 .withKey("baz")
448 .withVersion(lock4.value())
449 .build();
450
451 map.prepare(new TransactionLog<>(transactionId4, lock4.value(), Arrays.asList(update5)))
452 .thenAccept(result -> {
453 assertTrue(result);
454 }).join();
455 }
456
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700457 protected void transactionCommitTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800458 final byte[] value1 = Tools.getBytesUtf8("value1");
459 final byte[] value2 = Tools.getBytesUtf8("value2");
460
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700461 AtomixConsistentMap map = newPrimitive("testCommitTestsMap");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800462 TestMapEventListener listener = new TestMapEventListener();
463
464 map.addListener(listener).join();
465
Jordan Halterman5f97a302017-04-26 23:41:31 -0700466 TransactionId transactionId = TransactionId.from("tx1");
467
468 // Begin the transaction.
469 Version lock = map.begin(transactionId).join();
470
471 // PUT_IF_VERSION_MATCH
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800472 MapUpdate<String, byte[]> update1 =
Jordan Halterman5f97a302017-04-26 23:41:31 -0700473 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
474 .withKey("foo")
475 .withValue(value1)
476 .withVersion(lock.value())
477 .build();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800478
Jordan Halterman5f97a302017-04-26 23:41:31 -0700479 map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(update1))).thenAccept(result -> {
Madan Jampani74da78b2016-02-09 21:18:36 -0800480 assertEquals(true, result);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800481 }).join();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700482 // verify changes in Tx is not visible yet until commit
Madan Jampani40f022e2016-03-02 21:35:14 -0800483 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800484
485 map.size().thenAccept(result -> {
486 assertTrue(result == 0);
487 }).join();
488
489 map.get("foo").thenAccept(result -> {
490 assertNull(result);
491 }).join();
492
493 try {
494 map.put("foo", value2).join();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700495 fail("update to map entry in open tx should fail with Exception");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800496 } catch (CompletionException e) {
497 assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
498 }
499
Madan Jampani40f022e2016-03-02 21:35:14 -0800500 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800501
Jordan Halterman5f97a302017-04-26 23:41:31 -0700502 map.commit(transactionId).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800503 MapEvent<String, byte[]> event = listener.event();
504 assertNotNull(event);
505 assertEquals(MapEvent.Type.INSERT, event.type());
506 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800507
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700508 // map should be update-able after commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800509 map.put("foo", value2).thenAccept(result -> {
510 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
511 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800512 event = listener.event();
513 assertNotNull(event);
514 assertEquals(MapEvent.Type.UPDATE, event.type());
515 assertTrue(Arrays.equals(value2, event.newValue().value()));
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700516
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700517 // REMOVE_IF_VERSION_MATCH
518 byte[] currFoo = map.get("foo").get().value();
519 long currFooVersion = map.get("foo").get().version();
520 MapUpdate<String, byte[]> remove1 =
521 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
Jordan Halterman5f97a302017-04-26 23:41:31 -0700522 .withKey("foo")
523 .withVersion(currFooVersion)
524 .build();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700525
Jordan Halterman5f97a302017-04-26 23:41:31 -0700526 transactionId = TransactionId.from("tx2");
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700527
Jordan Halterman5f97a302017-04-26 23:41:31 -0700528 // Begin the transaction.
529 map.begin(transactionId).join();
530
531 map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(remove1))).thenAccept(result -> {
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700532 assertTrue("prepare should succeed", result);
533 }).join();
534 // verify changes in Tx is not visible yet until commit
535 assertFalse(listener.eventReceived());
536
537 map.size().thenAccept(size -> {
538 assertThat(size, is(1));
539 }).join();
540
541 map.get("foo").thenAccept(result -> {
542 assertThat(result.value(), is(currFoo));
543 }).join();
544
Jordan Halterman5f97a302017-04-26 23:41:31 -0700545 map.commit(transactionId).join();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700546 event = listener.event();
547 assertNotNull(event);
548 assertEquals(MapEvent.Type.REMOVE, event.type());
549 assertArrayEquals(currFoo, event.oldValue().value());
550
551 map.size().thenAccept(size -> {
552 assertThat(size, is(0));
553 }).join();
554
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800555 }
556
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700557 protected void transactionRollbackTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800558 final byte[] value1 = Tools.getBytesUtf8("value1");
559 final byte[] value2 = Tools.getBytesUtf8("value2");
560
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700561 AtomixConsistentMap map = newPrimitive("testTransactionRollbackTestsMap");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800562 TestMapEventListener listener = new TestMapEventListener();
563
564 map.addListener(listener).join();
565
Jordan Halterman5f97a302017-04-26 23:41:31 -0700566 TransactionId transactionId = TransactionId.from("tx1");
567
568 Version lock = map.begin(transactionId).join();
569
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800570 MapUpdate<String, byte[]> update1 =
Jordan Halterman5f97a302017-04-26 23:41:31 -0700571 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
572 .withKey("foo")
573 .withValue(value1)
574 .withVersion(lock.value())
575 .build();
576
577 map.prepare(new TransactionLog<>(transactionId, lock.value(), Arrays.asList(update1))).thenAccept(result -> {
Madan Jampani74da78b2016-02-09 21:18:36 -0800578 assertEquals(true, result);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800579 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800580 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800581
Jordan Halterman5f97a302017-04-26 23:41:31 -0700582 map.rollback(transactionId).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800583 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800584
585 map.get("foo").thenAccept(result -> {
586 assertNull(result);
587 }).join();
588
589 map.put("foo", value2).thenAccept(result -> {
590 assertNull(result);
591 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800592 MapEvent<String, byte[]> event = listener.event();
593 assertNotNull(event);
594 assertEquals(MapEvent.Type.INSERT, event.type());
595 assertTrue(Arrays.equals(value2, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800596 }
597
Jordan Haltermanca7660a2018-04-04 23:43:23 -0700598 @Test
599 public void testCompatibilityFunction() throws Throwable {
600 AtomixConsistentMap atomixMap = newPrimitive("testCompatibilityFunction");
601
602 Serializer rawSerializer = Serializer.using(KryoNamespaces.API, CompatibleValue.class);
603 Serializer valueSerializer = Serializer.using(KryoNamespaces.BASIC);
604
605 // Convert the byte[] value to CompatibleValue<byte[]>
606 AsyncConsistentMap<String, CompatibleValue<byte[]>> rawMap = DistributedPrimitives.newTranscodingMap(
607 atomixMap,
608 key -> HexString.toHexString(rawSerializer.encode(key)),
609 string -> rawSerializer.decode(HexString.fromHexString(string)),
610 value -> value == null ? null : rawSerializer.encode(value),
611 bytes -> rawSerializer.decode(bytes));
612
613 // Convert the CompatibleValue<byte[]> value to CompatibleValue<V> using the user-provided serializer.
614 AsyncConsistentMap<String, CompatibleValue<String>> compatibleMap =
615 DistributedPrimitives.newTranscodingMap(
616 rawMap,
617 key -> key,
618 key -> key,
619 value -> value == null ? null :
620 new CompatibleValue<byte[]>(valueSerializer.encode(value.value()), value.version()),
621 value -> value == null ? null :
622 new CompatibleValue<String>(valueSerializer.decode(value.value()), value.version()));
623
624 AsyncConsistentMap<String, String> map1 = DistributedPrimitives.newCompatibleMap(
625 compatibleMap,
626 (value, version) -> version + ":" + value,
627 org.onosproject.core.Version.version("1.0.0"));
628 AsyncConsistentMap<String, String> map2 = DistributedPrimitives.newCompatibleMap(
629 compatibleMap,
630 (value, version) -> version + ":" + value,
631 org.onosproject.core.Version.version("1.0.1"));
632
633 map1.put("foo", "Hello world!").join();
634 assertEquals("Hello world!", map1.get("foo").join().value());
635 assertEquals("1.0.0:Hello world!", map2.get("foo").join().value());
636
637 map2.put("bar", "Hello world again!").join();
638 assertEquals("Hello world again!", map2.get("bar").join().value());
639 assertEquals("1.0.1:Hello world again!", map1.get("bar").join().value());
640 }
641
Jordan Haltermandae11602018-07-03 00:00:47 -0700642 @Test
643 public void testIterator() throws Exception {
644 AtomixConsistentMap map = newPrimitive("testIterator");
645 for (int i = 0; i < 100; i++) {
646 for (int j = 0; j < 100; j++) {
647 map.put(String.valueOf(i), String.valueOf(j).getBytes()).join();
648 }
649 }
650
651 List<Map.Entry<String, Versioned<byte[]>>> entries = new ArrayList<>();
652 AsyncIterator<Map.Entry<String, Versioned<byte[]>>> iterator = map.iterator().get(5, TimeUnit.SECONDS);
653 while (iterator.hasNext().get(5, TimeUnit.SECONDS)) {
654 map.put("foo", UUID.randomUUID().toString().getBytes()).join();
655 entries.add(iterator.next().get(5, TimeUnit.SECONDS));
656 }
657 assertEquals(100, entries.size());
658 assertEquals(101, map.asConsistentMap().stream().count());
659 }
660
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800661 private static class TestMapEventListener implements MapEventListener<String, byte[]> {
662
Madan Jampani40f022e2016-03-02 21:35:14 -0800663 private final BlockingQueue<MapEvent<String, byte[]>> queue = new ArrayBlockingQueue<>(1);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800664
665 @Override
666 public void event(MapEvent<String, byte[]> event) {
Madan Jampani40f022e2016-03-02 21:35:14 -0800667 try {
668 queue.put(event);
669 } catch (InterruptedException e) {
Ray Milkey6a51cb92018-03-06 09:03:03 -0800670 Thread.currentThread().interrupt();
671 throw new IllegalStateException(e);
Madan Jampani40f022e2016-03-02 21:35:14 -0800672 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800673 }
674
Madan Jampani40f022e2016-03-02 21:35:14 -0800675 public boolean eventReceived() {
676 return !queue.isEmpty();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800677 }
678
Madan Jampani40f022e2016-03-02 21:35:14 -0800679 public MapEvent<String, byte[]> event() throws InterruptedException {
680 return queue.take();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800681 }
682 }
683}