blob: b788050ba9f07b23005346ef45c86988a2d23875 [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
Thomas Vachuskab6d31672018-07-27 17:03:46 -07002 * Copyright 2018-present Open Networking Foundation
Jonathan Hart584d2f32015-01-27 19:46:14 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Thomas Vachuskab6d31672018-07-27 17:03:46 -070016package org.onosproject.store.atomix.primitives.impl;
Madan Jampanif4c88502016-01-21 12:35:36 -080017
18import static com.google.common.base.Preconditions.checkArgument;
19import static junit.framework.TestCase.assertFalse;
20import static org.easymock.EasyMock.anyObject;
21import static org.easymock.EasyMock.createMock;
22import static org.easymock.EasyMock.eq;
23import static org.easymock.EasyMock.expect;
24import static org.easymock.EasyMock.expectLastCall;
25import static org.easymock.EasyMock.replay;
26import static org.easymock.EasyMock.reset;
27import static org.easymock.EasyMock.verify;
28import static org.junit.Assert.assertEquals;
29import static org.junit.Assert.assertNull;
30import static org.junit.Assert.assertTrue;
31import static org.junit.Assert.fail;
Jonathan Hart584d2f32015-01-27 19:46:14 -080032
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070033import java.util.ArrayList;
34import java.util.Collection;
35import java.util.HashMap;
36import java.util.HashSet;
37import java.util.List;
38import java.util.Map;
39import java.util.Objects;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070040import java.util.Set;
41import java.util.concurrent.CompletableFuture;
42import java.util.concurrent.CountDownLatch;
43import java.util.concurrent.Executor;
44import java.util.concurrent.TimeUnit;
45import java.util.concurrent.atomic.AtomicLong;
46import java.util.function.Consumer;
47import java.util.function.Function;
slowr878625f2017-10-24 14:53:49 -070048import java.util.function.Supplier;
Madan Jampani3e033bd2015-04-08 13:03:49 -070049
slowr878625f2017-10-24 14:53:49 -070050import com.google.common.collect.ComparisonChain;
51import com.google.common.collect.ImmutableList;
Jonathan Hart584d2f32015-01-27 19:46:14 -080052import org.junit.After;
53import org.junit.Before;
54import org.junit.Test;
55import org.onlab.packet.IpAddress;
56import org.onlab.util.KryoNamespace;
Jonathan Hart584d2f32015-01-27 19:46:14 -080057import org.onosproject.cluster.ControllerNode;
58import org.onosproject.cluster.DefaultControllerNode;
59import org.onosproject.cluster.NodeId;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070060import org.onosproject.event.AbstractEvent;
Madan Jampani2b8e1892015-12-04 11:16:47 -080061import org.onosproject.persistence.PersistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080062import org.onosproject.store.Timestamp;
63import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070064import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
Jonathan Hart584d2f32015-01-27 19:46:14 -080065import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani2b8e1892015-12-04 11:16:47 -080066import org.onosproject.store.persistence.TestPersistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080067import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070068import org.onosproject.store.service.EventuallyConsistentMap;
69import org.onosproject.store.service.EventuallyConsistentMapEvent;
70import org.onosproject.store.service.EventuallyConsistentMapListener;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070071import com.google.common.util.concurrent.MoreExecutors;
Jonathan Hart584d2f32015-01-27 19:46:14 -080072
Jonathan Hart584d2f32015-01-27 19:46:14 -080073/**
74 * Unit tests for EventuallyConsistentMapImpl.
75 */
76public class EventuallyConsistentMapImplTest {
77
78 private EventuallyConsistentMap<String, String> ecMap;
79
Madan Jampani2b8e1892015-12-04 11:16:47 -080080 private PersistenceService persistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080081 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080082 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080083
84 private static final String MAP_NAME = "test";
Jordan Halterman12d5ec42017-05-17 00:53:44 -070085 private static final MessageSubject BOOTSTRAP_MESSAGE_SUBJECT
86 = new MessageSubject("ecm-" + MAP_NAME + "-bootstrap");
87 private static final MessageSubject INITIALIZE_MESSAGE_SUBJECT
88 = new MessageSubject("ecm-" + MAP_NAME + "-initialize");
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080089 private static final MessageSubject UPDATE_MESSAGE_SUBJECT
Jonathan Hart584d2f32015-01-27 19:46:14 -080090 = new MessageSubject("ecm-" + MAP_NAME + "-update");
Jonathan Hart584d2f32015-01-27 19:46:14 -080091 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
92 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
Jon Halld198b882016-05-18 16:44:40 -070093 private static final MessageSubject UPDATE_REQUEST_SUBJECT
94 = new MessageSubject("ecm-" + MAP_NAME + "-update-request");
Jonathan Hart584d2f32015-01-27 19:46:14 -080095
96 private static final String KEY1 = "one";
97 private static final String KEY2 = "two";
98 private static final String VALUE1 = "oneValue";
99 private static final String VALUE2 = "twoValue";
100
101 private final ControllerNode self =
102 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
103
Madan Jampani3d76c942015-06-29 23:37:10 -0700104 private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
Jon Halld198b882016-05-18 16:44:40 -0700105 private Consumer<Collection<UpdateRequest<String>>> requestHandler;
Madan Jampani29f52a32016-04-18 15:20:52 -0700106 private Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse> antiEntropyHandler;
slowr878625f2017-10-24 14:53:49 -0700107 private Supplier<List<NodeId>> peersHandler = ArrayList::new;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800108
Jonathan Hart584d2f32015-01-27 19:46:14 -0800109 @Before
110 public void setUp() throws Exception {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800111 clusterCommunicator = createMock(ClusterCommunicationService.class);
112
Madan Jampani2b8e1892015-12-04 11:16:47 -0800113 persistenceService = new TestPersistenceService();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800114 // Add expectation for adding cluster message subscribers which
115 // delegate to our ClusterCommunicationService implementation. This
116 // allows us to get a reference to the map's internal cluster message
117 // handlers so we can induce events coming in from a peer.
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700118 clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
119 anyObject(Function.class),
120 anyObject(Function.class),
121 anyObject(Function.class));
122 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
123 clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
124 anyObject(Function.class),
125 anyObject(Function.class),
126 anyObject(Function.class),
127 anyObject(Executor.class));
128 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
Madan Jampani29f52a32016-04-18 15:20:52 -0700129 clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
Madan Jampani3d76c942015-06-29 23:37:10 -0700130 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
Madan Jampani29f52a32016-04-18 15:20:52 -0700131 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
132 clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
133 anyObject(Function.class),
134 anyObject(Function.class),
135 anyObject(Function.class),
136 anyObject(Executor.class));
137 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
Jon Halld198b882016-05-18 16:44:40 -0700138 clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
139 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
140 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800141
142 replay(clusterCommunicator);
143
144 clockService = new SequentialClockService<>();
145
146 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
147 .register(KryoNamespaces.API)
148 .register(TestTimestamp.class);
149
Madan Jampani175e8fd2015-05-20 14:10:45 -0700150 ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
slowr878625f2017-10-24 14:53:49 -0700151 NodeId.nodeId("0"),
152 clusterCommunicator,
153 persistenceService,
154 peersHandler,
155 peersHandler
156 )
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700157 .withName(MAP_NAME)
158 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700159 .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700160 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700161 .withPersistence()
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700162 .build();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800163
164 // Reset ready for tests to add their own expectations
165 reset(clusterCommunicator);
166 }
167
168 @After
169 public void tearDown() {
170 reset(clusterCommunicator);
171 ecMap.destroy();
172 }
173
Ray Milkey8dc82082015-02-20 16:22:38 -0800174 @SuppressWarnings("unchecked")
175 private EventuallyConsistentMapListener<String, String> getListener() {
176 return createMock(EventuallyConsistentMapListener.class);
177 }
178
Jonathan Hart584d2f32015-01-27 19:46:14 -0800179 @Test
180 public void testSize() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800181 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800182
183 assertEquals(0, ecMap.size());
184 ecMap.put(KEY1, VALUE1);
185 assertEquals(1, ecMap.size());
186 ecMap.put(KEY1, VALUE2);
187 assertEquals(1, ecMap.size());
188 ecMap.put(KEY2, VALUE2);
189 assertEquals(2, ecMap.size());
190 for (int i = 0; i < 10; i++) {
191 ecMap.put("" + i, "" + i);
192 }
193 assertEquals(12, ecMap.size());
194 ecMap.remove(KEY1);
195 assertEquals(11, ecMap.size());
196 ecMap.remove(KEY1);
197 assertEquals(11, ecMap.size());
198 }
199
200 @Test
201 public void testIsEmpty() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800202 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800203
204 assertTrue(ecMap.isEmpty());
205 ecMap.put(KEY1, VALUE1);
206 assertFalse(ecMap.isEmpty());
207 ecMap.remove(KEY1);
208 assertTrue(ecMap.isEmpty());
209 }
210
211 @Test
212 public void testContainsKey() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800213 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800214
215 assertFalse(ecMap.containsKey(KEY1));
216 ecMap.put(KEY1, VALUE1);
217 assertTrue(ecMap.containsKey(KEY1));
218 assertFalse(ecMap.containsKey(KEY2));
219 ecMap.remove(KEY1);
220 assertFalse(ecMap.containsKey(KEY1));
221 }
222
223 @Test
224 public void testContainsValue() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800225 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800226
227 assertFalse(ecMap.containsValue(VALUE1));
228 ecMap.put(KEY1, VALUE1);
229 assertTrue(ecMap.containsValue(VALUE1));
230 assertFalse(ecMap.containsValue(VALUE2));
231 ecMap.put(KEY1, VALUE2);
232 assertFalse(ecMap.containsValue(VALUE1));
233 assertTrue(ecMap.containsValue(VALUE2));
234 ecMap.remove(KEY1);
235 assertFalse(ecMap.containsValue(VALUE2));
236 }
237
238 @Test
239 public void testGet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800240 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800241
242 CountDownLatch latch;
243
244 // Local put
245 assertNull(ecMap.get(KEY1));
246 ecMap.put(KEY1, VALUE1);
247 assertEquals(VALUE1, ecMap.get(KEY1));
248
249 // Remote put
Madan Jampani3d76c942015-06-29 23:37:10 -0700250 List<UpdateEntry<String, String>> message
251 = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800252
253 // Create a latch so we know when the put operation has finished
254 latch = new CountDownLatch(1);
255 ecMap.addListener(new TestListener(latch));
256
257 assertNull(ecMap.get(KEY2));
Madan Jampani3d76c942015-06-29 23:37:10 -0700258 updateHandler.accept(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800259 assertTrue("External listener never got notified of internal event",
260 latch.await(100, TimeUnit.MILLISECONDS));
261 assertEquals(VALUE2, ecMap.get(KEY2));
262
263 // Local remove
264 ecMap.remove(KEY2);
265 assertNull(ecMap.get(KEY2));
266
267 // Remote remove
Madan Jampani3d76c942015-06-29 23:37:10 -0700268 message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800269
270 // Create a latch so we know when the remove operation has finished
271 latch = new CountDownLatch(1);
272 ecMap.addListener(new TestListener(latch));
273
Madan Jampani3d76c942015-06-29 23:37:10 -0700274 updateHandler.accept(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800275 assertTrue("External listener never got notified of internal event",
276 latch.await(100, TimeUnit.MILLISECONDS));
277 assertNull(ecMap.get(KEY1));
278 }
279
280 @Test
281 public void testPut() throws Exception {
282 // Set up expectations of external events to be sent to listeners during
283 // the test. These don't use timestamps so we can set them all up at once.
284 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800285 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800286 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700287 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800288 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700289 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800290 replay(listener);
291
292 ecMap.addListener(listener);
293
294 // Set up expected internal message to be broadcast to peers on first put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800295 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700296 .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800297
298 // Put first value
299 assertNull(ecMap.get(KEY1));
300 ecMap.put(KEY1, VALUE1);
301 assertEquals(VALUE1, ecMap.get(KEY1));
302
303 verify(clusterCommunicator);
304
305 // Set up expected internal message to be broadcast to peers on second put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800306 expectSpecificMulticastMessage(generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700307 KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800308
309 // Update same key to a new value
310 ecMap.put(KEY1, VALUE2);
311 assertEquals(VALUE2, ecMap.get(KEY1));
312
313 verify(clusterCommunicator);
314
315 // Do a put with a older timestamp than the value already there.
316 // The map data should not be changed and no notifications should be sent.
317 reset(clusterCommunicator);
318 replay(clusterCommunicator);
319
320 clockService.turnBackTime();
321 ecMap.put(KEY1, VALUE1);
322 // Value should not have changed.
323 assertEquals(VALUE2, ecMap.get(KEY1));
324
325 verify(clusterCommunicator);
326
327 // Check that our listener received the correct events during the test
328 verify(listener);
329 }
330
331 @Test
332 public void testRemove() throws Exception {
333 // Set up expectations of external events to be sent to listeners during
334 // the test. These don't use timestamps so we can set them all up at once.
335 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800336 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800337 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700338 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700339 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700340 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800341 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700342 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800343 replay(listener);
344
345 ecMap.addListener(listener);
346
347 // Put in an initial value
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800348 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800349 ecMap.put(KEY1, VALUE1);
350 assertEquals(VALUE1, ecMap.get(KEY1));
351
352 // Remove the value and check the correct internal cluster messages
353 // are sent
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800354 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700355 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800356
357 ecMap.remove(KEY1);
358 assertNull(ecMap.get(KEY1));
359
360 verify(clusterCommunicator);
361
362 // Remove the same value again. Even though the value is no longer in
363 // the map, we expect that the tombstone is updated and another remove
364 // event is sent to the cluster and external listeners.
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800365 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700366 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800367
368 ecMap.remove(KEY1);
369 assertNull(ecMap.get(KEY1));
370
371 verify(clusterCommunicator);
372
373
374 // Put in a new value for us to try and remove
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800375 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800376
377 ecMap.put(KEY2, VALUE2);
378
379 clockService.turnBackTime();
380
381 // Remove should have no effect, since it has an older timestamp than
382 // the put. Expect no notifications to be sent out
383 reset(clusterCommunicator);
384 replay(clusterCommunicator);
385
386 ecMap.remove(KEY2);
387
388 verify(clusterCommunicator);
389
390 // Check that our listener received the correct events during the test
391 verify(listener);
392 }
393
394 @Test
Madan Jampani4727a112015-07-16 12:12:58 -0700395 public void testCompute() throws Exception {
396 // Set up expectations of external events to be sent to listeners during
397 // the test. These don't use timestamps so we can set them all up at once.
398 EventuallyConsistentMapListener<String, String> listener
399 = getListener();
400 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700401 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Madan Jampani4727a112015-07-16 12:12:58 -0700402 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700403 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Madan Jampani4727a112015-07-16 12:12:58 -0700404 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700405 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Madan Jampani4727a112015-07-16 12:12:58 -0700406 replay(listener);
407
408 ecMap.addListener(listener);
409
410 // Put in an initial value
411 expectPeerMessage(clusterCommunicator);
412 ecMap.compute(KEY1, (k, v) -> VALUE1);
413 assertEquals(VALUE1, ecMap.get(KEY1));
414
415 // Remove the value and check the correct internal cluster messages
416 // are sent
417 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
418 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
419
420 ecMap.compute(KEY1, (k, v) -> null);
421 assertNull(ecMap.get(KEY1));
422
423 verify(clusterCommunicator);
424
425 // Remove the same value again. Even though the value is no longer in
426 // the map, we expect that the tombstone is updated and another remove
427 // event is sent to the cluster and external listeners.
428 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
429 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
430
431 ecMap.compute(KEY1, (k, v) -> null);
432 assertNull(ecMap.get(KEY1));
433
434 verify(clusterCommunicator);
435
436 // Put in a new value for us to try and remove
437 expectPeerMessage(clusterCommunicator);
438
439 ecMap.compute(KEY2, (k, v) -> VALUE2);
440
441 clockService.turnBackTime();
442
443 // Remove should have no effect, since it has an older timestamp than
444 // the put. Expect no notifications to be sent out
445 reset(clusterCommunicator);
446 replay(clusterCommunicator);
447
448 ecMap.compute(KEY2, (k, v) -> null);
449
450 verify(clusterCommunicator);
451
452 // Check that our listener received the correct events during the test
453 verify(listener);
454 }
455
456 @Test
Jonathan Hart584d2f32015-01-27 19:46:14 -0800457 public void testPutAll() throws Exception {
458 // putAll() with an empty map is a no-op - no messages will be sent
459 reset(clusterCommunicator);
460 replay(clusterCommunicator);
461
462 ecMap.putAll(new HashMap<>());
463
464 verify(clusterCommunicator);
465
466 // Set up the listener with our expected events
467 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800468 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800469 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700470 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800471 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700472 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800473 replay(listener);
474
475 ecMap.addListener(listener);
476
477 // Expect a multi-update inter-instance message
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700478 expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800479 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800480
481 Map<String, String> putAllValues = new HashMap<>();
482 putAllValues.put(KEY1, VALUE1);
483 putAllValues.put(KEY2, VALUE2);
484
485 // Put the values in the map
486 ecMap.putAll(putAllValues);
487
488 // Check the correct messages and events were sent
489 verify(clusterCommunicator);
490 verify(listener);
491 }
492
493 @Test
494 public void testClear() throws Exception {
495 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800496 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800497 listener.event(new EventuallyConsistentMapEvent<>(
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700498 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
499 listener.event(new EventuallyConsistentMapEvent<>(
500 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
501 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700502 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800503 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700504 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800505 replay(listener);
506
507 // clear() on an empty map is a no-op - no messages will be sent
508 reset(clusterCommunicator);
509 replay(clusterCommunicator);
510
511 assertTrue(ecMap.isEmpty());
512 ecMap.clear();
513 verify(clusterCommunicator);
514
515 // Put some items in the map
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800516 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800517 ecMap.put(KEY1, VALUE1);
518 ecMap.put(KEY2, VALUE2);
519
520 ecMap.addListener(listener);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700521 expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800522
523 ecMap.clear();
524
525 verify(clusterCommunicator);
526 verify(listener);
527 }
528
529 @Test
530 public void testKeySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800531 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800532
533 assertTrue(ecMap.keySet().isEmpty());
534
535 // Generate some keys
536 Set<String> keys = new HashSet<>();
537 for (int i = 1; i <= 10; i++) {
538 keys.add("" + i);
539 }
540
541 // Put each key in the map
542 keys.forEach(k -> ecMap.put(k, "value" + k));
543
544 // Check keySet() returns the correct value
545 assertEquals(keys, ecMap.keySet());
546
547 // Update the value for one of the keys
548 ecMap.put(keys.iterator().next(), "new-value");
549
550 // Check the key set is still the same
551 assertEquals(keys, ecMap.keySet());
552
553 // Remove a key
554 String removeKey = keys.iterator().next();
555 keys.remove(removeKey);
556 ecMap.remove(removeKey);
557
558 // Check the key set is still correct
559 assertEquals(keys, ecMap.keySet());
560 }
561
562 @Test
563 public void testValues() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800564 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800565
566 assertTrue(ecMap.values().isEmpty());
567
568 // Generate some values
569 Map<String, String> expectedValues = new HashMap<>();
570 for (int i = 1; i <= 10; i++) {
571 expectedValues.put("" + i, "value" + i);
572 }
573
574 // Add them into the map
575 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
576
577 // Check the values collection is correct
578 assertEquals(expectedValues.values().size(), ecMap.values().size());
579 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
580
581 // Update the value for one of the keys
582 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
583 expectedValues.put(first.getKey(), "new-value");
584 ecMap.put(first.getKey(), "new-value");
585
586 // Check the values collection is still correct
587 assertEquals(expectedValues.values().size(), ecMap.values().size());
588 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
589
590 // Remove a key
591 String removeKey = expectedValues.keySet().iterator().next();
592 expectedValues.remove(removeKey);
593 ecMap.remove(removeKey);
594
595 // Check the values collection is still correct
596 assertEquals(expectedValues.values().size(), ecMap.values().size());
597 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
598 }
599
600 @Test
601 public void testEntrySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800602 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800603
604 assertTrue(ecMap.entrySet().isEmpty());
605
606 // Generate some values
607 Map<String, String> expectedValues = new HashMap<>();
608 for (int i = 1; i <= 10; i++) {
609 expectedValues.put("" + i, "value" + i);
610 }
611
612 // Add them into the map
613 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
614
615 // Check the entry set is correct
616 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
617
618 // Update the value for one of the keys
619 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
620 expectedValues.put(first.getKey(), "new-value");
621 ecMap.put(first.getKey(), "new-value");
622
623 // Check the entry set is still correct
624 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
625
626 // Remove a key
627 String removeKey = expectedValues.keySet().iterator().next();
628 expectedValues.remove(removeKey);
629 ecMap.remove(removeKey);
630
631 // Check the entry set is still correct
632 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
633 }
634
635 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
636 if (expectedMap.entrySet().size() != actual.size()) {
637 return false;
638 }
639
640 for (Map.Entry<String, String> e : actual) {
641 if (!expectedMap.containsKey(e.getKey())) {
642 return false;
643 }
644 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
645 return false;
646 }
647 }
648 return true;
649 }
650
651 @Test
652 public void testDestroy() throws Exception {
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700653 clusterCommunicator.removeSubscriber(BOOTSTRAP_MESSAGE_SUBJECT);
654 clusterCommunicator.removeSubscriber(INITIALIZE_MESSAGE_SUBJECT);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800655 clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
Jon Halld198b882016-05-18 16:44:40 -0700656 clusterCommunicator.removeSubscriber(UPDATE_REQUEST_SUBJECT);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800657 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
658
659 replay(clusterCommunicator);
660
661 ecMap.destroy();
662
663 verify(clusterCommunicator);
664
665 try {
666 ecMap.get(KEY1);
667 fail("get after destroy should throw exception");
668 } catch (IllegalStateException e) {
669 assertTrue(true);
670 }
671
672 try {
673 ecMap.put(KEY1, VALUE1);
674 fail("put after destroy should throw exception");
675 } catch (IllegalStateException e) {
676 assertTrue(true);
677 }
678 }
679
Madan Jampani3d76c942015-06-29 23:37:10 -0700680 private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
681 return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800682 }
683
Madan Jampani3d76c942015-06-29 23:37:10 -0700684 private List<UpdateEntry<String, String>> generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700685 String key1, String value1, String key2, String value2) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700686 List<UpdateEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800687
688 Timestamp timestamp1 = clockService.peek(1);
689 Timestamp timestamp2 = clockService.peek(2);
690
Madan Jampani3d76c942015-06-29 23:37:10 -0700691 list.add(generatePutMessage(key1, value1, timestamp1));
692 list.add(generatePutMessage(key2, value2, timestamp2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800693
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700694 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800695 }
696
Madan Jampani3d76c942015-06-29 23:37:10 -0700697 private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
698 return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800699 }
700
Madan Jampani3d76c942015-06-29 23:37:10 -0700701 private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
702 List<UpdateEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800703
704 Timestamp timestamp1 = clockService.peek(1);
705 Timestamp timestamp2 = clockService.peek(2);
706
Madan Jampani3d76c942015-06-29 23:37:10 -0700707 list.add(generateRemoveMessage(key1, timestamp1));
708 list.add(generateRemoveMessage(key2, timestamp2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800709
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700710 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800711 }
712
713 /**
714 * Sets up a mock ClusterCommunicationService to expect a specific cluster
715 * message to be broadcast to the cluster.
716 *
Ray Milkeyb3c5ce22015-08-10 09:07:36 -0700717 * @param message message we expect to be sent
Jonathan Hart584d2f32015-01-27 19:46:14 -0800718 * @param clusterCommunicator a mock ClusterCommunicationService to set up
719 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800720 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700721 private static <T> void expectSpecificBroadcastMessage(
722 T message,
723 MessageSubject subject,
724 ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800725 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700726 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
727 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800728 replay(clusterCommunicator);
729 }
730
731 /**
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800732 * Sets up a mock ClusterCommunicationService to expect a specific cluster
733 * message to be multicast to the cluster.
734 *
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700735 * @param message message we expect to be sent
736 * @param subject subject we expect to be sent to
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800737 * @param clusterCommunicator a mock ClusterCommunicationService to set up
738 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800739 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700740 private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800741 ClusterCommunicationService clusterCommunicator) {
742 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700743 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
744 expectLastCall().anyTimes();
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800745 replay(clusterCommunicator);
746 }
747
748
749 /**
750 * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
Jonathan Hart584d2f32015-01-27 19:46:14 -0800751 * that is sent to it. This is useful for unit tests where we aren't
752 * interested in testing the messaging component.
753 *
754 * @param clusterCommunicator a mock ClusterCommunicationService to set up
755 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800756 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700757 private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800758 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800759// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
760// anyObject(Iterable.class)))
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700761 expect(clusterCommunicator.<T>unicast(
762 anyObject(),
763 anyObject(MessageSubject.class),
764 anyObject(Function.class),
765 anyObject(NodeId.class)))
Madan Jampani175e8fd2015-05-20 14:10:45 -0700766 .andReturn(CompletableFuture.completedFuture(null))
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800767 .anyTimes();
768 replay(clusterCommunicator);
769 }
770
771 /**
772 * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
773 * that is sent to it. This is useful for unit tests where we aren't
774 * interested in testing the messaging component.
775 *
776 * @param clusterCommunicator a mock ClusterCommunicationService to set up
777 */
778 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800779 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700780 clusterCommunicator.<AbstractEvent>multicast(
781 anyObject(AbstractEvent.class),
782 anyObject(MessageSubject.class),
783 anyObject(Function.class),
784 anyObject(Set.class));
785 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800786 replay(clusterCommunicator);
787 }
788
789 /**
790 * ClusterCommunicationService implementation that the map's addSubscriber
791 * call will delegate to. This means we can get a reference to the
792 * internal cluster message handler used by the map, so that we can simulate
793 * events coming in from other instances.
794 */
795 private final class TestClusterCommunicationService
Ray Milkeyb3c5ce22015-08-10 09:07:36 -0700796 extends ClusterCommunicationServiceAdapter {
Madan Jampani27b69c62015-05-15 15:49:02 -0700797
798 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700799 public <M> void addSubscriber(MessageSubject subject,
800 Function<byte[], M> decoder, Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700801 Executor executor) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700802 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
803 updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
Jon Halld198b882016-05-18 16:44:40 -0700804 } else if (subject.equals(UPDATE_REQUEST_SUBJECT)) {
805 requestHandler = (Consumer<Collection<UpdateRequest<String>>>) handler;
Madan Jampani29f52a32016-04-18 15:20:52 -0700806 } else {
Ray Milkeydbd38212018-07-02 09:18:09 -0700807 throw new IllegalStateException("Unexpected message subject " + subject.toString());
Madan Jampani29f52a32016-04-18 15:20:52 -0700808 }
809 }
810
811 @Override
812 public <M, R> void addSubscriber(MessageSubject subject,
813 Function<byte[], M> decoder, Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
814 if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
815 antiEntropyHandler = (Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse>) handler;
Jordan Halterman12d5ec42017-05-17 00:53:44 -0700816 } else if (!subject.equals(INITIALIZE_MESSAGE_SUBJECT)) {
Ray Milkeydbd38212018-07-02 09:18:09 -0700817 throw new IllegalStateException("Unexpected message subject " + subject.toString());
Madan Jampani3d76c942015-06-29 23:37:10 -0700818 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700819 }
Jonathan Hart584d2f32015-01-27 19:46:14 -0800820 }
821
822 /**
823 * ClockService implementation that gives out timestamps based on a
824 * sequential counter. This clock service enables more control over the
825 * timestamps that are given out, including being able to "turn back time"
826 * to give out timestamps from the past.
827 *
828 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800829 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800830 */
Madan Jampanibcf1a482015-06-24 19:05:56 -0700831 private class SequentialClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800832
833 private static final long INITIAL_VALUE = 1;
834 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
835
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800836 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800837 return new TestTimestamp(counter.getAndIncrement());
838 }
839
840 /**
841 * Returns what the next timestamp will be without consuming the
842 * timestamp. This allows test code to set expectations correctly while
843 * still allowing the CUT to get the same timestamp.
844 *
845 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800846 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800847 */
848 public Timestamp peekAtNextTimestamp() {
849 return peek(1);
850 }
851
852 /**
853 * Returns the ith timestamp to be given out in the future without
854 * consuming the timestamp. For example, i=1 returns the next timestamp,
855 * i=2 returns the timestamp after that, and so on.
856 *
857 * @param i number of the timestamp to peek at
858 * @return the ith timestamp that will be given out
859 */
860 public Timestamp peek(int i) {
861 checkArgument(i > 0, "i must be a positive integer");
862
863 return new TestTimestamp(counter.get() + i - 1);
864 }
865
866 /**
867 * Turns the clock back two ticks, so the next call to getTimestamp will
868 * return an older timestamp than the previous call to getTimestamp.
869 */
870 public void turnBackTime() {
871 // Not atomic, but should be OK for these tests.
872 counter.decrementAndGet();
873 counter.decrementAndGet();
874 }
875
876 }
877
878 /**
879 * Timestamp implementation where the value of the timestamp can be
880 * specified explicitly at creation time.
881 */
882 private class TestTimestamp implements Timestamp {
883
884 private final long timestamp;
885
886 /**
887 * Creates a new timestamp that has the specified value.
888 *
889 * @param timestamp value of the timestamp
890 */
891 public TestTimestamp(long timestamp) {
892 this.timestamp = timestamp;
893 }
894
895 @Override
896 public int compareTo(Timestamp o) {
897 checkArgument(o instanceof TestTimestamp);
898 TestTimestamp otherTimestamp = (TestTimestamp) o;
899 return ComparisonChain.start()
900 .compare(this.timestamp, otherTimestamp.timestamp)
901 .result();
902 }
903 }
904
905 /**
906 * EventuallyConsistentMapListener implementation which triggers a latch
907 * when it receives an event.
908 */
909 private class TestListener implements EventuallyConsistentMapListener<String, String> {
910 private CountDownLatch latch;
911
912 /**
913 * Creates a new listener that will trigger the specified latch when it
914 * receives and event.
915 *
916 * @param latch the latch to trigger on events
917 */
918 public TestListener(CountDownLatch latch) {
919 this.latch = latch;
920 }
921
922 @Override
923 public void event(EventuallyConsistentMapEvent<String, String> event) {
924 latch.countDown();
925 }
926 }
927}