blob: ac4c60dca463f69a5d6c5893e4eaa38d823361b6 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import io.atomix.resource.ResourceType;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onlab.util.Tools;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Versioned;

import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080050
51/**
52 * Unit tests for {@link AtomixConsistentMap}.
53 */
Madan Jampani5e5b3d62016-02-01 16:03:33 -080054public class AtomixConsistentMapTest extends AtomixTestBase {
55
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -070056 @BeforeClass
57 public static void preTestSetup() throws Throwable {
58 createCopycatServers(3);
59 }
60
61 @AfterClass
62 public static void postTestCleanup() throws Exception {
63 clearTests();
64 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -080065 @Override
66 protected ResourceType resourceType() {
67 return new ResourceType(AtomixConsistentMap.class);
68 }
69
70 /**
71 * Tests various basic map operations.
72 */
73 @Test
74 public void testBasicMapOperations() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070075 basicMapOperationTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080076 }
77
78 /**
79 * Tests various map compute* operations on different cluster sizes.
80 */
81 @Test
82 public void testMapComputeOperations() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070083 mapComputeOperationTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080084 }
85
86 /**
87 * Tests map event notifications.
88 */
89 @Test
90 public void testMapListeners() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070091 mapListenerTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080092 }
93
94 /**
95 * Tests map transaction commit.
96 */
97 @Test
98 public void testTransactionCommit() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070099 transactionCommitTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800100 }
101
102 /**
103 * Tests map transaction rollback.
104 */
105 @Test
106 public void testTransactionRollback() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700107 transactionRollbackTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800108 }
109
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700110 protected void basicMapOperationTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800111 final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
112 final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
113
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700114 AtomixConsistentMap map = createAtomixClient().getResource("testBasicMapOperationMap",
115 AtomixConsistentMap.class).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800116
117 map.isEmpty().thenAccept(result -> {
118 assertTrue(result);
119 }).join();
120
Jordan Haltermanf6272442017-04-20 02:18:08 -0700121 map.getOrDefault("nothing", null).thenAccept(result -> {
122 assertEquals(0, result.version());
123 assertNull(result.value());
124 }).join();
125
126 map.getOrDefault("foo", "bar".getBytes()).thenAccept(result -> {
127 assertEquals(0, result.version());
128 assertArrayEquals("bar".getBytes(), result.value());
129 }).join();
130
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800131 map.put("foo", rawFooValue).thenAccept(result -> {
132 assertNull(result);
133 }).join();
134
135 map.size().thenAccept(result -> {
136 assertTrue(result == 1);
137 }).join();
138
139 map.isEmpty().thenAccept(result -> {
140 assertFalse(result);
141 }).join();
142
143 map.putIfAbsent("foo", "Hello foo again!".getBytes()).thenAccept(result -> {
144 assertNotNull(result);
145 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
146 }).join();
147
148 map.putIfAbsent("bar", rawBarValue).thenAccept(result -> {
149 assertNull(result);
150 }).join();
151
152 map.size().thenAccept(result -> {
153 assertTrue(result == 2);
154 }).join();
155
156 map.keySet().thenAccept(result -> {
157 assertTrue(result.size() == 2);
158 assertTrue(result.containsAll(Sets.newHashSet("foo", "bar")));
159 }).join();
160
161 map.values().thenAccept(result -> {
162 assertTrue(result.size() == 2);
163 List<String> rawValues =
164 result.stream().map(v -> Tools.toStringUtf8(v.value())).collect(Collectors.toList());
165 assertTrue(rawValues.contains("Hello foo!"));
166 assertTrue(rawValues.contains("Hello bar!"));
167 }).join();
168
169 map.entrySet().thenAccept(result -> {
170 assertTrue(result.size() == 2);
171 // TODO: check entries
172 }).join();
173
174 map.get("foo").thenAccept(result -> {
175 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
176 }).join();
177
Jordan Haltermanf6272442017-04-20 02:18:08 -0700178 map.getOrDefault("foo", "bar".getBytes()).thenAccept(result -> {
179 assertNotEquals(0, result.version());
180 assertArrayEquals(rawFooValue, result.value());
181 }).join();
182
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800183 map.remove("foo").thenAccept(result -> {
184 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
185 }).join();
186
187 map.containsKey("foo").thenAccept(result -> {
188 assertFalse(result);
189 }).join();
190
191 map.get("foo").thenAccept(result -> {
192 assertNull(result);
193 }).join();
194
195 map.get("bar").thenAccept(result -> {
196 assertNotNull(result);
197 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
198 }).join();
199
200 map.containsKey("bar").thenAccept(result -> {
201 assertTrue(result);
202 }).join();
203
204 map.size().thenAccept(result -> {
205 assertTrue(result == 1);
206 }).join();
207
208 map.containsValue(rawBarValue).thenAccept(result -> {
209 assertTrue(result);
210 }).join();
211
212 map.containsValue(rawFooValue).thenAccept(result -> {
213 assertFalse(result);
214 }).join();
215
216 map.replace("bar", "Goodbye bar!".getBytes()).thenAccept(result -> {
217 assertNotNull(result);
218 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
219 }).join();
220
221 map.replace("foo", "Goodbye foo!".getBytes()).thenAccept(result -> {
222 assertNull(result);
223 }).join();
224
225 // try replace_if_value_match for a non-existent key
226 map.replace("foo", "Goodbye foo!".getBytes(), rawFooValue).thenAccept(result -> {
227 assertFalse(result);
228 }).join();
229
230 map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
231 assertTrue(result);
232 }).join();
233
234 map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
235 assertFalse(result);
236 }).join();
237
238 Versioned<byte[]> barValue = map.get("bar").join();
239 map.replace("bar", barValue.version(), "Goodbye bar!".getBytes()).thenAccept(result -> {
240 assertTrue(result);
241 }).join();
242
243 map.replace("bar", barValue.version(), rawBarValue).thenAccept(result -> {
244 assertFalse(result);
245 }).join();
246
247 map.clear().join();
248
249 map.size().thenAccept(result -> {
250 assertTrue(result == 0);
251 }).join();
252 }
253
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700254 public void mapComputeOperationTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800255 final byte[] value1 = Tools.getBytesUtf8("value1");
256 final byte[] value2 = Tools.getBytesUtf8("value2");
257 final byte[] value3 = Tools.getBytesUtf8("value3");
258
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700259 AtomixConsistentMap map = createAtomixClient().getResource("testMapComputeOperationsMap",
260 AtomixConsistentMap.class).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800261
262 map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
263 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
264 }).join();
265
266 map.computeIfAbsent("foo", k -> value2).thenAccept(result -> {
267 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
268 }).join();
269
270 map.computeIfPresent("bar", (k, v) -> value2).thenAccept(result -> {
271 assertNull(result);
272 });
273
274 map.computeIfPresent("foo", (k, v) -> value3).thenAccept(result -> {
275 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value3));
276 }).join();
277
278 map.computeIfPresent("foo", (k, v) -> null).thenAccept(result -> {
279 assertNull(result);
280 }).join();
281
282 map.computeIf("foo", v -> v == null, (k, v) -> value1).thenAccept(result -> {
283 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
284 }).join();
285
286 map.compute("foo", (k, v) -> value2).thenAccept(result -> {
287 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value2));
288 }).join();
289 }
290
291
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700292 protected void mapListenerTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800293 final byte[] value1 = Tools.getBytesUtf8("value1");
294 final byte[] value2 = Tools.getBytesUtf8("value2");
295 final byte[] value3 = Tools.getBytesUtf8("value3");
296
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700297 AtomixConsistentMap map = createAtomixClient().getResource("testMapListenerMap",
298 AtomixConsistentMap.class).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800299 TestMapEventListener listener = new TestMapEventListener();
300
301 // add listener; insert new value into map and verify an INSERT event is received.
Madan Jampani40f022e2016-03-02 21:35:14 -0800302 map.addListener(listener).thenCompose(v -> map.put("foo", value1)).join();
303 MapEvent<String, byte[]> event = listener.event();
304 assertNotNull(event);
305 assertEquals(MapEvent.Type.INSERT, event.type());
306 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800307
308 // remove listener and verify listener is not notified.
Madan Jampani40f022e2016-03-02 21:35:14 -0800309 map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join();
310 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800311
312 // add the listener back and verify UPDATE events are received correctly
Madan Jampani40f022e2016-03-02 21:35:14 -0800313 map.addListener(listener).thenCompose(v -> map.put("foo", value3)).join();
314 event = listener.event();
315 assertNotNull(event);
316 assertEquals(MapEvent.Type.UPDATE, event.type());
317 assertTrue(Arrays.equals(value3, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800318
319 // perform a non-state changing operation and verify no events are received.
320 map.putIfAbsent("foo", value1).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800321 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800322
323 // verify REMOVE events are received correctly.
324 map.remove("foo").join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800325 event = listener.event();
326 assertNotNull(event);
327 assertEquals(MapEvent.Type.REMOVE, event.type());
328 assertTrue(Arrays.equals(value3, event.oldValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800329
330 // verify compute methods also generate events.
331 map.computeIf("foo", v -> v == null, (k, v) -> value1).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800332 event = listener.event();
333 assertNotNull(event);
334 assertEquals(MapEvent.Type.INSERT, event.type());
335 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800336
337 map.compute("foo", (k, v) -> value2).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800338 event = listener.event();
339 assertNotNull(event);
340 assertEquals(MapEvent.Type.UPDATE, event.type());
341 assertTrue(Arrays.equals(value2, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800342
343 map.computeIf("foo", v -> Arrays.equals(v, value2), (k, v) -> null).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800344 event = listener.event();
345 assertNotNull(event);
346 assertEquals(MapEvent.Type.REMOVE, event.type());
347 assertTrue(Arrays.equals(value2, event.oldValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800348
349 map.removeListener(listener).join();
350 }
351
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700352 protected void transactionCommitTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800353 final byte[] value1 = Tools.getBytesUtf8("value1");
354 final byte[] value2 = Tools.getBytesUtf8("value2");
355
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700356 AtomixConsistentMap map = createAtomixClient().getResource("testCommitTestsMap",
357 AtomixConsistentMap.class).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800358 TestMapEventListener listener = new TestMapEventListener();
359
360 map.addListener(listener).join();
361
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700362 // PUT_IF_ABSENT
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800363 MapUpdate<String, byte[]> update1 =
364 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
365 .withKey("foo")
366 .withValue(value1)
367 .build();
368
Jordan Halterman948d6592017-04-20 17:18:24 -0700369 TransactionLog<MapUpdate<String, byte[]>> tx =
370 new TransactionLog<>(TransactionId.from("tx1"), Arrays.asList(update1));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800371
Madan Jampanicadd70b2016-02-08 13:45:43 -0800372 map.prepare(tx).thenAccept(result -> {
Madan Jampani74da78b2016-02-09 21:18:36 -0800373 assertEquals(true, result);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800374 }).join();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700375 // verify changes in Tx is not visible yet until commit
Madan Jampani40f022e2016-03-02 21:35:14 -0800376 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800377
378 map.size().thenAccept(result -> {
379 assertTrue(result == 0);
380 }).join();
381
382 map.get("foo").thenAccept(result -> {
383 assertNull(result);
384 }).join();
385
386 try {
387 map.put("foo", value2).join();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700388 fail("update to map entry in open tx should fail with Exception");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800389 } catch (CompletionException e) {
390 assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
391 }
392
Madan Jampani40f022e2016-03-02 21:35:14 -0800393 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800394
Madan Jampani74da78b2016-02-09 21:18:36 -0800395 map.commit(tx.transactionId()).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800396 MapEvent<String, byte[]> event = listener.event();
397 assertNotNull(event);
398 assertEquals(MapEvent.Type.INSERT, event.type());
399 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800400
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700401 // map should be update-able after commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800402 map.put("foo", value2).thenAccept(result -> {
403 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
404 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800405 event = listener.event();
406 assertNotNull(event);
407 assertEquals(MapEvent.Type.UPDATE, event.type());
408 assertTrue(Arrays.equals(value2, event.newValue().value()));
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700409
410
411 // REMOVE_IF_VERSION_MATCH
412 byte[] currFoo = map.get("foo").get().value();
413 long currFooVersion = map.get("foo").get().version();
414 MapUpdate<String, byte[]> remove1 =
415 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
416 .withKey("foo")
417 .withCurrentVersion(currFooVersion)
418 .build();
419
Jordan Halterman948d6592017-04-20 17:18:24 -0700420 tx = new TransactionLog<>(TransactionId.from("tx2"), Arrays.asList(remove1));
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700421
422 map.prepare(tx).thenAccept(result -> {
423 assertTrue("prepare should succeed", result);
424 }).join();
425 // verify changes in Tx is not visible yet until commit
426 assertFalse(listener.eventReceived());
427
428 map.size().thenAccept(size -> {
429 assertThat(size, is(1));
430 }).join();
431
432 map.get("foo").thenAccept(result -> {
433 assertThat(result.value(), is(currFoo));
434 }).join();
435
436 map.commit(tx.transactionId()).join();
437 event = listener.event();
438 assertNotNull(event);
439 assertEquals(MapEvent.Type.REMOVE, event.type());
440 assertArrayEquals(currFoo, event.oldValue().value());
441
442 map.size().thenAccept(size -> {
443 assertThat(size, is(0));
444 }).join();
445
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800446 }
447
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700448 protected void transactionRollbackTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800449 final byte[] value1 = Tools.getBytesUtf8("value1");
450 final byte[] value2 = Tools.getBytesUtf8("value2");
451
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700452 AtomixConsistentMap map = createAtomixClient().getResource("testTransactionRollbackTestsMap",
453 AtomixConsistentMap.class).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800454 TestMapEventListener listener = new TestMapEventListener();
455
456 map.addListener(listener).join();
457
458 MapUpdate<String, byte[]> update1 =
459 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
460 .withKey("foo")
461 .withValue(value1)
462 .build();
Jordan Halterman948d6592017-04-20 17:18:24 -0700463 TransactionLog<MapUpdate<String, byte[]>> tx =
464 new TransactionLog<>(TransactionId.from("tx1"), Arrays.asList(update1));
Madan Jampanicadd70b2016-02-08 13:45:43 -0800465 map.prepare(tx).thenAccept(result -> {
Madan Jampani74da78b2016-02-09 21:18:36 -0800466 assertEquals(true, result);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800467 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800468 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800469
Madan Jampani74da78b2016-02-09 21:18:36 -0800470 map.rollback(tx.transactionId()).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800471 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800472
473 map.get("foo").thenAccept(result -> {
474 assertNull(result);
475 }).join();
476
477 map.put("foo", value2).thenAccept(result -> {
478 assertNull(result);
479 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800480 MapEvent<String, byte[]> event = listener.event();
481 assertNotNull(event);
482 assertEquals(MapEvent.Type.INSERT, event.type());
483 assertTrue(Arrays.equals(value2, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800484 }
485
486 private static class TestMapEventListener implements MapEventListener<String, byte[]> {
487
Madan Jampani40f022e2016-03-02 21:35:14 -0800488 private final BlockingQueue<MapEvent<String, byte[]>> queue = new ArrayBlockingQueue<>(1);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800489
490 @Override
491 public void event(MapEvent<String, byte[]> event) {
Madan Jampani40f022e2016-03-02 21:35:14 -0800492 try {
493 queue.put(event);
494 } catch (InterruptedException e) {
495 Throwables.propagate(e);
496 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800497 }
498
Madan Jampani40f022e2016-03-02 21:35:14 -0800499 public boolean eventReceived() {
500 return !queue.isEmpty();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800501 }
502
Madan Jampani40f022e2016-03-02 21:35:14 -0800503 public MapEvent<String, byte[]> event() throws InterruptedException {
504 return queue.take();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800505 }
506 }
507}