blob: 3d9dc4e155779f1ffeb858bd79723837354d6290 [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Jonathan Hart584d2f32015-01-27 19:46:14 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
17
18import static com.google.common.base.Preconditions.checkArgument;
19import static junit.framework.TestCase.assertFalse;
20import static org.easymock.EasyMock.anyObject;
21import static org.easymock.EasyMock.createMock;
22import static org.easymock.EasyMock.eq;
23import static org.easymock.EasyMock.expect;
24import static org.easymock.EasyMock.expectLastCall;
25import static org.easymock.EasyMock.replay;
26import static org.easymock.EasyMock.reset;
27import static org.easymock.EasyMock.verify;
28import static org.junit.Assert.assertEquals;
29import static org.junit.Assert.assertNull;
30import static org.junit.Assert.assertTrue;
31import static org.junit.Assert.fail;
Jonathan Hart584d2f32015-01-27 19:46:14 -080032
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070033import java.util.ArrayList;
34import java.util.Collection;
35import java.util.HashMap;
36import java.util.HashSet;
37import java.util.List;
38import java.util.Map;
39import java.util.Objects;
40import java.util.Optional;
41import java.util.Set;
42import java.util.concurrent.CompletableFuture;
43import java.util.concurrent.CountDownLatch;
44import java.util.concurrent.Executor;
45import java.util.concurrent.TimeUnit;
46import java.util.concurrent.atomic.AtomicLong;
47import java.util.function.Consumer;
48import java.util.function.Function;
Madan Jampani3e033bd2015-04-08 13:03:49 -070049
Jonathan Hart584d2f32015-01-27 19:46:14 -080050import org.junit.After;
51import org.junit.Before;
52import org.junit.Test;
53import org.onlab.packet.IpAddress;
54import org.onlab.util.KryoNamespace;
55import org.onosproject.cluster.ClusterService;
56import org.onosproject.cluster.ControllerNode;
57import org.onosproject.cluster.DefaultControllerNode;
58import org.onosproject.cluster.NodeId;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070059import org.onosproject.event.AbstractEvent;
Madan Jampani2b8e1892015-12-04 11:16:47 -080060import org.onosproject.persistence.PersistenceService;
Madan Jampanif4c88502016-01-21 12:35:36 -080061import org.onosproject.store.LogicalTimestamp;
Jonathan Hart584d2f32015-01-27 19:46:14 -080062import org.onosproject.store.Timestamp;
63import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070064import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
Jonathan Hart584d2f32015-01-27 19:46:14 -080065import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani2b8e1892015-12-04 11:16:47 -080066import org.onosproject.store.persistence.TestPersistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080067import org.onosproject.store.serializers.KryoNamespaces;
68import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070069import org.onosproject.store.service.EventuallyConsistentMap;
70import org.onosproject.store.service.EventuallyConsistentMapEvent;
71import org.onosproject.store.service.EventuallyConsistentMapListener;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070072import org.onosproject.store.service.WallClockTimestamp;
Jonathan Hart584d2f32015-01-27 19:46:14 -080073
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070074import com.google.common.collect.ComparisonChain;
75import com.google.common.collect.ImmutableList;
76import com.google.common.collect.ImmutableSet;
77import com.google.common.util.concurrent.MoreExecutors;
Jonathan Hart584d2f32015-01-27 19:46:14 -080078
Jonathan Hart584d2f32015-01-27 19:46:14 -080079/**
80 * Unit tests for EventuallyConsistentMapImpl.
81 */
82public class EventuallyConsistentMapImplTest {
83
84 private EventuallyConsistentMap<String, String> ecMap;
85
Madan Jampani2b8e1892015-12-04 11:16:47 -080086 private PersistenceService persistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080087 private ClusterService clusterService;
88 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080089 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080090
91 private static final String MAP_NAME = "test";
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080092 private static final MessageSubject UPDATE_MESSAGE_SUBJECT
Jonathan Hart584d2f32015-01-27 19:46:14 -080093 = new MessageSubject("ecm-" + MAP_NAME + "-update");
Jonathan Hart584d2f32015-01-27 19:46:14 -080094 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
95 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
96
97 private static final String KEY1 = "one";
98 private static final String KEY2 = "two";
99 private static final String VALUE1 = "oneValue";
100 private static final String VALUE2 = "twoValue";
101
102 private final ControllerNode self =
103 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
104
Madan Jampani3d76c942015-06-29 23:37:10 -0700105 private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
Madan Jampani29f52a32016-04-18 15:20:52 -0700106 private Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse> antiEntropyHandler;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800107
108 /*
109 * Serialization is a bit tricky here. We need to serialize in the tests
110 * to set the expectations, which will use this serializer here, but the
111 * EventuallyConsistentMap will use its own internal serializer. This means
112 * this serializer must be set up exactly the same as map's internal
113 * serializer.
114 */
115 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
116 @Override
117 protected void setupKryoPool() {
118 serializerPool = KryoNamespace.newBuilder()
119 // Classes we give to the map
120 .register(KryoNamespaces.API)
121 .register(TestTimestamp.class)
122 // Below is the classes that the map internally registers
Madan Jampani3e033bd2015-04-08 13:03:49 -0700123 .register(LogicalTimestamp.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800124 .register(WallClockTimestamp.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800125 .register(ArrayList.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800126 .register(AntiEntropyAdvertisement.class)
127 .register(HashMap.class)
Madan Jampani3d76c942015-06-29 23:37:10 -0700128 .register(Optional.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800129 .build();
130 }
131 };
132
133 @Before
134 public void setUp() throws Exception {
135 clusterService = createMock(ClusterService.class);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800136 expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
137 expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800138 replay(clusterService);
139
140 clusterCommunicator = createMock(ClusterCommunicationService.class);
141
Madan Jampani2b8e1892015-12-04 11:16:47 -0800142 persistenceService = new TestPersistenceService();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800143 // Add expectation for adding cluster message subscribers which
144 // delegate to our ClusterCommunicationService implementation. This
145 // allows us to get a reference to the map's internal cluster message
146 // handlers so we can induce events coming in from a peer.
Madan Jampani29f52a32016-04-18 15:20:52 -0700147 clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
Madan Jampani3d76c942015-06-29 23:37:10 -0700148 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
Madan Jampani29f52a32016-04-18 15:20:52 -0700149 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
150 clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
151 anyObject(Function.class),
152 anyObject(Function.class),
153 anyObject(Function.class),
154 anyObject(Executor.class));
155 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800156
157 replay(clusterCommunicator);
158
159 clockService = new SequentialClockService<>();
160
161 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
162 .register(KryoNamespaces.API)
163 .register(TestTimestamp.class);
164
Madan Jampani175e8fd2015-05-20 14:10:45 -0700165 ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700166 clusterService, clusterCommunicator, persistenceService)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700167 .withName(MAP_NAME)
168 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700169 .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700170 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700171 .withPersistence()
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700172 .build();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800173
174 // Reset ready for tests to add their own expectations
175 reset(clusterCommunicator);
176 }
177
178 @After
179 public void tearDown() {
180 reset(clusterCommunicator);
181 ecMap.destroy();
182 }
183
Ray Milkey8dc82082015-02-20 16:22:38 -0800184 @SuppressWarnings("unchecked")
185 private EventuallyConsistentMapListener<String, String> getListener() {
186 return createMock(EventuallyConsistentMapListener.class);
187 }
188
Jonathan Hart584d2f32015-01-27 19:46:14 -0800189 @Test
190 public void testSize() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800191 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800192
193 assertEquals(0, ecMap.size());
194 ecMap.put(KEY1, VALUE1);
195 assertEquals(1, ecMap.size());
196 ecMap.put(KEY1, VALUE2);
197 assertEquals(1, ecMap.size());
198 ecMap.put(KEY2, VALUE2);
199 assertEquals(2, ecMap.size());
200 for (int i = 0; i < 10; i++) {
201 ecMap.put("" + i, "" + i);
202 }
203 assertEquals(12, ecMap.size());
204 ecMap.remove(KEY1);
205 assertEquals(11, ecMap.size());
206 ecMap.remove(KEY1);
207 assertEquals(11, ecMap.size());
208 }
209
210 @Test
211 public void testIsEmpty() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800212 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800213
214 assertTrue(ecMap.isEmpty());
215 ecMap.put(KEY1, VALUE1);
216 assertFalse(ecMap.isEmpty());
217 ecMap.remove(KEY1);
218 assertTrue(ecMap.isEmpty());
219 }
220
221 @Test
222 public void testContainsKey() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800223 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800224
225 assertFalse(ecMap.containsKey(KEY1));
226 ecMap.put(KEY1, VALUE1);
227 assertTrue(ecMap.containsKey(KEY1));
228 assertFalse(ecMap.containsKey(KEY2));
229 ecMap.remove(KEY1);
230 assertFalse(ecMap.containsKey(KEY1));
231 }
232
233 @Test
234 public void testContainsValue() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800235 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800236
237 assertFalse(ecMap.containsValue(VALUE1));
238 ecMap.put(KEY1, VALUE1);
239 assertTrue(ecMap.containsValue(VALUE1));
240 assertFalse(ecMap.containsValue(VALUE2));
241 ecMap.put(KEY1, VALUE2);
242 assertFalse(ecMap.containsValue(VALUE1));
243 assertTrue(ecMap.containsValue(VALUE2));
244 ecMap.remove(KEY1);
245 assertFalse(ecMap.containsValue(VALUE2));
246 }
247
248 @Test
249 public void testGet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800250 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800251
252 CountDownLatch latch;
253
254 // Local put
255 assertNull(ecMap.get(KEY1));
256 ecMap.put(KEY1, VALUE1);
257 assertEquals(VALUE1, ecMap.get(KEY1));
258
259 // Remote put
Madan Jampani3d76c942015-06-29 23:37:10 -0700260 List<UpdateEntry<String, String>> message
261 = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800262
263 // Create a latch so we know when the put operation has finished
264 latch = new CountDownLatch(1);
265 ecMap.addListener(new TestListener(latch));
266
267 assertNull(ecMap.get(KEY2));
Madan Jampani3d76c942015-06-29 23:37:10 -0700268 updateHandler.accept(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800269 assertTrue("External listener never got notified of internal event",
270 latch.await(100, TimeUnit.MILLISECONDS));
271 assertEquals(VALUE2, ecMap.get(KEY2));
272
273 // Local remove
274 ecMap.remove(KEY2);
275 assertNull(ecMap.get(KEY2));
276
277 // Remote remove
Madan Jampani3d76c942015-06-29 23:37:10 -0700278 message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800279
280 // Create a latch so we know when the remove operation has finished
281 latch = new CountDownLatch(1);
282 ecMap.addListener(new TestListener(latch));
283
Madan Jampani3d76c942015-06-29 23:37:10 -0700284 updateHandler.accept(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800285 assertTrue("External listener never got notified of internal event",
286 latch.await(100, TimeUnit.MILLISECONDS));
287 assertNull(ecMap.get(KEY1));
288 }
289
290 @Test
291 public void testPut() throws Exception {
292 // Set up expectations of external events to be sent to listeners during
293 // the test. These don't use timestamps so we can set them all up at once.
294 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800295 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800296 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700297 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800298 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700299 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800300 replay(listener);
301
302 ecMap.addListener(listener);
303
304 // Set up expected internal message to be broadcast to peers on first put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800305 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700306 .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800307
308 // Put first value
309 assertNull(ecMap.get(KEY1));
310 ecMap.put(KEY1, VALUE1);
311 assertEquals(VALUE1, ecMap.get(KEY1));
312
313 verify(clusterCommunicator);
314
315 // Set up expected internal message to be broadcast to peers on second put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800316 expectSpecificMulticastMessage(generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700317 KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800318
319 // Update same key to a new value
320 ecMap.put(KEY1, VALUE2);
321 assertEquals(VALUE2, ecMap.get(KEY1));
322
323 verify(clusterCommunicator);
324
325 // Do a put with a older timestamp than the value already there.
326 // The map data should not be changed and no notifications should be sent.
327 reset(clusterCommunicator);
328 replay(clusterCommunicator);
329
330 clockService.turnBackTime();
331 ecMap.put(KEY1, VALUE1);
332 // Value should not have changed.
333 assertEquals(VALUE2, ecMap.get(KEY1));
334
335 verify(clusterCommunicator);
336
337 // Check that our listener received the correct events during the test
338 verify(listener);
339 }
340
341 @Test
342 public void testRemove() throws Exception {
343 // Set up expectations of external events to be sent to listeners during
344 // the test. These don't use timestamps so we can set them all up at once.
345 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800346 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800347 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700348 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700349 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700350 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800351 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700352 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800353 replay(listener);
354
355 ecMap.addListener(listener);
356
357 // Put in an initial value
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800358 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800359 ecMap.put(KEY1, VALUE1);
360 assertEquals(VALUE1, ecMap.get(KEY1));
361
362 // Remove the value and check the correct internal cluster messages
363 // are sent
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800364 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700365 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800366
367 ecMap.remove(KEY1);
368 assertNull(ecMap.get(KEY1));
369
370 verify(clusterCommunicator);
371
372 // Remove the same value again. Even though the value is no longer in
373 // the map, we expect that the tombstone is updated and another remove
374 // event is sent to the cluster and external listeners.
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800375 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700376 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800377
378 ecMap.remove(KEY1);
379 assertNull(ecMap.get(KEY1));
380
381 verify(clusterCommunicator);
382
383
384 // Put in a new value for us to try and remove
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800385 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800386
387 ecMap.put(KEY2, VALUE2);
388
389 clockService.turnBackTime();
390
391 // Remove should have no effect, since it has an older timestamp than
392 // the put. Expect no notifications to be sent out
393 reset(clusterCommunicator);
394 replay(clusterCommunicator);
395
396 ecMap.remove(KEY2);
397
398 verify(clusterCommunicator);
399
400 // Check that our listener received the correct events during the test
401 verify(listener);
402 }
403
404 @Test
Madan Jampani4727a112015-07-16 12:12:58 -0700405 public void testCompute() throws Exception {
406 // Set up expectations of external events to be sent to listeners during
407 // the test. These don't use timestamps so we can set them all up at once.
408 EventuallyConsistentMapListener<String, String> listener
409 = getListener();
410 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700411 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Madan Jampani4727a112015-07-16 12:12:58 -0700412 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700413 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Madan Jampani4727a112015-07-16 12:12:58 -0700414 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700415 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Madan Jampani4727a112015-07-16 12:12:58 -0700416 replay(listener);
417
418 ecMap.addListener(listener);
419
420 // Put in an initial value
421 expectPeerMessage(clusterCommunicator);
422 ecMap.compute(KEY1, (k, v) -> VALUE1);
423 assertEquals(VALUE1, ecMap.get(KEY1));
424
425 // Remove the value and check the correct internal cluster messages
426 // are sent
427 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
428 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
429
430 ecMap.compute(KEY1, (k, v) -> null);
431 assertNull(ecMap.get(KEY1));
432
433 verify(clusterCommunicator);
434
435 // Remove the same value again. Even though the value is no longer in
436 // the map, we expect that the tombstone is updated and another remove
437 // event is sent to the cluster and external listeners.
438 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
439 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
440
441 ecMap.compute(KEY1, (k, v) -> null);
442 assertNull(ecMap.get(KEY1));
443
444 verify(clusterCommunicator);
445
446 // Put in a new value for us to try and remove
447 expectPeerMessage(clusterCommunicator);
448
449 ecMap.compute(KEY2, (k, v) -> VALUE2);
450
451 clockService.turnBackTime();
452
453 // Remove should have no effect, since it has an older timestamp than
454 // the put. Expect no notifications to be sent out
455 reset(clusterCommunicator);
456 replay(clusterCommunicator);
457
458 ecMap.compute(KEY2, (k, v) -> null);
459
460 verify(clusterCommunicator);
461
462 // Check that our listener received the correct events during the test
463 verify(listener);
464 }
465
466 @Test
Jonathan Hart584d2f32015-01-27 19:46:14 -0800467 public void testPutAll() throws Exception {
468 // putAll() with an empty map is a no-op - no messages will be sent
469 reset(clusterCommunicator);
470 replay(clusterCommunicator);
471
472 ecMap.putAll(new HashMap<>());
473
474 verify(clusterCommunicator);
475
476 // Set up the listener with our expected events
477 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800478 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800479 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700480 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800481 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700482 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800483 replay(listener);
484
485 ecMap.addListener(listener);
486
487 // Expect a multi-update inter-instance message
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700488 expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800489 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800490
491 Map<String, String> putAllValues = new HashMap<>();
492 putAllValues.put(KEY1, VALUE1);
493 putAllValues.put(KEY2, VALUE2);
494
495 // Put the values in the map
496 ecMap.putAll(putAllValues);
497
498 // Check the correct messages and events were sent
499 verify(clusterCommunicator);
500 verify(listener);
501 }
502
503 @Test
504 public void testClear() throws Exception {
505 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800506 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800507 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700508 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800509 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700510 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800511 replay(listener);
512
513 // clear() on an empty map is a no-op - no messages will be sent
514 reset(clusterCommunicator);
515 replay(clusterCommunicator);
516
517 assertTrue(ecMap.isEmpty());
518 ecMap.clear();
519 verify(clusterCommunicator);
520
521 // Put some items in the map
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800522 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800523 ecMap.put(KEY1, VALUE1);
524 ecMap.put(KEY2, VALUE2);
525
526 ecMap.addListener(listener);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700527 expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800528
529 ecMap.clear();
530
531 verify(clusterCommunicator);
532 verify(listener);
533 }
534
535 @Test
536 public void testKeySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800537 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800538
539 assertTrue(ecMap.keySet().isEmpty());
540
541 // Generate some keys
542 Set<String> keys = new HashSet<>();
543 for (int i = 1; i <= 10; i++) {
544 keys.add("" + i);
545 }
546
547 // Put each key in the map
548 keys.forEach(k -> ecMap.put(k, "value" + k));
549
550 // Check keySet() returns the correct value
551 assertEquals(keys, ecMap.keySet());
552
553 // Update the value for one of the keys
554 ecMap.put(keys.iterator().next(), "new-value");
555
556 // Check the key set is still the same
557 assertEquals(keys, ecMap.keySet());
558
559 // Remove a key
560 String removeKey = keys.iterator().next();
561 keys.remove(removeKey);
562 ecMap.remove(removeKey);
563
564 // Check the key set is still correct
565 assertEquals(keys, ecMap.keySet());
566 }
567
568 @Test
569 public void testValues() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800570 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800571
572 assertTrue(ecMap.values().isEmpty());
573
574 // Generate some values
575 Map<String, String> expectedValues = new HashMap<>();
576 for (int i = 1; i <= 10; i++) {
577 expectedValues.put("" + i, "value" + i);
578 }
579
580 // Add them into the map
581 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
582
583 // Check the values collection is correct
584 assertEquals(expectedValues.values().size(), ecMap.values().size());
585 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
586
587 // Update the value for one of the keys
588 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
589 expectedValues.put(first.getKey(), "new-value");
590 ecMap.put(first.getKey(), "new-value");
591
592 // Check the values collection is still correct
593 assertEquals(expectedValues.values().size(), ecMap.values().size());
594 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
595
596 // Remove a key
597 String removeKey = expectedValues.keySet().iterator().next();
598 expectedValues.remove(removeKey);
599 ecMap.remove(removeKey);
600
601 // Check the values collection is still correct
602 assertEquals(expectedValues.values().size(), ecMap.values().size());
603 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
604 }
605
606 @Test
607 public void testEntrySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800608 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800609
610 assertTrue(ecMap.entrySet().isEmpty());
611
612 // Generate some values
613 Map<String, String> expectedValues = new HashMap<>();
614 for (int i = 1; i <= 10; i++) {
615 expectedValues.put("" + i, "value" + i);
616 }
617
618 // Add them into the map
619 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
620
621 // Check the entry set is correct
622 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
623
624 // Update the value for one of the keys
625 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
626 expectedValues.put(first.getKey(), "new-value");
627 ecMap.put(first.getKey(), "new-value");
628
629 // Check the entry set is still correct
630 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
631
632 // Remove a key
633 String removeKey = expectedValues.keySet().iterator().next();
634 expectedValues.remove(removeKey);
635 ecMap.remove(removeKey);
636
637 // Check the entry set is still correct
638 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
639 }
640
641 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
642 if (expectedMap.entrySet().size() != actual.size()) {
643 return false;
644 }
645
646 for (Map.Entry<String, String> e : actual) {
647 if (!expectedMap.containsKey(e.getKey())) {
648 return false;
649 }
650 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
651 return false;
652 }
653 }
654 return true;
655 }
656
657 @Test
658 public void testDestroy() throws Exception {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800659 clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800660 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
661
662 replay(clusterCommunicator);
663
664 ecMap.destroy();
665
666 verify(clusterCommunicator);
667
668 try {
669 ecMap.get(KEY1);
670 fail("get after destroy should throw exception");
671 } catch (IllegalStateException e) {
672 assertTrue(true);
673 }
674
675 try {
676 ecMap.put(KEY1, VALUE1);
677 fail("put after destroy should throw exception");
678 } catch (IllegalStateException e) {
679 assertTrue(true);
680 }
681 }
682
Madan Jampani3d76c942015-06-29 23:37:10 -0700683 private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
684 return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800685 }
686
Madan Jampani3d76c942015-06-29 23:37:10 -0700687 private List<UpdateEntry<String, String>> generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700688 String key1, String value1, String key2, String value2) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700689 List<UpdateEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800690
691 Timestamp timestamp1 = clockService.peek(1);
692 Timestamp timestamp2 = clockService.peek(2);
693
Madan Jampani3d76c942015-06-29 23:37:10 -0700694 list.add(generatePutMessage(key1, value1, timestamp1));
695 list.add(generatePutMessage(key2, value2, timestamp2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800696
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700697 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800698 }
699
Madan Jampani3d76c942015-06-29 23:37:10 -0700700 private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
701 return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800702 }
703
Madan Jampani3d76c942015-06-29 23:37:10 -0700704 private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
705 List<UpdateEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800706
707 Timestamp timestamp1 = clockService.peek(1);
708 Timestamp timestamp2 = clockService.peek(2);
709
Madan Jampani3d76c942015-06-29 23:37:10 -0700710 list.add(generateRemoveMessage(key1, timestamp1));
711 list.add(generateRemoveMessage(key2, timestamp2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800712
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700713 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800714 }
715
716 /**
717 * Sets up a mock ClusterCommunicationService to expect a specific cluster
718 * message to be broadcast to the cluster.
719 *
Ray Milkeyb3c5ce22015-08-10 09:07:36 -0700720 * @param message message we expect to be sent
Jonathan Hart584d2f32015-01-27 19:46:14 -0800721 * @param clusterCommunicator a mock ClusterCommunicationService to set up
722 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800723 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700724 private static <T> void expectSpecificBroadcastMessage(
725 T message,
726 MessageSubject subject,
727 ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800728 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700729 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
730 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800731 replay(clusterCommunicator);
732 }
733
734 /**
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800735 * Sets up a mock ClusterCommunicationService to expect a specific cluster
736 * message to be multicast to the cluster.
737 *
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700738 * @param message message we expect to be sent
739 * @param subject subject we expect to be sent to
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800740 * @param clusterCommunicator a mock ClusterCommunicationService to set up
741 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800742 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700743 private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800744 ClusterCommunicationService clusterCommunicator) {
745 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700746 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
747 expectLastCall().anyTimes();
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800748 replay(clusterCommunicator);
749 }
750
751
752 /**
753 * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
Jonathan Hart584d2f32015-01-27 19:46:14 -0800754 * that is sent to it. This is useful for unit tests where we aren't
755 * interested in testing the messaging component.
756 *
757 * @param clusterCommunicator a mock ClusterCommunicationService to set up
758 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800759 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700760 private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800761 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800762// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
763// anyObject(Iterable.class)))
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700764 expect(clusterCommunicator.<T>unicast(
765 anyObject(),
766 anyObject(MessageSubject.class),
767 anyObject(Function.class),
768 anyObject(NodeId.class)))
Madan Jampani175e8fd2015-05-20 14:10:45 -0700769 .andReturn(CompletableFuture.completedFuture(null))
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800770 .anyTimes();
771 replay(clusterCommunicator);
772 }
773
774 /**
775 * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
776 * that is sent to it. This is useful for unit tests where we aren't
777 * interested in testing the messaging component.
778 *
779 * @param clusterCommunicator a mock ClusterCommunicationService to set up
780 */
781 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800782 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700783 clusterCommunicator.<AbstractEvent>multicast(
784 anyObject(AbstractEvent.class),
785 anyObject(MessageSubject.class),
786 anyObject(Function.class),
787 anyObject(Set.class));
788 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800789 replay(clusterCommunicator);
790 }
791
792 /**
793 * ClusterCommunicationService implementation that the map's addSubscriber
794 * call will delegate to. This means we can get a reference to the
795 * internal cluster message handler used by the map, so that we can simulate
796 * events coming in from other instances.
797 */
798 private final class TestClusterCommunicationService
Ray Milkeyb3c5ce22015-08-10 09:07:36 -0700799 extends ClusterCommunicationServiceAdapter {
Madan Jampani27b69c62015-05-15 15:49:02 -0700800
801 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700802 public <M> void addSubscriber(MessageSubject subject,
803 Function<byte[], M> decoder, Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700804 Executor executor) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700805 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
806 updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
Madan Jampani29f52a32016-04-18 15:20:52 -0700807 } else {
808 throw new RuntimeException("Unexpected message subject " + subject.toString());
809 }
810 }
811
812 @Override
813 public <M, R> void addSubscriber(MessageSubject subject,
814 Function<byte[], M> decoder, Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
815 if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
816 antiEntropyHandler = (Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse>) handler;
Madan Jampani3d76c942015-06-29 23:37:10 -0700817 } else {
818 throw new RuntimeException("Unexpected message subject " + subject.toString());
819 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700820 }
Jonathan Hart584d2f32015-01-27 19:46:14 -0800821 }
822
823 /**
824 * ClockService implementation that gives out timestamps based on a
825 * sequential counter. This clock service enables more control over the
826 * timestamps that are given out, including being able to "turn back time"
827 * to give out timestamps from the past.
828 *
829 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800830 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800831 */
Madan Jampanibcf1a482015-06-24 19:05:56 -0700832 private class SequentialClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800833
834 private static final long INITIAL_VALUE = 1;
835 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
836
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800837 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800838 return new TestTimestamp(counter.getAndIncrement());
839 }
840
841 /**
842 * Returns what the next timestamp will be without consuming the
843 * timestamp. This allows test code to set expectations correctly while
844 * still allowing the CUT to get the same timestamp.
845 *
846 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800847 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800848 */
849 public Timestamp peekAtNextTimestamp() {
850 return peek(1);
851 }
852
853 /**
854 * Returns the ith timestamp to be given out in the future without
855 * consuming the timestamp. For example, i=1 returns the next timestamp,
856 * i=2 returns the timestamp after that, and so on.
857 *
858 * @param i number of the timestamp to peek at
859 * @return the ith timestamp that will be given out
860 */
861 public Timestamp peek(int i) {
862 checkArgument(i > 0, "i must be a positive integer");
863
864 return new TestTimestamp(counter.get() + i - 1);
865 }
866
867 /**
868 * Turns the clock back two ticks, so the next call to getTimestamp will
869 * return an older timestamp than the previous call to getTimestamp.
870 */
871 public void turnBackTime() {
872 // Not atomic, but should be OK for these tests.
873 counter.decrementAndGet();
874 counter.decrementAndGet();
875 }
876
877 }
878
879 /**
880 * Timestamp implementation where the value of the timestamp can be
881 * specified explicitly at creation time.
882 */
883 private class TestTimestamp implements Timestamp {
884
885 private final long timestamp;
886
887 /**
888 * Creates a new timestamp that has the specified value.
889 *
890 * @param timestamp value of the timestamp
891 */
892 public TestTimestamp(long timestamp) {
893 this.timestamp = timestamp;
894 }
895
896 @Override
897 public int compareTo(Timestamp o) {
898 checkArgument(o instanceof TestTimestamp);
899 TestTimestamp otherTimestamp = (TestTimestamp) o;
900 return ComparisonChain.start()
901 .compare(this.timestamp, otherTimestamp.timestamp)
902 .result();
903 }
904 }
905
906 /**
907 * EventuallyConsistentMapListener implementation which triggers a latch
908 * when it receives an event.
909 */
910 private class TestListener implements EventuallyConsistentMapListener<String, String> {
911 private CountDownLatch latch;
912
913 /**
914 * Creates a new listener that will trigger the specified latch when it
915 * receives and event.
916 *
917 * @param latch the latch to trigger on events
918 */
919 public TestListener(CountDownLatch latch) {
920 this.latch = latch;
921 }
922
923 @Override
924 public void event(EventuallyConsistentMapEvent<String, String> event) {
925 latch.countDown();
926 }
927 }
928}