blob: 41cf18c4e199bc3fe4a5d03aeaa80aa737ef454f [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Jonathan Hart584d2f32015-01-27 19:46:14 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
17
18import static com.google.common.base.Preconditions.checkArgument;
19import static junit.framework.TestCase.assertFalse;
20import static org.easymock.EasyMock.anyObject;
21import static org.easymock.EasyMock.createMock;
22import static org.easymock.EasyMock.eq;
23import static org.easymock.EasyMock.expect;
24import static org.easymock.EasyMock.expectLastCall;
25import static org.easymock.EasyMock.replay;
26import static org.easymock.EasyMock.reset;
27import static org.easymock.EasyMock.verify;
28import static org.junit.Assert.assertEquals;
29import static org.junit.Assert.assertNull;
30import static org.junit.Assert.assertTrue;
31import static org.junit.Assert.fail;
Jonathan Hart584d2f32015-01-27 19:46:14 -080032
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070033import java.util.ArrayList;
34import java.util.Collection;
35import java.util.HashMap;
36import java.util.HashSet;
37import java.util.List;
38import java.util.Map;
39import java.util.Objects;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070040import java.util.Set;
41import java.util.concurrent.CompletableFuture;
42import java.util.concurrent.CountDownLatch;
43import java.util.concurrent.Executor;
44import java.util.concurrent.TimeUnit;
45import java.util.concurrent.atomic.AtomicLong;
46import java.util.function.Consumer;
47import java.util.function.Function;
Madan Jampani3e033bd2015-04-08 13:03:49 -070048
Jonathan Hart584d2f32015-01-27 19:46:14 -080049import org.junit.After;
50import org.junit.Before;
51import org.junit.Test;
52import org.onlab.packet.IpAddress;
53import org.onlab.util.KryoNamespace;
54import org.onosproject.cluster.ClusterService;
55import org.onosproject.cluster.ControllerNode;
56import org.onosproject.cluster.DefaultControllerNode;
57import org.onosproject.cluster.NodeId;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070058import org.onosproject.event.AbstractEvent;
Madan Jampani2b8e1892015-12-04 11:16:47 -080059import org.onosproject.persistence.PersistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080060import org.onosproject.store.Timestamp;
61import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070062import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
Jonathan Hart584d2f32015-01-27 19:46:14 -080063import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani2b8e1892015-12-04 11:16:47 -080064import org.onosproject.store.persistence.TestPersistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080065import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070066import org.onosproject.store.service.EventuallyConsistentMap;
67import org.onosproject.store.service.EventuallyConsistentMapEvent;
68import org.onosproject.store.service.EventuallyConsistentMapListener;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070069import com.google.common.collect.ComparisonChain;
70import com.google.common.collect.ImmutableList;
71import com.google.common.collect.ImmutableSet;
72import com.google.common.util.concurrent.MoreExecutors;
Jonathan Hart584d2f32015-01-27 19:46:14 -080073
Jonathan Hart584d2f32015-01-27 19:46:14 -080074/**
75 * Unit tests for EventuallyConsistentMapImpl.
76 */
77public class EventuallyConsistentMapImplTest {
78
79 private EventuallyConsistentMap<String, String> ecMap;
80
Madan Jampani2b8e1892015-12-04 11:16:47 -080081 private PersistenceService persistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080082 private ClusterService clusterService;
83 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080084 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080085
86 private static final String MAP_NAME = "test";
Jordan Halterman12d5ec42017-05-17 00:53:44 -070087 private static final MessageSubject BOOTSTRAP_MESSAGE_SUBJECT
88 = new MessageSubject("ecm-" + MAP_NAME + "-bootstrap");
89 private static final MessageSubject INITIALIZE_MESSAGE_SUBJECT
90 = new MessageSubject("ecm-" + MAP_NAME + "-initialize");
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080091 private static final MessageSubject UPDATE_MESSAGE_SUBJECT
Jonathan Hart584d2f32015-01-27 19:46:14 -080092 = new MessageSubject("ecm-" + MAP_NAME + "-update");
Jonathan Hart584d2f32015-01-27 19:46:14 -080093 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
94 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
Jon Halld198b882016-05-18 16:44:40 -070095 private static final MessageSubject UPDATE_REQUEST_SUBJECT
96 = new MessageSubject("ecm-" + MAP_NAME + "-update-request");
Jonathan Hart584d2f32015-01-27 19:46:14 -080097
98 private static final String KEY1 = "one";
99 private static final String KEY2 = "two";
100 private static final String VALUE1 = "oneValue";
101 private static final String VALUE2 = "twoValue";
102
103 private final ControllerNode self =
104 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
105
Madan Jampani3d76c942015-06-29 23:37:10 -0700106 private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
Jon Halld198b882016-05-18 16:44:40 -0700107 private Consumer<Collection<UpdateRequest<String>>> requestHandler;
Madan Jampani29f52a32016-04-18 15:20:52 -0700108 private Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse> antiEntropyHandler;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800109
Jonathan Hart584d2f32015-01-27 19:46:14 -0800110 @Before
111 public void setUp() throws Exception {
112 clusterService = createMock(ClusterService.class);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800113 expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
114 expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800115 replay(clusterService);
116
117 clusterCommunicator = createMock(ClusterCommunicationService.class);
118
Madan Jampani2b8e1892015-12-04 11:16:47 -0800119 persistenceService = new TestPersistenceService();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800120 // Add expectation for adding cluster message subscribers which
121 // delegate to our ClusterCommunicationService implementation. This
122 // allows us to get a reference to the map's internal cluster message
123 // handlers so we can induce events coming in from a peer.
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700124 clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
125 anyObject(Function.class),
126 anyObject(Function.class),
127 anyObject(Function.class));
128 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
129 clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
130 anyObject(Function.class),
131 anyObject(Function.class),
132 anyObject(Function.class),
133 anyObject(Executor.class));
134 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
Madan Jampani29f52a32016-04-18 15:20:52 -0700135 clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
Madan Jampani3d76c942015-06-29 23:37:10 -0700136 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
Madan Jampani29f52a32016-04-18 15:20:52 -0700137 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
138 clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
139 anyObject(Function.class),
140 anyObject(Function.class),
141 anyObject(Function.class),
142 anyObject(Executor.class));
143 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
Jon Halld198b882016-05-18 16:44:40 -0700144 clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
145 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
146 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800147
148 replay(clusterCommunicator);
149
150 clockService = new SequentialClockService<>();
151
152 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
153 .register(KryoNamespaces.API)
154 .register(TestTimestamp.class);
155
Madan Jampani175e8fd2015-05-20 14:10:45 -0700156 ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700157 clusterService, clusterCommunicator, persistenceService)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700158 .withName(MAP_NAME)
159 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700160 .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700161 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700162 .withPersistence()
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700163 .build();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800164
165 // Reset ready for tests to add their own expectations
166 reset(clusterCommunicator);
167 }
168
169 @After
170 public void tearDown() {
171 reset(clusterCommunicator);
172 ecMap.destroy();
173 }
174
Ray Milkey8dc82082015-02-20 16:22:38 -0800175 @SuppressWarnings("unchecked")
176 private EventuallyConsistentMapListener<String, String> getListener() {
177 return createMock(EventuallyConsistentMapListener.class);
178 }
179
Jonathan Hart584d2f32015-01-27 19:46:14 -0800180 @Test
181 public void testSize() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800182 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800183
184 assertEquals(0, ecMap.size());
185 ecMap.put(KEY1, VALUE1);
186 assertEquals(1, ecMap.size());
187 ecMap.put(KEY1, VALUE2);
188 assertEquals(1, ecMap.size());
189 ecMap.put(KEY2, VALUE2);
190 assertEquals(2, ecMap.size());
191 for (int i = 0; i < 10; i++) {
192 ecMap.put("" + i, "" + i);
193 }
194 assertEquals(12, ecMap.size());
195 ecMap.remove(KEY1);
196 assertEquals(11, ecMap.size());
197 ecMap.remove(KEY1);
198 assertEquals(11, ecMap.size());
199 }
200
201 @Test
202 public void testIsEmpty() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800203 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800204
205 assertTrue(ecMap.isEmpty());
206 ecMap.put(KEY1, VALUE1);
207 assertFalse(ecMap.isEmpty());
208 ecMap.remove(KEY1);
209 assertTrue(ecMap.isEmpty());
210 }
211
212 @Test
213 public void testContainsKey() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800214 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800215
216 assertFalse(ecMap.containsKey(KEY1));
217 ecMap.put(KEY1, VALUE1);
218 assertTrue(ecMap.containsKey(KEY1));
219 assertFalse(ecMap.containsKey(KEY2));
220 ecMap.remove(KEY1);
221 assertFalse(ecMap.containsKey(KEY1));
222 }
223
224 @Test
225 public void testContainsValue() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800226 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800227
228 assertFalse(ecMap.containsValue(VALUE1));
229 ecMap.put(KEY1, VALUE1);
230 assertTrue(ecMap.containsValue(VALUE1));
231 assertFalse(ecMap.containsValue(VALUE2));
232 ecMap.put(KEY1, VALUE2);
233 assertFalse(ecMap.containsValue(VALUE1));
234 assertTrue(ecMap.containsValue(VALUE2));
235 ecMap.remove(KEY1);
236 assertFalse(ecMap.containsValue(VALUE2));
237 }
238
239 @Test
240 public void testGet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800241 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800242
243 CountDownLatch latch;
244
245 // Local put
246 assertNull(ecMap.get(KEY1));
247 ecMap.put(KEY1, VALUE1);
248 assertEquals(VALUE1, ecMap.get(KEY1));
249
250 // Remote put
Madan Jampani3d76c942015-06-29 23:37:10 -0700251 List<UpdateEntry<String, String>> message
252 = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800253
254 // Create a latch so we know when the put operation has finished
255 latch = new CountDownLatch(1);
256 ecMap.addListener(new TestListener(latch));
257
258 assertNull(ecMap.get(KEY2));
Madan Jampani3d76c942015-06-29 23:37:10 -0700259 updateHandler.accept(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800260 assertTrue("External listener never got notified of internal event",
261 latch.await(100, TimeUnit.MILLISECONDS));
262 assertEquals(VALUE2, ecMap.get(KEY2));
263
264 // Local remove
265 ecMap.remove(KEY2);
266 assertNull(ecMap.get(KEY2));
267
268 // Remote remove
Madan Jampani3d76c942015-06-29 23:37:10 -0700269 message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800270
271 // Create a latch so we know when the remove operation has finished
272 latch = new CountDownLatch(1);
273 ecMap.addListener(new TestListener(latch));
274
Madan Jampani3d76c942015-06-29 23:37:10 -0700275 updateHandler.accept(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800276 assertTrue("External listener never got notified of internal event",
277 latch.await(100, TimeUnit.MILLISECONDS));
278 assertNull(ecMap.get(KEY1));
279 }
280
281 @Test
282 public void testPut() throws Exception {
283 // Set up expectations of external events to be sent to listeners during
284 // the test. These don't use timestamps so we can set them all up at once.
285 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800286 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800287 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700288 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800289 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700290 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800291 replay(listener);
292
293 ecMap.addListener(listener);
294
295 // Set up expected internal message to be broadcast to peers on first put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800296 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700297 .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800298
299 // Put first value
300 assertNull(ecMap.get(KEY1));
301 ecMap.put(KEY1, VALUE1);
302 assertEquals(VALUE1, ecMap.get(KEY1));
303
304 verify(clusterCommunicator);
305
306 // Set up expected internal message to be broadcast to peers on second put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800307 expectSpecificMulticastMessage(generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700308 KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800309
310 // Update same key to a new value
311 ecMap.put(KEY1, VALUE2);
312 assertEquals(VALUE2, ecMap.get(KEY1));
313
314 verify(clusterCommunicator);
315
316 // Do a put with a older timestamp than the value already there.
317 // The map data should not be changed and no notifications should be sent.
318 reset(clusterCommunicator);
319 replay(clusterCommunicator);
320
321 clockService.turnBackTime();
322 ecMap.put(KEY1, VALUE1);
323 // Value should not have changed.
324 assertEquals(VALUE2, ecMap.get(KEY1));
325
326 verify(clusterCommunicator);
327
328 // Check that our listener received the correct events during the test
329 verify(listener);
330 }
331
332 @Test
333 public void testRemove() throws Exception {
334 // Set up expectations of external events to be sent to listeners during
335 // the test. These don't use timestamps so we can set them all up at once.
336 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800337 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800338 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700339 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700340 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700341 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800342 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700343 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800344 replay(listener);
345
346 ecMap.addListener(listener);
347
348 // Put in an initial value
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800349 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800350 ecMap.put(KEY1, VALUE1);
351 assertEquals(VALUE1, ecMap.get(KEY1));
352
353 // Remove the value and check the correct internal cluster messages
354 // are sent
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800355 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700356 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800357
358 ecMap.remove(KEY1);
359 assertNull(ecMap.get(KEY1));
360
361 verify(clusterCommunicator);
362
363 // Remove the same value again. Even though the value is no longer in
364 // the map, we expect that the tombstone is updated and another remove
365 // event is sent to the cluster and external listeners.
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800366 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700367 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800368
369 ecMap.remove(KEY1);
370 assertNull(ecMap.get(KEY1));
371
372 verify(clusterCommunicator);
373
374
375 // Put in a new value for us to try and remove
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800376 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800377
378 ecMap.put(KEY2, VALUE2);
379
380 clockService.turnBackTime();
381
382 // Remove should have no effect, since it has an older timestamp than
383 // the put. Expect no notifications to be sent out
384 reset(clusterCommunicator);
385 replay(clusterCommunicator);
386
387 ecMap.remove(KEY2);
388
389 verify(clusterCommunicator);
390
391 // Check that our listener received the correct events during the test
392 verify(listener);
393 }
394
395 @Test
Madan Jampani4727a112015-07-16 12:12:58 -0700396 public void testCompute() throws Exception {
397 // Set up expectations of external events to be sent to listeners during
398 // the test. These don't use timestamps so we can set them all up at once.
399 EventuallyConsistentMapListener<String, String> listener
400 = getListener();
401 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700402 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Madan Jampani4727a112015-07-16 12:12:58 -0700403 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700404 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Madan Jampani4727a112015-07-16 12:12:58 -0700405 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700406 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Madan Jampani4727a112015-07-16 12:12:58 -0700407 replay(listener);
408
409 ecMap.addListener(listener);
410
411 // Put in an initial value
412 expectPeerMessage(clusterCommunicator);
413 ecMap.compute(KEY1, (k, v) -> VALUE1);
414 assertEquals(VALUE1, ecMap.get(KEY1));
415
416 // Remove the value and check the correct internal cluster messages
417 // are sent
418 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
419 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
420
421 ecMap.compute(KEY1, (k, v) -> null);
422 assertNull(ecMap.get(KEY1));
423
424 verify(clusterCommunicator);
425
426 // Remove the same value again. Even though the value is no longer in
427 // the map, we expect that the tombstone is updated and another remove
428 // event is sent to the cluster and external listeners.
429 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
430 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
431
432 ecMap.compute(KEY1, (k, v) -> null);
433 assertNull(ecMap.get(KEY1));
434
435 verify(clusterCommunicator);
436
437 // Put in a new value for us to try and remove
438 expectPeerMessage(clusterCommunicator);
439
440 ecMap.compute(KEY2, (k, v) -> VALUE2);
441
442 clockService.turnBackTime();
443
444 // Remove should have no effect, since it has an older timestamp than
445 // the put. Expect no notifications to be sent out
446 reset(clusterCommunicator);
447 replay(clusterCommunicator);
448
449 ecMap.compute(KEY2, (k, v) -> null);
450
451 verify(clusterCommunicator);
452
453 // Check that our listener received the correct events during the test
454 verify(listener);
455 }
456
457 @Test
Jonathan Hart584d2f32015-01-27 19:46:14 -0800458 public void testPutAll() throws Exception {
459 // putAll() with an empty map is a no-op - no messages will be sent
460 reset(clusterCommunicator);
461 replay(clusterCommunicator);
462
463 ecMap.putAll(new HashMap<>());
464
465 verify(clusterCommunicator);
466
467 // Set up the listener with our expected events
468 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800469 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800470 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700471 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800472 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700473 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800474 replay(listener);
475
476 ecMap.addListener(listener);
477
478 // Expect a multi-update inter-instance message
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700479 expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800480 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800481
482 Map<String, String> putAllValues = new HashMap<>();
483 putAllValues.put(KEY1, VALUE1);
484 putAllValues.put(KEY2, VALUE2);
485
486 // Put the values in the map
487 ecMap.putAll(putAllValues);
488
489 // Check the correct messages and events were sent
490 verify(clusterCommunicator);
491 verify(listener);
492 }
493
494 @Test
495 public void testClear() throws Exception {
496 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800497 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800498 listener.event(new EventuallyConsistentMapEvent<>(
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700499 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
500 listener.event(new EventuallyConsistentMapEvent<>(
501 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
502 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700503 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800504 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700505 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800506 replay(listener);
507
508 // clear() on an empty map is a no-op - no messages will be sent
509 reset(clusterCommunicator);
510 replay(clusterCommunicator);
511
512 assertTrue(ecMap.isEmpty());
513 ecMap.clear();
514 verify(clusterCommunicator);
515
516 // Put some items in the map
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800517 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800518 ecMap.put(KEY1, VALUE1);
519 ecMap.put(KEY2, VALUE2);
520
521 ecMap.addListener(listener);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700522 expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800523
524 ecMap.clear();
525
526 verify(clusterCommunicator);
527 verify(listener);
528 }
529
530 @Test
531 public void testKeySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800532 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800533
534 assertTrue(ecMap.keySet().isEmpty());
535
536 // Generate some keys
537 Set<String> keys = new HashSet<>();
538 for (int i = 1; i <= 10; i++) {
539 keys.add("" + i);
540 }
541
542 // Put each key in the map
543 keys.forEach(k -> ecMap.put(k, "value" + k));
544
545 // Check keySet() returns the correct value
546 assertEquals(keys, ecMap.keySet());
547
548 // Update the value for one of the keys
549 ecMap.put(keys.iterator().next(), "new-value");
550
551 // Check the key set is still the same
552 assertEquals(keys, ecMap.keySet());
553
554 // Remove a key
555 String removeKey = keys.iterator().next();
556 keys.remove(removeKey);
557 ecMap.remove(removeKey);
558
559 // Check the key set is still correct
560 assertEquals(keys, ecMap.keySet());
561 }
562
563 @Test
564 public void testValues() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800565 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800566
567 assertTrue(ecMap.values().isEmpty());
568
569 // Generate some values
570 Map<String, String> expectedValues = new HashMap<>();
571 for (int i = 1; i <= 10; i++) {
572 expectedValues.put("" + i, "value" + i);
573 }
574
575 // Add them into the map
576 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
577
578 // Check the values collection is correct
579 assertEquals(expectedValues.values().size(), ecMap.values().size());
580 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
581
582 // Update the value for one of the keys
583 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
584 expectedValues.put(first.getKey(), "new-value");
585 ecMap.put(first.getKey(), "new-value");
586
587 // Check the values collection is still correct
588 assertEquals(expectedValues.values().size(), ecMap.values().size());
589 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
590
591 // Remove a key
592 String removeKey = expectedValues.keySet().iterator().next();
593 expectedValues.remove(removeKey);
594 ecMap.remove(removeKey);
595
596 // Check the values collection is still correct
597 assertEquals(expectedValues.values().size(), ecMap.values().size());
598 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
599 }
600
601 @Test
602 public void testEntrySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800603 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800604
605 assertTrue(ecMap.entrySet().isEmpty());
606
607 // Generate some values
608 Map<String, String> expectedValues = new HashMap<>();
609 for (int i = 1; i <= 10; i++) {
610 expectedValues.put("" + i, "value" + i);
611 }
612
613 // Add them into the map
614 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
615
616 // Check the entry set is correct
617 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
618
619 // Update the value for one of the keys
620 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
621 expectedValues.put(first.getKey(), "new-value");
622 ecMap.put(first.getKey(), "new-value");
623
624 // Check the entry set is still correct
625 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
626
627 // Remove a key
628 String removeKey = expectedValues.keySet().iterator().next();
629 expectedValues.remove(removeKey);
630 ecMap.remove(removeKey);
631
632 // Check the entry set is still correct
633 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
634 }
635
636 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
637 if (expectedMap.entrySet().size() != actual.size()) {
638 return false;
639 }
640
641 for (Map.Entry<String, String> e : actual) {
642 if (!expectedMap.containsKey(e.getKey())) {
643 return false;
644 }
645 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
646 return false;
647 }
648 }
649 return true;
650 }
651
652 @Test
653 public void testDestroy() throws Exception {
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700654 clusterCommunicator.removeSubscriber(BOOTSTRAP_MESSAGE_SUBJECT);
655 clusterCommunicator.removeSubscriber(INITIALIZE_MESSAGE_SUBJECT);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800656 clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
Jon Halld198b882016-05-18 16:44:40 -0700657 clusterCommunicator.removeSubscriber(UPDATE_REQUEST_SUBJECT);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800658 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
659
660 replay(clusterCommunicator);
661
662 ecMap.destroy();
663
664 verify(clusterCommunicator);
665
666 try {
667 ecMap.get(KEY1);
668 fail("get after destroy should throw exception");
669 } catch (IllegalStateException e) {
670 assertTrue(true);
671 }
672
673 try {
674 ecMap.put(KEY1, VALUE1);
675 fail("put after destroy should throw exception");
676 } catch (IllegalStateException e) {
677 assertTrue(true);
678 }
679 }
680
Madan Jampani3d76c942015-06-29 23:37:10 -0700681 private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
682 return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800683 }
684
Madan Jampani3d76c942015-06-29 23:37:10 -0700685 private List<UpdateEntry<String, String>> generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700686 String key1, String value1, String key2, String value2) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700687 List<UpdateEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800688
689 Timestamp timestamp1 = clockService.peek(1);
690 Timestamp timestamp2 = clockService.peek(2);
691
Madan Jampani3d76c942015-06-29 23:37:10 -0700692 list.add(generatePutMessage(key1, value1, timestamp1));
693 list.add(generatePutMessage(key2, value2, timestamp2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800694
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700695 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800696 }
697
Madan Jampani3d76c942015-06-29 23:37:10 -0700698 private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
699 return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800700 }
701
Madan Jampani3d76c942015-06-29 23:37:10 -0700702 private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
703 List<UpdateEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800704
705 Timestamp timestamp1 = clockService.peek(1);
706 Timestamp timestamp2 = clockService.peek(2);
707
Madan Jampani3d76c942015-06-29 23:37:10 -0700708 list.add(generateRemoveMessage(key1, timestamp1));
709 list.add(generateRemoveMessage(key2, timestamp2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800710
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700711 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800712 }
713
714 /**
715 * Sets up a mock ClusterCommunicationService to expect a specific cluster
716 * message to be broadcast to the cluster.
717 *
Ray Milkeyb3c5ce22015-08-10 09:07:36 -0700718 * @param message message we expect to be sent
Jonathan Hart584d2f32015-01-27 19:46:14 -0800719 * @param clusterCommunicator a mock ClusterCommunicationService to set up
720 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800721 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700722 private static <T> void expectSpecificBroadcastMessage(
723 T message,
724 MessageSubject subject,
725 ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800726 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700727 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
728 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800729 replay(clusterCommunicator);
730 }
731
732 /**
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800733 * Sets up a mock ClusterCommunicationService to expect a specific cluster
734 * message to be multicast to the cluster.
735 *
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700736 * @param message message we expect to be sent
737 * @param subject subject we expect to be sent to
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800738 * @param clusterCommunicator a mock ClusterCommunicationService to set up
739 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800740 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700741 private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800742 ClusterCommunicationService clusterCommunicator) {
743 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700744 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
745 expectLastCall().anyTimes();
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800746 replay(clusterCommunicator);
747 }
748
749
750 /**
751 * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
Jonathan Hart584d2f32015-01-27 19:46:14 -0800752 * that is sent to it. This is useful for unit tests where we aren't
753 * interested in testing the messaging component.
754 *
755 * @param clusterCommunicator a mock ClusterCommunicationService to set up
756 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800757 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700758 private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800759 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800760// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
761// anyObject(Iterable.class)))
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700762 expect(clusterCommunicator.<T>unicast(
763 anyObject(),
764 anyObject(MessageSubject.class),
765 anyObject(Function.class),
766 anyObject(NodeId.class)))
Madan Jampani175e8fd2015-05-20 14:10:45 -0700767 .andReturn(CompletableFuture.completedFuture(null))
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800768 .anyTimes();
769 replay(clusterCommunicator);
770 }
771
772 /**
773 * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
774 * that is sent to it. This is useful for unit tests where we aren't
775 * interested in testing the messaging component.
776 *
777 * @param clusterCommunicator a mock ClusterCommunicationService to set up
778 */
779 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800780 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700781 clusterCommunicator.<AbstractEvent>multicast(
782 anyObject(AbstractEvent.class),
783 anyObject(MessageSubject.class),
784 anyObject(Function.class),
785 anyObject(Set.class));
786 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800787 replay(clusterCommunicator);
788 }
789
790 /**
791 * ClusterCommunicationService implementation that the map's addSubscriber
792 * call will delegate to. This means we can get a reference to the
793 * internal cluster message handler used by the map, so that we can simulate
794 * events coming in from other instances.
795 */
796 private final class TestClusterCommunicationService
Ray Milkeyb3c5ce22015-08-10 09:07:36 -0700797 extends ClusterCommunicationServiceAdapter {
Madan Jampani27b69c62015-05-15 15:49:02 -0700798
799 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700800 public <M> void addSubscriber(MessageSubject subject,
801 Function<byte[], M> decoder, Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700802 Executor executor) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700803 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
804 updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
Jon Halld198b882016-05-18 16:44:40 -0700805 } else if (subject.equals(UPDATE_REQUEST_SUBJECT)) {
806 requestHandler = (Consumer<Collection<UpdateRequest<String>>>) handler;
Madan Jampani29f52a32016-04-18 15:20:52 -0700807 } else {
808 throw new RuntimeException("Unexpected message subject " + subject.toString());
809 }
810 }
811
812 @Override
813 public <M, R> void addSubscriber(MessageSubject subject,
814 Function<byte[], M> decoder, Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
815 if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
816 antiEntropyHandler = (Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse>) handler;
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700817 } else if (!subject.equals(INITIALIZE_MESSAGE_SUBJECT)) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700818 throw new RuntimeException("Unexpected message subject " + subject.toString());
819 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700820 }
Jonathan Hart584d2f32015-01-27 19:46:14 -0800821 }
822
823 /**
824 * ClockService implementation that gives out timestamps based on a
825 * sequential counter. This clock service enables more control over the
826 * timestamps that are given out, including being able to "turn back time"
827 * to give out timestamps from the past.
828 *
829 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800830 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800831 */
Madan Jampanibcf1a482015-06-24 19:05:56 -0700832 private class SequentialClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800833
834 private static final long INITIAL_VALUE = 1;
835 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
836
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800837 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800838 return new TestTimestamp(counter.getAndIncrement());
839 }
840
841 /**
842 * Returns what the next timestamp will be without consuming the
843 * timestamp. This allows test code to set expectations correctly while
844 * still allowing the CUT to get the same timestamp.
845 *
846 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800847 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800848 */
849 public Timestamp peekAtNextTimestamp() {
850 return peek(1);
851 }
852
853 /**
854 * Returns the ith timestamp to be given out in the future without
855 * consuming the timestamp. For example, i=1 returns the next timestamp,
856 * i=2 returns the timestamp after that, and so on.
857 *
858 * @param i number of the timestamp to peek at
859 * @return the ith timestamp that will be given out
860 */
861 public Timestamp peek(int i) {
862 checkArgument(i > 0, "i must be a positive integer");
863
864 return new TestTimestamp(counter.get() + i - 1);
865 }
866
867 /**
868 * Turns the clock back two ticks, so the next call to getTimestamp will
869 * return an older timestamp than the previous call to getTimestamp.
870 */
871 public void turnBackTime() {
872 // Not atomic, but should be OK for these tests.
873 counter.decrementAndGet();
874 counter.decrementAndGet();
875 }
876
877 }
878
879 /**
880 * Timestamp implementation where the value of the timestamp can be
881 * specified explicitly at creation time.
882 */
883 private class TestTimestamp implements Timestamp {
884
885 private final long timestamp;
886
887 /**
888 * Creates a new timestamp that has the specified value.
889 *
890 * @param timestamp value of the timestamp
891 */
892 public TestTimestamp(long timestamp) {
893 this.timestamp = timestamp;
894 }
895
896 @Override
897 public int compareTo(Timestamp o) {
898 checkArgument(o instanceof TestTimestamp);
899 TestTimestamp otherTimestamp = (TestTimestamp) o;
900 return ComparisonChain.start()
901 .compare(this.timestamp, otherTimestamp.timestamp)
902 .result();
903 }
904 }
905
906 /**
907 * EventuallyConsistentMapListener implementation which triggers a latch
908 * when it receives an event.
909 */
910 private class TestListener implements EventuallyConsistentMapListener<String, String> {
911 private CountDownLatch latch;
912
913 /**
914 * Creates a new listener that will trigger the specified latch when it
915 * receives and event.
916 *
917 * @param latch the latch to trigger on events
918 */
919 public TestListener(CountDownLatch latch) {
920 this.latch = latch;
921 }
922
923 @Override
924 public void event(EventuallyConsistentMapEvent<String, String> event) {
925 latch.countDown();
926 }
927 }
928}