blob: 23d528bd09c0d35114dbb7d9a20358399a7182b7 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -070018import com.google.common.base.Throwables;
19import com.google.common.collect.Sets;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.resource.ResourceType;
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -070021import org.junit.AfterClass;
22import org.junit.BeforeClass;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080023import org.junit.Test;
24import org.onlab.util.Tools;
Madan Jampani74da78b2016-02-09 21:18:36 -080025import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080026import org.onosproject.store.primitives.TransactionId;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import org.onosproject.store.service.MapEvent;
28import org.onosproject.store.service.MapEventListener;
Madan Jampani74da78b2016-02-09 21:18:36 -080029import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080030import org.onosproject.store.service.Versioned;
31
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -070032import java.util.Arrays;
33import java.util.ConcurrentModificationException;
34import java.util.List;
35import java.util.concurrent.ArrayBlockingQueue;
36import java.util.concurrent.BlockingQueue;
37import java.util.concurrent.CompletionException;
38import java.util.stream.Collectors;
39
40import static org.hamcrest.Matchers.is;
41import static org.junit.Assert.assertArrayEquals;
42import static org.junit.Assert.assertEquals;
43import static org.junit.Assert.assertFalse;
Jordan Haltermanf6272442017-04-20 02:18:08 -070044import static org.junit.Assert.assertNotEquals;
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -070045import static org.junit.Assert.assertNotNull;
46import static org.junit.Assert.assertNull;
47import static org.junit.Assert.assertThat;
48import static org.junit.Assert.assertTrue;
49import static org.junit.Assert.fail;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080050
51/**
52 * Unit tests for {@link AtomixConsistentMap}.
53 */
Madan Jampani5e5b3d62016-02-01 16:03:33 -080054public class AtomixConsistentMapTest extends AtomixTestBase {
55
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -070056 @BeforeClass
57 public static void preTestSetup() throws Throwable {
58 createCopycatServers(3);
59 }
60
61 @AfterClass
62 public static void postTestCleanup() throws Exception {
63 clearTests();
64 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -080065 @Override
66 protected ResourceType resourceType() {
67 return new ResourceType(AtomixConsistentMap.class);
68 }
69
70 /**
71 * Tests various basic map operations.
72 */
73 @Test
74 public void testBasicMapOperations() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070075 basicMapOperationTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080076 }
77
78 /**
79 * Tests various map compute* operations on different cluster sizes.
80 */
81 @Test
82 public void testMapComputeOperations() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070083 mapComputeOperationTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080084 }
85
86 /**
87 * Tests map event notifications.
88 */
89 @Test
90 public void testMapListeners() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070091 mapListenerTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080092 }
93
94 /**
95 * Tests map transaction commit.
96 */
97 @Test
98 public void testTransactionCommit() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070099 transactionCommitTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800100 }
101
102 /**
103 * Tests map transaction rollback.
104 */
105 @Test
106 public void testTransactionRollback() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700107 transactionRollbackTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800108 }
109
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700110 protected void basicMapOperationTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800111 final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
112 final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
113
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700114 AtomixConsistentMap map = createAtomixClient().getResource("testBasicMapOperationMap",
115 AtomixConsistentMap.class).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800116
117 map.isEmpty().thenAccept(result -> {
118 assertTrue(result);
119 }).join();
120
Jordan Haltermanf6272442017-04-20 02:18:08 -0700121 map.getOrDefault("nothing", null).thenAccept(result -> {
122 assertEquals(0, result.version());
123 assertNull(result.value());
124 }).join();
125
126 map.getOrDefault("foo", "bar".getBytes()).thenAccept(result -> {
127 assertEquals(0, result.version());
128 assertArrayEquals("bar".getBytes(), result.value());
129 }).join();
130
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800131 map.put("foo", rawFooValue).thenAccept(result -> {
132 assertNull(result);
133 }).join();
134
135 map.size().thenAccept(result -> {
136 assertTrue(result == 1);
137 }).join();
138
139 map.isEmpty().thenAccept(result -> {
140 assertFalse(result);
141 }).join();
142
143 map.putIfAbsent("foo", "Hello foo again!".getBytes()).thenAccept(result -> {
144 assertNotNull(result);
145 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
146 }).join();
147
148 map.putIfAbsent("bar", rawBarValue).thenAccept(result -> {
149 assertNull(result);
150 }).join();
151
152 map.size().thenAccept(result -> {
153 assertTrue(result == 2);
154 }).join();
155
156 map.keySet().thenAccept(result -> {
157 assertTrue(result.size() == 2);
158 assertTrue(result.containsAll(Sets.newHashSet("foo", "bar")));
159 }).join();
160
161 map.values().thenAccept(result -> {
162 assertTrue(result.size() == 2);
163 List<String> rawValues =
164 result.stream().map(v -> Tools.toStringUtf8(v.value())).collect(Collectors.toList());
165 assertTrue(rawValues.contains("Hello foo!"));
166 assertTrue(rawValues.contains("Hello bar!"));
167 }).join();
168
169 map.entrySet().thenAccept(result -> {
170 assertTrue(result.size() == 2);
171 // TODO: check entries
172 }).join();
173
174 map.get("foo").thenAccept(result -> {
175 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
176 }).join();
177
Jordan Haltermanf6272442017-04-20 02:18:08 -0700178 map.getOrDefault("foo", "bar".getBytes()).thenAccept(result -> {
179 assertNotEquals(0, result.version());
180 assertArrayEquals(rawFooValue, result.value());
181 }).join();
182
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800183 map.remove("foo").thenAccept(result -> {
184 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
185 }).join();
186
187 map.containsKey("foo").thenAccept(result -> {
188 assertFalse(result);
189 }).join();
190
191 map.get("foo").thenAccept(result -> {
192 assertNull(result);
193 }).join();
194
195 map.get("bar").thenAccept(result -> {
196 assertNotNull(result);
197 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
198 }).join();
199
200 map.containsKey("bar").thenAccept(result -> {
201 assertTrue(result);
202 }).join();
203
204 map.size().thenAccept(result -> {
205 assertTrue(result == 1);
206 }).join();
207
208 map.containsValue(rawBarValue).thenAccept(result -> {
209 assertTrue(result);
210 }).join();
211
212 map.containsValue(rawFooValue).thenAccept(result -> {
213 assertFalse(result);
214 }).join();
215
216 map.replace("bar", "Goodbye bar!".getBytes()).thenAccept(result -> {
217 assertNotNull(result);
218 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
219 }).join();
220
221 map.replace("foo", "Goodbye foo!".getBytes()).thenAccept(result -> {
222 assertNull(result);
223 }).join();
224
225 // try replace_if_value_match for a non-existent key
226 map.replace("foo", "Goodbye foo!".getBytes(), rawFooValue).thenAccept(result -> {
227 assertFalse(result);
228 }).join();
229
230 map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
231 assertTrue(result);
232 }).join();
233
234 map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
235 assertFalse(result);
236 }).join();
237
238 Versioned<byte[]> barValue = map.get("bar").join();
239 map.replace("bar", barValue.version(), "Goodbye bar!".getBytes()).thenAccept(result -> {
240 assertTrue(result);
241 }).join();
242
243 map.replace("bar", barValue.version(), rawBarValue).thenAccept(result -> {
244 assertFalse(result);
245 }).join();
246
247 map.clear().join();
248
249 map.size().thenAccept(result -> {
250 assertTrue(result == 0);
251 }).join();
252 }
253
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700254 public void mapComputeOperationTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800255 final byte[] value1 = Tools.getBytesUtf8("value1");
256 final byte[] value2 = Tools.getBytesUtf8("value2");
257 final byte[] value3 = Tools.getBytesUtf8("value3");
258
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700259 AtomixConsistentMap map = createAtomixClient().getResource("testMapComputeOperationsMap",
260 AtomixConsistentMap.class).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800261
262 map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
263 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
264 }).join();
265
266 map.computeIfAbsent("foo", k -> value2).thenAccept(result -> {
267 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
268 }).join();
269
270 map.computeIfPresent("bar", (k, v) -> value2).thenAccept(result -> {
271 assertNull(result);
272 });
273
274 map.computeIfPresent("foo", (k, v) -> value3).thenAccept(result -> {
275 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value3));
276 }).join();
277
278 map.computeIfPresent("foo", (k, v) -> null).thenAccept(result -> {
279 assertNull(result);
280 }).join();
281
282 map.computeIf("foo", v -> v == null, (k, v) -> value1).thenAccept(result -> {
283 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
284 }).join();
285
286 map.compute("foo", (k, v) -> value2).thenAccept(result -> {
287 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value2));
288 }).join();
289 }
290
291
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700292 protected void mapListenerTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800293 final byte[] value1 = Tools.getBytesUtf8("value1");
294 final byte[] value2 = Tools.getBytesUtf8("value2");
295 final byte[] value3 = Tools.getBytesUtf8("value3");
296
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700297 AtomixConsistentMap map = createAtomixClient().getResource("testMapListenerMap",
298 AtomixConsistentMap.class).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800299 TestMapEventListener listener = new TestMapEventListener();
300
301 // add listener; insert new value into map and verify an INSERT event is received.
Madan Jampani40f022e2016-03-02 21:35:14 -0800302 map.addListener(listener).thenCompose(v -> map.put("foo", value1)).join();
303 MapEvent<String, byte[]> event = listener.event();
304 assertNotNull(event);
305 assertEquals(MapEvent.Type.INSERT, event.type());
306 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800307
308 // remove listener and verify listener is not notified.
Madan Jampani40f022e2016-03-02 21:35:14 -0800309 map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join();
310 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800311
312 // add the listener back and verify UPDATE events are received correctly
Madan Jampani40f022e2016-03-02 21:35:14 -0800313 map.addListener(listener).thenCompose(v -> map.put("foo", value3)).join();
314 event = listener.event();
315 assertNotNull(event);
316 assertEquals(MapEvent.Type.UPDATE, event.type());
317 assertTrue(Arrays.equals(value3, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800318
319 // perform a non-state changing operation and verify no events are received.
320 map.putIfAbsent("foo", value1).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800321 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800322
323 // verify REMOVE events are received correctly.
324 map.remove("foo").join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800325 event = listener.event();
326 assertNotNull(event);
327 assertEquals(MapEvent.Type.REMOVE, event.type());
328 assertTrue(Arrays.equals(value3, event.oldValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800329
330 // verify compute methods also generate events.
331 map.computeIf("foo", v -> v == null, (k, v) -> value1).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800332 event = listener.event();
333 assertNotNull(event);
334 assertEquals(MapEvent.Type.INSERT, event.type());
335 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800336
337 map.compute("foo", (k, v) -> value2).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800338 event = listener.event();
339 assertNotNull(event);
340 assertEquals(MapEvent.Type.UPDATE, event.type());
341 assertTrue(Arrays.equals(value2, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800342
343 map.computeIf("foo", v -> Arrays.equals(v, value2), (k, v) -> null).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800344 event = listener.event();
345 assertNotNull(event);
346 assertEquals(MapEvent.Type.REMOVE, event.type());
347 assertTrue(Arrays.equals(value2, event.oldValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800348
349 map.removeListener(listener).join();
350 }
351
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700352 protected void transactionCommitTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800353 final byte[] value1 = Tools.getBytesUtf8("value1");
354 final byte[] value2 = Tools.getBytesUtf8("value2");
355
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700356 AtomixConsistentMap map = createAtomixClient().getResource("testCommitTestsMap",
357 AtomixConsistentMap.class).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800358 TestMapEventListener listener = new TestMapEventListener();
359
360 map.addListener(listener).join();
361
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700362 // PUT_IF_ABSENT
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800363 MapUpdate<String, byte[]> update1 =
364 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
365 .withKey("foo")
366 .withValue(value1)
367 .build();
368
Madan Jampani74da78b2016-02-09 21:18:36 -0800369 MapTransaction<String, byte[]> tx = new MapTransaction<>(TransactionId.from("tx1"), Arrays.asList(update1));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800370
Madan Jampanicadd70b2016-02-08 13:45:43 -0800371 map.prepare(tx).thenAccept(result -> {
Madan Jampani74da78b2016-02-09 21:18:36 -0800372 assertEquals(true, result);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800373 }).join();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700374 // verify changes in Tx is not visible yet until commit
Madan Jampani40f022e2016-03-02 21:35:14 -0800375 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800376
377 map.size().thenAccept(result -> {
378 assertTrue(result == 0);
379 }).join();
380
381 map.get("foo").thenAccept(result -> {
382 assertNull(result);
383 }).join();
384
385 try {
386 map.put("foo", value2).join();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700387 fail("update to map entry in open tx should fail with Exception");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800388 } catch (CompletionException e) {
389 assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
390 }
391
Madan Jampani40f022e2016-03-02 21:35:14 -0800392 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800393
Madan Jampani74da78b2016-02-09 21:18:36 -0800394 map.commit(tx.transactionId()).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800395 MapEvent<String, byte[]> event = listener.event();
396 assertNotNull(event);
397 assertEquals(MapEvent.Type.INSERT, event.type());
398 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800399
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700400 // map should be update-able after commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800401 map.put("foo", value2).thenAccept(result -> {
402 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
403 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800404 event = listener.event();
405 assertNotNull(event);
406 assertEquals(MapEvent.Type.UPDATE, event.type());
407 assertTrue(Arrays.equals(value2, event.newValue().value()));
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700408
409
410 // REMOVE_IF_VERSION_MATCH
411 byte[] currFoo = map.get("foo").get().value();
412 long currFooVersion = map.get("foo").get().version();
413 MapUpdate<String, byte[]> remove1 =
414 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
415 .withKey("foo")
416 .withCurrentVersion(currFooVersion)
417 .build();
418
419 tx = new MapTransaction<>(TransactionId.from("tx2"), Arrays.asList(remove1));
420
421 map.prepare(tx).thenAccept(result -> {
422 assertTrue("prepare should succeed", result);
423 }).join();
424 // verify changes in Tx is not visible yet until commit
425 assertFalse(listener.eventReceived());
426
427 map.size().thenAccept(size -> {
428 assertThat(size, is(1));
429 }).join();
430
431 map.get("foo").thenAccept(result -> {
432 assertThat(result.value(), is(currFoo));
433 }).join();
434
435 map.commit(tx.transactionId()).join();
436 event = listener.event();
437 assertNotNull(event);
438 assertEquals(MapEvent.Type.REMOVE, event.type());
439 assertArrayEquals(currFoo, event.oldValue().value());
440
441 map.size().thenAccept(size -> {
442 assertThat(size, is(0));
443 }).join();
444
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800445 }
446
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700447 protected void transactionRollbackTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800448 final byte[] value1 = Tools.getBytesUtf8("value1");
449 final byte[] value2 = Tools.getBytesUtf8("value2");
450
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700451 AtomixConsistentMap map = createAtomixClient().getResource("testTransactionRollbackTestsMap",
452 AtomixConsistentMap.class).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800453 TestMapEventListener listener = new TestMapEventListener();
454
455 map.addListener(listener).join();
456
457 MapUpdate<String, byte[]> update1 =
458 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
459 .withKey("foo")
460 .withValue(value1)
461 .build();
Madan Jampani74da78b2016-02-09 21:18:36 -0800462 MapTransaction<String, byte[]> tx = new MapTransaction<>(TransactionId.from("tx1"), Arrays.asList(update1));
Madan Jampanicadd70b2016-02-08 13:45:43 -0800463 map.prepare(tx).thenAccept(result -> {
Madan Jampani74da78b2016-02-09 21:18:36 -0800464 assertEquals(true, result);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800465 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800466 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800467
Madan Jampani74da78b2016-02-09 21:18:36 -0800468 map.rollback(tx.transactionId()).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800469 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800470
471 map.get("foo").thenAccept(result -> {
472 assertNull(result);
473 }).join();
474
475 map.put("foo", value2).thenAccept(result -> {
476 assertNull(result);
477 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800478 MapEvent<String, byte[]> event = listener.event();
479 assertNotNull(event);
480 assertEquals(MapEvent.Type.INSERT, event.type());
481 assertTrue(Arrays.equals(value2, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800482 }
483
484 private static class TestMapEventListener implements MapEventListener<String, byte[]> {
485
Madan Jampani40f022e2016-03-02 21:35:14 -0800486 private final BlockingQueue<MapEvent<String, byte[]>> queue = new ArrayBlockingQueue<>(1);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800487
488 @Override
489 public void event(MapEvent<String, byte[]> event) {
Madan Jampani40f022e2016-03-02 21:35:14 -0800490 try {
491 queue.put(event);
492 } catch (InterruptedException e) {
493 Throwables.propagate(e);
494 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800495 }
496
Madan Jampani40f022e2016-03-02 21:35:14 -0800497 public boolean eventReceived() {
498 return !queue.isEmpty();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800499 }
500
Madan Jampani40f022e2016-03-02 21:35:14 -0800501 public MapEvent<String, byte[]> event() throws InterruptedException {
502 return queue.take();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800503 }
504 }
505}