blob: 5d4ef11dff7c1fe34dd4970f32010e764fd82e1d [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -070018import com.google.common.base.Throwables;
19import com.google.common.collect.Sets;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.resource.ResourceType;
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -070021import org.junit.AfterClass;
22import org.junit.BeforeClass;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080023import org.junit.Test;
24import org.onlab.util.Tools;
Madan Jampani74da78b2016-02-09 21:18:36 -080025import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080026import org.onosproject.store.primitives.TransactionId;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import org.onosproject.store.service.MapEvent;
28import org.onosproject.store.service.MapEventListener;
Madan Jampani74da78b2016-02-09 21:18:36 -080029import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080030import org.onosproject.store.service.Versioned;
31
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -070032import java.util.Arrays;
33import java.util.ConcurrentModificationException;
34import java.util.List;
35import java.util.concurrent.ArrayBlockingQueue;
36import java.util.concurrent.BlockingQueue;
37import java.util.concurrent.CompletionException;
38import java.util.stream.Collectors;
39
40import static org.hamcrest.Matchers.is;
41import static org.junit.Assert.assertArrayEquals;
42import static org.junit.Assert.assertEquals;
43import static org.junit.Assert.assertFalse;
44import static org.junit.Assert.assertNotNull;
45import static org.junit.Assert.assertNull;
46import static org.junit.Assert.assertThat;
47import static org.junit.Assert.assertTrue;
48import static org.junit.Assert.fail;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080049
50/**
51 * Unit tests for {@link AtomixConsistentMap}.
52 */
Madan Jampani5e5b3d62016-02-01 16:03:33 -080053public class AtomixConsistentMapTest extends AtomixTestBase {
54
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -070055 @BeforeClass
56 public static void preTestSetup() throws Throwable {
57 createCopycatServers(3);
58 }
59
60 @AfterClass
61 public static void postTestCleanup() throws Exception {
62 clearTests();
63 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -080064 @Override
65 protected ResourceType resourceType() {
66 return new ResourceType(AtomixConsistentMap.class);
67 }
68
69 /**
70 * Tests various basic map operations.
71 */
72 @Test
73 public void testBasicMapOperations() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070074 basicMapOperationTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080075 }
76
77 /**
78 * Tests various map compute* operations on different cluster sizes.
79 */
80 @Test
81 public void testMapComputeOperations() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070082 mapComputeOperationTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080083 }
84
85 /**
86 * Tests map event notifications.
87 */
88 @Test
89 public void testMapListeners() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070090 mapListenerTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080091 }
92
93 /**
94 * Tests map transaction commit.
95 */
96 @Test
97 public void testTransactionCommit() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -070098 transactionCommitTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080099 }
100
101 /**
102 * Tests map transaction rollback.
103 */
104 @Test
105 public void testTransactionRollback() throws Throwable {
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700106 transactionRollbackTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800107 }
108
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700109 protected void basicMapOperationTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800110 final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
111 final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
112
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700113 AtomixConsistentMap map = createAtomixClient().getResource("testBasicMapOperationMap",
114 AtomixConsistentMap.class).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800115
116 map.isEmpty().thenAccept(result -> {
117 assertTrue(result);
118 }).join();
119
120 map.put("foo", rawFooValue).thenAccept(result -> {
121 assertNull(result);
122 }).join();
123
124 map.size().thenAccept(result -> {
125 assertTrue(result == 1);
126 }).join();
127
128 map.isEmpty().thenAccept(result -> {
129 assertFalse(result);
130 }).join();
131
132 map.putIfAbsent("foo", "Hello foo again!".getBytes()).thenAccept(result -> {
133 assertNotNull(result);
134 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
135 }).join();
136
137 map.putIfAbsent("bar", rawBarValue).thenAccept(result -> {
138 assertNull(result);
139 }).join();
140
141 map.size().thenAccept(result -> {
142 assertTrue(result == 2);
143 }).join();
144
145 map.keySet().thenAccept(result -> {
146 assertTrue(result.size() == 2);
147 assertTrue(result.containsAll(Sets.newHashSet("foo", "bar")));
148 }).join();
149
150 map.values().thenAccept(result -> {
151 assertTrue(result.size() == 2);
152 List<String> rawValues =
153 result.stream().map(v -> Tools.toStringUtf8(v.value())).collect(Collectors.toList());
154 assertTrue(rawValues.contains("Hello foo!"));
155 assertTrue(rawValues.contains("Hello bar!"));
156 }).join();
157
158 map.entrySet().thenAccept(result -> {
159 assertTrue(result.size() == 2);
160 // TODO: check entries
161 }).join();
162
163 map.get("foo").thenAccept(result -> {
164 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
165 }).join();
166
167 map.remove("foo").thenAccept(result -> {
168 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
169 }).join();
170
171 map.containsKey("foo").thenAccept(result -> {
172 assertFalse(result);
173 }).join();
174
175 map.get("foo").thenAccept(result -> {
176 assertNull(result);
177 }).join();
178
179 map.get("bar").thenAccept(result -> {
180 assertNotNull(result);
181 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
182 }).join();
183
184 map.containsKey("bar").thenAccept(result -> {
185 assertTrue(result);
186 }).join();
187
188 map.size().thenAccept(result -> {
189 assertTrue(result == 1);
190 }).join();
191
192 map.containsValue(rawBarValue).thenAccept(result -> {
193 assertTrue(result);
194 }).join();
195
196 map.containsValue(rawFooValue).thenAccept(result -> {
197 assertFalse(result);
198 }).join();
199
200 map.replace("bar", "Goodbye bar!".getBytes()).thenAccept(result -> {
201 assertNotNull(result);
202 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
203 }).join();
204
205 map.replace("foo", "Goodbye foo!".getBytes()).thenAccept(result -> {
206 assertNull(result);
207 }).join();
208
209 // try replace_if_value_match for a non-existent key
210 map.replace("foo", "Goodbye foo!".getBytes(), rawFooValue).thenAccept(result -> {
211 assertFalse(result);
212 }).join();
213
214 map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
215 assertTrue(result);
216 }).join();
217
218 map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
219 assertFalse(result);
220 }).join();
221
222 Versioned<byte[]> barValue = map.get("bar").join();
223 map.replace("bar", barValue.version(), "Goodbye bar!".getBytes()).thenAccept(result -> {
224 assertTrue(result);
225 }).join();
226
227 map.replace("bar", barValue.version(), rawBarValue).thenAccept(result -> {
228 assertFalse(result);
229 }).join();
230
231 map.clear().join();
232
233 map.size().thenAccept(result -> {
234 assertTrue(result == 0);
235 }).join();
236 }
237
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700238 public void mapComputeOperationTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800239 final byte[] value1 = Tools.getBytesUtf8("value1");
240 final byte[] value2 = Tools.getBytesUtf8("value2");
241 final byte[] value3 = Tools.getBytesUtf8("value3");
242
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700243 AtomixConsistentMap map = createAtomixClient().getResource("testMapComputeOperationsMap",
244 AtomixConsistentMap.class).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800245
246 map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
247 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
248 }).join();
249
250 map.computeIfAbsent("foo", k -> value2).thenAccept(result -> {
251 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
252 }).join();
253
254 map.computeIfPresent("bar", (k, v) -> value2).thenAccept(result -> {
255 assertNull(result);
256 });
257
258 map.computeIfPresent("foo", (k, v) -> value3).thenAccept(result -> {
259 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value3));
260 }).join();
261
262 map.computeIfPresent("foo", (k, v) -> null).thenAccept(result -> {
263 assertNull(result);
264 }).join();
265
266 map.computeIf("foo", v -> v == null, (k, v) -> value1).thenAccept(result -> {
267 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
268 }).join();
269
270 map.compute("foo", (k, v) -> value2).thenAccept(result -> {
271 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value2));
272 }).join();
273 }
274
275
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700276 protected void mapListenerTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800277 final byte[] value1 = Tools.getBytesUtf8("value1");
278 final byte[] value2 = Tools.getBytesUtf8("value2");
279 final byte[] value3 = Tools.getBytesUtf8("value3");
280
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700281 AtomixConsistentMap map = createAtomixClient().getResource("testMapListenerMap",
282 AtomixConsistentMap.class).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800283 TestMapEventListener listener = new TestMapEventListener();
284
285 // add listener; insert new value into map and verify an INSERT event is received.
Madan Jampani40f022e2016-03-02 21:35:14 -0800286 map.addListener(listener).thenCompose(v -> map.put("foo", value1)).join();
287 MapEvent<String, byte[]> event = listener.event();
288 assertNotNull(event);
289 assertEquals(MapEvent.Type.INSERT, event.type());
290 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800291
292 // remove listener and verify listener is not notified.
Madan Jampani40f022e2016-03-02 21:35:14 -0800293 map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join();
294 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800295
296 // add the listener back and verify UPDATE events are received correctly
Madan Jampani40f022e2016-03-02 21:35:14 -0800297 map.addListener(listener).thenCompose(v -> map.put("foo", value3)).join();
298 event = listener.event();
299 assertNotNull(event);
300 assertEquals(MapEvent.Type.UPDATE, event.type());
301 assertTrue(Arrays.equals(value3, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800302
303 // perform a non-state changing operation and verify no events are received.
304 map.putIfAbsent("foo", value1).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800305 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800306
307 // verify REMOVE events are received correctly.
308 map.remove("foo").join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800309 event = listener.event();
310 assertNotNull(event);
311 assertEquals(MapEvent.Type.REMOVE, event.type());
312 assertTrue(Arrays.equals(value3, event.oldValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800313
314 // verify compute methods also generate events.
315 map.computeIf("foo", v -> v == null, (k, v) -> value1).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800316 event = listener.event();
317 assertNotNull(event);
318 assertEquals(MapEvent.Type.INSERT, event.type());
319 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800320
321 map.compute("foo", (k, v) -> value2).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800322 event = listener.event();
323 assertNotNull(event);
324 assertEquals(MapEvent.Type.UPDATE, event.type());
325 assertTrue(Arrays.equals(value2, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800326
327 map.computeIf("foo", v -> Arrays.equals(v, value2), (k, v) -> null).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800328 event = listener.event();
329 assertNotNull(event);
330 assertEquals(MapEvent.Type.REMOVE, event.type());
331 assertTrue(Arrays.equals(value2, event.oldValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800332
333 map.removeListener(listener).join();
334 }
335
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700336 protected void transactionCommitTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800337 final byte[] value1 = Tools.getBytesUtf8("value1");
338 final byte[] value2 = Tools.getBytesUtf8("value2");
339
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700340 AtomixConsistentMap map = createAtomixClient().getResource("testCommitTestsMap",
341 AtomixConsistentMap.class).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800342 TestMapEventListener listener = new TestMapEventListener();
343
344 map.addListener(listener).join();
345
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700346 // PUT_IF_ABSENT
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800347 MapUpdate<String, byte[]> update1 =
348 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
349 .withKey("foo")
350 .withValue(value1)
351 .build();
352
Madan Jampani74da78b2016-02-09 21:18:36 -0800353 MapTransaction<String, byte[]> tx = new MapTransaction<>(TransactionId.from("tx1"), Arrays.asList(update1));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800354
Madan Jampanicadd70b2016-02-08 13:45:43 -0800355 map.prepare(tx).thenAccept(result -> {
Madan Jampani74da78b2016-02-09 21:18:36 -0800356 assertEquals(true, result);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800357 }).join();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700358 // verify changes in Tx is not visible yet until commit
Madan Jampani40f022e2016-03-02 21:35:14 -0800359 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800360
361 map.size().thenAccept(result -> {
362 assertTrue(result == 0);
363 }).join();
364
365 map.get("foo").thenAccept(result -> {
366 assertNull(result);
367 }).join();
368
369 try {
370 map.put("foo", value2).join();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700371 fail("update to map entry in open tx should fail with Exception");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800372 } catch (CompletionException e) {
373 assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
374 }
375
Madan Jampani40f022e2016-03-02 21:35:14 -0800376 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800377
Madan Jampani74da78b2016-02-09 21:18:36 -0800378 map.commit(tx.transactionId()).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800379 MapEvent<String, byte[]> event = listener.event();
380 assertNotNull(event);
381 assertEquals(MapEvent.Type.INSERT, event.type());
382 assertTrue(Arrays.equals(value1, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800383
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700384 // map should be update-able after commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800385 map.put("foo", value2).thenAccept(result -> {
386 assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
387 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800388 event = listener.event();
389 assertNotNull(event);
390 assertEquals(MapEvent.Type.UPDATE, event.type());
391 assertTrue(Arrays.equals(value2, event.newValue().value()));
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700392
393
394 // REMOVE_IF_VERSION_MATCH
395 byte[] currFoo = map.get("foo").get().value();
396 long currFooVersion = map.get("foo").get().version();
397 MapUpdate<String, byte[]> remove1 =
398 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
399 .withKey("foo")
400 .withCurrentVersion(currFooVersion)
401 .build();
402
403 tx = new MapTransaction<>(TransactionId.from("tx2"), Arrays.asList(remove1));
404
405 map.prepare(tx).thenAccept(result -> {
406 assertTrue("prepare should succeed", result);
407 }).join();
408 // verify changes in Tx is not visible yet until commit
409 assertFalse(listener.eventReceived());
410
411 map.size().thenAccept(size -> {
412 assertThat(size, is(1));
413 }).join();
414
415 map.get("foo").thenAccept(result -> {
416 assertThat(result.value(), is(currFoo));
417 }).join();
418
419 map.commit(tx.transactionId()).join();
420 event = listener.event();
421 assertNotNull(event);
422 assertEquals(MapEvent.Type.REMOVE, event.type());
423 assertArrayEquals(currFoo, event.oldValue().value());
424
425 map.size().thenAccept(size -> {
426 assertThat(size, is(0));
427 }).join();
428
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800429 }
430
Aaron Kruglikovc38dc7f2016-07-19 10:00:11 -0700431 protected void transactionRollbackTests() throws Throwable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800432 final byte[] value1 = Tools.getBytesUtf8("value1");
433 final byte[] value2 = Tools.getBytesUtf8("value2");
434
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700435 AtomixConsistentMap map = createAtomixClient().getResource("testTransactionRollbackTestsMap",
436 AtomixConsistentMap.class).join();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800437 TestMapEventListener listener = new TestMapEventListener();
438
439 map.addListener(listener).join();
440
441 MapUpdate<String, byte[]> update1 =
442 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
443 .withKey("foo")
444 .withValue(value1)
445 .build();
Madan Jampani74da78b2016-02-09 21:18:36 -0800446 MapTransaction<String, byte[]> tx = new MapTransaction<>(TransactionId.from("tx1"), Arrays.asList(update1));
Madan Jampanicadd70b2016-02-08 13:45:43 -0800447 map.prepare(tx).thenAccept(result -> {
Madan Jampani74da78b2016-02-09 21:18:36 -0800448 assertEquals(true, result);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800449 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800450 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800451
Madan Jampani74da78b2016-02-09 21:18:36 -0800452 map.rollback(tx.transactionId()).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800453 assertFalse(listener.eventReceived());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800454
455 map.get("foo").thenAccept(result -> {
456 assertNull(result);
457 }).join();
458
459 map.put("foo", value2).thenAccept(result -> {
460 assertNull(result);
461 }).join();
Madan Jampani40f022e2016-03-02 21:35:14 -0800462 MapEvent<String, byte[]> event = listener.event();
463 assertNotNull(event);
464 assertEquals(MapEvent.Type.INSERT, event.type());
465 assertTrue(Arrays.equals(value2, event.newValue().value()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800466 }
467
468 private static class TestMapEventListener implements MapEventListener<String, byte[]> {
469
Madan Jampani40f022e2016-03-02 21:35:14 -0800470 private final BlockingQueue<MapEvent<String, byte[]>> queue = new ArrayBlockingQueue<>(1);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800471
472 @Override
473 public void event(MapEvent<String, byte[]> event) {
Madan Jampani40f022e2016-03-02 21:35:14 -0800474 try {
475 queue.put(event);
476 } catch (InterruptedException e) {
477 Throwables.propagate(e);
478 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800479 }
480
Madan Jampani40f022e2016-03-02 21:35:14 -0800481 public boolean eventReceived() {
482 return !queue.isEmpty();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800483 }
484
Madan Jampani40f022e2016-03-02 21:35:14 -0800485 public MapEvent<String, byte[]> event() throws InterruptedException {
486 return queue.take();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800487 }
488 }
489}