blob: ccf6ee714a32f69360baffae443a8ca7be7d987e [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hart584d2f32015-01-27 19:46:14 -080017
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070018import java.util.ArrayList;
19import java.util.Collection;
20import java.util.HashMap;
21import java.util.HashSet;
22import java.util.List;
23import java.util.Map;
24import java.util.Objects;
25import java.util.Optional;
26import java.util.Set;
27import java.util.concurrent.CompletableFuture;
28import java.util.concurrent.CountDownLatch;
29import java.util.concurrent.Executor;
30import java.util.concurrent.TimeUnit;
31import java.util.concurrent.atomic.AtomicLong;
32import java.util.function.Consumer;
33import java.util.function.Function;
Madan Jampani3e033bd2015-04-08 13:03:49 -070034
Jonathan Hart584d2f32015-01-27 19:46:14 -080035import org.junit.After;
36import org.junit.Before;
37import org.junit.Test;
38import org.onlab.packet.IpAddress;
39import org.onlab.util.KryoNamespace;
40import org.onosproject.cluster.ClusterService;
41import org.onosproject.cluster.ControllerNode;
42import org.onosproject.cluster.DefaultControllerNode;
43import org.onosproject.cluster.NodeId;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070044import org.onosproject.event.AbstractEvent;
Jonathan Hart584d2f32015-01-27 19:46:14 -080045import org.onosproject.store.Timestamp;
46import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070047import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
Jonathan Hart584d2f32015-01-27 19:46:14 -080048import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani3e033bd2015-04-08 13:03:49 -070049import org.onosproject.store.impl.LogicalTimestamp;
Jonathan Hart584d2f32015-01-27 19:46:14 -080050import org.onosproject.store.serializers.KryoNamespaces;
51import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070052import org.onosproject.store.service.EventuallyConsistentMap;
53import org.onosproject.store.service.EventuallyConsistentMapEvent;
54import org.onosproject.store.service.EventuallyConsistentMapListener;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070055import org.onosproject.store.service.WallClockTimestamp;
Jonathan Hart584d2f32015-01-27 19:46:14 -080056
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070057import com.google.common.collect.ComparisonChain;
58import com.google.common.collect.ImmutableList;
59import com.google.common.collect.ImmutableSet;
60import com.google.common.util.concurrent.MoreExecutors;
Jonathan Hart584d2f32015-01-27 19:46:14 -080061
62import static com.google.common.base.Preconditions.checkArgument;
63import static junit.framework.TestCase.assertFalse;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070064import static org.easymock.EasyMock.anyObject;
65import static org.easymock.EasyMock.createMock;
66import static org.easymock.EasyMock.eq;
67import static org.easymock.EasyMock.expect;
68import static org.easymock.EasyMock.expectLastCall;
69import static org.easymock.EasyMock.replay;
70import static org.easymock.EasyMock.reset;
71import static org.easymock.EasyMock.verify;
72import static org.junit.Assert.assertEquals;
73import static org.junit.Assert.assertNull;
74import static org.junit.Assert.assertTrue;
75import static org.junit.Assert.fail;
Jonathan Hart584d2f32015-01-27 19:46:14 -080076
77/**
78 * Unit tests for EventuallyConsistentMapImpl.
79 */
80public class EventuallyConsistentMapImplTest {
81
82 private EventuallyConsistentMap<String, String> ecMap;
83
84 private ClusterService clusterService;
85 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080086 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080087
88 private static final String MAP_NAME = "test";
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080089 private static final MessageSubject UPDATE_MESSAGE_SUBJECT
Jonathan Hart584d2f32015-01-27 19:46:14 -080090 = new MessageSubject("ecm-" + MAP_NAME + "-update");
Jonathan Hart584d2f32015-01-27 19:46:14 -080091 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
92 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
93
94 private static final String KEY1 = "one";
95 private static final String KEY2 = "two";
96 private static final String VALUE1 = "oneValue";
97 private static final String VALUE2 = "twoValue";
98
99 private final ControllerNode self =
100 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
101
Madan Jampani3d76c942015-06-29 23:37:10 -0700102 private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
103 private Consumer<AntiEntropyAdvertisement<String>> antiEntropyHandler;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800104
105 /*
106 * Serialization is a bit tricky here. We need to serialize in the tests
107 * to set the expectations, which will use this serializer here, but the
108 * EventuallyConsistentMap will use its own internal serializer. This means
109 * this serializer must be set up exactly the same as map's internal
110 * serializer.
111 */
112 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
113 @Override
114 protected void setupKryoPool() {
115 serializerPool = KryoNamespace.newBuilder()
116 // Classes we give to the map
117 .register(KryoNamespaces.API)
118 .register(TestTimestamp.class)
119 // Below is the classes that the map internally registers
Madan Jampani3e033bd2015-04-08 13:03:49 -0700120 .register(LogicalTimestamp.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800121 .register(WallClockTimestamp.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800122 .register(ArrayList.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800123 .register(AntiEntropyAdvertisement.class)
124 .register(HashMap.class)
Madan Jampani3d76c942015-06-29 23:37:10 -0700125 .register(Optional.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800126 .build();
127 }
128 };
129
130 @Before
131 public void setUp() throws Exception {
132 clusterService = createMock(ClusterService.class);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800133 expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
134 expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800135 replay(clusterService);
136
137 clusterCommunicator = createMock(ClusterCommunicationService.class);
138
139 // Add expectation for adding cluster message subscribers which
140 // delegate to our ClusterCommunicationService implementation. This
141 // allows us to get a reference to the map's internal cluster message
142 // handlers so we can induce events coming in from a peer.
Madan Jampani3d76c942015-06-29 23:37:10 -0700143 clusterCommunicator.<String>addSubscriber(anyObject(MessageSubject.class),
144 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
145 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800146
147 replay(clusterCommunicator);
148
149 clockService = new SequentialClockService<>();
150
151 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
152 .register(KryoNamespaces.API)
153 .register(TestTimestamp.class);
154
Madan Jampani175e8fd2015-05-20 14:10:45 -0700155 ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700156 clusterService, clusterCommunicator)
157 .withName(MAP_NAME)
158 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700159 .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700160 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
161 .build();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800162
163 // Reset ready for tests to add their own expectations
164 reset(clusterCommunicator);
165 }
166
167 @After
168 public void tearDown() {
169 reset(clusterCommunicator);
170 ecMap.destroy();
171 }
172
Ray Milkey8dc82082015-02-20 16:22:38 -0800173 @SuppressWarnings("unchecked")
174 private EventuallyConsistentMapListener<String, String> getListener() {
175 return createMock(EventuallyConsistentMapListener.class);
176 }
177
Jonathan Hart584d2f32015-01-27 19:46:14 -0800178 @Test
179 public void testSize() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800180 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800181
182 assertEquals(0, ecMap.size());
183 ecMap.put(KEY1, VALUE1);
184 assertEquals(1, ecMap.size());
185 ecMap.put(KEY1, VALUE2);
186 assertEquals(1, ecMap.size());
187 ecMap.put(KEY2, VALUE2);
188 assertEquals(2, ecMap.size());
189 for (int i = 0; i < 10; i++) {
190 ecMap.put("" + i, "" + i);
191 }
192 assertEquals(12, ecMap.size());
193 ecMap.remove(KEY1);
194 assertEquals(11, ecMap.size());
195 ecMap.remove(KEY1);
196 assertEquals(11, ecMap.size());
197 }
198
199 @Test
200 public void testIsEmpty() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800201 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800202
203 assertTrue(ecMap.isEmpty());
204 ecMap.put(KEY1, VALUE1);
205 assertFalse(ecMap.isEmpty());
206 ecMap.remove(KEY1);
207 assertTrue(ecMap.isEmpty());
208 }
209
210 @Test
211 public void testContainsKey() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800212 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800213
214 assertFalse(ecMap.containsKey(KEY1));
215 ecMap.put(KEY1, VALUE1);
216 assertTrue(ecMap.containsKey(KEY1));
217 assertFalse(ecMap.containsKey(KEY2));
218 ecMap.remove(KEY1);
219 assertFalse(ecMap.containsKey(KEY1));
220 }
221
222 @Test
223 public void testContainsValue() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800224 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800225
226 assertFalse(ecMap.containsValue(VALUE1));
227 ecMap.put(KEY1, VALUE1);
228 assertTrue(ecMap.containsValue(VALUE1));
229 assertFalse(ecMap.containsValue(VALUE2));
230 ecMap.put(KEY1, VALUE2);
231 assertFalse(ecMap.containsValue(VALUE1));
232 assertTrue(ecMap.containsValue(VALUE2));
233 ecMap.remove(KEY1);
234 assertFalse(ecMap.containsValue(VALUE2));
235 }
236
237 @Test
238 public void testGet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800239 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800240
241 CountDownLatch latch;
242
243 // Local put
244 assertNull(ecMap.get(KEY1));
245 ecMap.put(KEY1, VALUE1);
246 assertEquals(VALUE1, ecMap.get(KEY1));
247
248 // Remote put
Madan Jampani3d76c942015-06-29 23:37:10 -0700249 List<UpdateEntry<String, String>> message
250 = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800251
252 // Create a latch so we know when the put operation has finished
253 latch = new CountDownLatch(1);
254 ecMap.addListener(new TestListener(latch));
255
256 assertNull(ecMap.get(KEY2));
Madan Jampani3d76c942015-06-29 23:37:10 -0700257 updateHandler.accept(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800258 assertTrue("External listener never got notified of internal event",
259 latch.await(100, TimeUnit.MILLISECONDS));
260 assertEquals(VALUE2, ecMap.get(KEY2));
261
262 // Local remove
263 ecMap.remove(KEY2);
264 assertNull(ecMap.get(KEY2));
265
266 // Remote remove
Madan Jampani3d76c942015-06-29 23:37:10 -0700267 message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800268
269 // Create a latch so we know when the remove operation has finished
270 latch = new CountDownLatch(1);
271 ecMap.addListener(new TestListener(latch));
272
Madan Jampani3d76c942015-06-29 23:37:10 -0700273 updateHandler.accept(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800274 assertTrue("External listener never got notified of internal event",
275 latch.await(100, TimeUnit.MILLISECONDS));
276 assertNull(ecMap.get(KEY1));
277 }
278
279 @Test
280 public void testPut() throws Exception {
281 // Set up expectations of external events to be sent to listeners during
282 // the test. These don't use timestamps so we can set them all up at once.
283 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800284 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800285 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700286 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800287 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700288 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800289 replay(listener);
290
291 ecMap.addListener(listener);
292
293 // Set up expected internal message to be broadcast to peers on first put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800294 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700295 .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800296
297 // Put first value
298 assertNull(ecMap.get(KEY1));
299 ecMap.put(KEY1, VALUE1);
300 assertEquals(VALUE1, ecMap.get(KEY1));
301
302 verify(clusterCommunicator);
303
304 // Set up expected internal message to be broadcast to peers on second put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800305 expectSpecificMulticastMessage(generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700306 KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800307
308 // Update same key to a new value
309 ecMap.put(KEY1, VALUE2);
310 assertEquals(VALUE2, ecMap.get(KEY1));
311
312 verify(clusterCommunicator);
313
314 // Do a put with a older timestamp than the value already there.
315 // The map data should not be changed and no notifications should be sent.
316 reset(clusterCommunicator);
317 replay(clusterCommunicator);
318
319 clockService.turnBackTime();
320 ecMap.put(KEY1, VALUE1);
321 // Value should not have changed.
322 assertEquals(VALUE2, ecMap.get(KEY1));
323
324 verify(clusterCommunicator);
325
326 // Check that our listener received the correct events during the test
327 verify(listener);
328 }
329
330 @Test
331 public void testRemove() throws Exception {
332 // Set up expectations of external events to be sent to listeners during
333 // the test. These don't use timestamps so we can set them all up at once.
334 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800335 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800336 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700337 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700338 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700339 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800340 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700341 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800342 replay(listener);
343
344 ecMap.addListener(listener);
345
346 // Put in an initial value
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800347 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800348 ecMap.put(KEY1, VALUE1);
349 assertEquals(VALUE1, ecMap.get(KEY1));
350
351 // Remove the value and check the correct internal cluster messages
352 // are sent
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800353 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700354 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800355
356 ecMap.remove(KEY1);
357 assertNull(ecMap.get(KEY1));
358
359 verify(clusterCommunicator);
360
361 // Remove the same value again. Even though the value is no longer in
362 // the map, we expect that the tombstone is updated and another remove
363 // event is sent to the cluster and external listeners.
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800364 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700365 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800366
367 ecMap.remove(KEY1);
368 assertNull(ecMap.get(KEY1));
369
370 verify(clusterCommunicator);
371
372
373 // Put in a new value for us to try and remove
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800374 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800375
376 ecMap.put(KEY2, VALUE2);
377
378 clockService.turnBackTime();
379
380 // Remove should have no effect, since it has an older timestamp than
381 // the put. Expect no notifications to be sent out
382 reset(clusterCommunicator);
383 replay(clusterCommunicator);
384
385 ecMap.remove(KEY2);
386
387 verify(clusterCommunicator);
388
389 // Check that our listener received the correct events during the test
390 verify(listener);
391 }
392
393 @Test
Madan Jampani4727a112015-07-16 12:12:58 -0700394 public void testCompute() throws Exception {
395 // Set up expectations of external events to be sent to listeners during
396 // the test. These don't use timestamps so we can set them all up at once.
397 EventuallyConsistentMapListener<String, String> listener
398 = getListener();
399 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700400 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Madan Jampani4727a112015-07-16 12:12:58 -0700401 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700402 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Madan Jampani4727a112015-07-16 12:12:58 -0700403 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700404 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Madan Jampani4727a112015-07-16 12:12:58 -0700405 replay(listener);
406
407 ecMap.addListener(listener);
408
409 // Put in an initial value
410 expectPeerMessage(clusterCommunicator);
411 ecMap.compute(KEY1, (k, v) -> VALUE1);
412 assertEquals(VALUE1, ecMap.get(KEY1));
413
414 // Remove the value and check the correct internal cluster messages
415 // are sent
416 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
417 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
418
419 ecMap.compute(KEY1, (k, v) -> null);
420 assertNull(ecMap.get(KEY1));
421
422 verify(clusterCommunicator);
423
424 // Remove the same value again. Even though the value is no longer in
425 // the map, we expect that the tombstone is updated and another remove
426 // event is sent to the cluster and external listeners.
427 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
428 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
429
430 ecMap.compute(KEY1, (k, v) -> null);
431 assertNull(ecMap.get(KEY1));
432
433 verify(clusterCommunicator);
434
435 // Put in a new value for us to try and remove
436 expectPeerMessage(clusterCommunicator);
437
438 ecMap.compute(KEY2, (k, v) -> VALUE2);
439
440 clockService.turnBackTime();
441
442 // Remove should have no effect, since it has an older timestamp than
443 // the put. Expect no notifications to be sent out
444 reset(clusterCommunicator);
445 replay(clusterCommunicator);
446
447 ecMap.compute(KEY2, (k, v) -> null);
448
449 verify(clusterCommunicator);
450
451 // Check that our listener received the correct events during the test
452 verify(listener);
453 }
454
455 @Test
Jonathan Hart584d2f32015-01-27 19:46:14 -0800456 public void testPutAll() throws Exception {
457 // putAll() with an empty map is a no-op - no messages will be sent
458 reset(clusterCommunicator);
459 replay(clusterCommunicator);
460
461 ecMap.putAll(new HashMap<>());
462
463 verify(clusterCommunicator);
464
465 // Set up the listener with our expected events
466 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800467 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800468 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700469 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800470 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700471 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800472 replay(listener);
473
474 ecMap.addListener(listener);
475
476 // Expect a multi-update inter-instance message
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700477 expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800478 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800479
480 Map<String, String> putAllValues = new HashMap<>();
481 putAllValues.put(KEY1, VALUE1);
482 putAllValues.put(KEY2, VALUE2);
483
484 // Put the values in the map
485 ecMap.putAll(putAllValues);
486
487 // Check the correct messages and events were sent
488 verify(clusterCommunicator);
489 verify(listener);
490 }
491
492 @Test
493 public void testClear() throws Exception {
494 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800495 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800496 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700497 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800498 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700499 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800500 replay(listener);
501
502 // clear() on an empty map is a no-op - no messages will be sent
503 reset(clusterCommunicator);
504 replay(clusterCommunicator);
505
506 assertTrue(ecMap.isEmpty());
507 ecMap.clear();
508 verify(clusterCommunicator);
509
510 // Put some items in the map
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800511 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800512 ecMap.put(KEY1, VALUE1);
513 ecMap.put(KEY2, VALUE2);
514
515 ecMap.addListener(listener);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700516 expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800517
518 ecMap.clear();
519
520 verify(clusterCommunicator);
521 verify(listener);
522 }
523
524 @Test
525 public void testKeySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800526 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800527
528 assertTrue(ecMap.keySet().isEmpty());
529
530 // Generate some keys
531 Set<String> keys = new HashSet<>();
532 for (int i = 1; i <= 10; i++) {
533 keys.add("" + i);
534 }
535
536 // Put each key in the map
537 keys.forEach(k -> ecMap.put(k, "value" + k));
538
539 // Check keySet() returns the correct value
540 assertEquals(keys, ecMap.keySet());
541
542 // Update the value for one of the keys
543 ecMap.put(keys.iterator().next(), "new-value");
544
545 // Check the key set is still the same
546 assertEquals(keys, ecMap.keySet());
547
548 // Remove a key
549 String removeKey = keys.iterator().next();
550 keys.remove(removeKey);
551 ecMap.remove(removeKey);
552
553 // Check the key set is still correct
554 assertEquals(keys, ecMap.keySet());
555 }
556
557 @Test
558 public void testValues() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800559 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800560
561 assertTrue(ecMap.values().isEmpty());
562
563 // Generate some values
564 Map<String, String> expectedValues = new HashMap<>();
565 for (int i = 1; i <= 10; i++) {
566 expectedValues.put("" + i, "value" + i);
567 }
568
569 // Add them into the map
570 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
571
572 // Check the values collection is correct
573 assertEquals(expectedValues.values().size(), ecMap.values().size());
574 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
575
576 // Update the value for one of the keys
577 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
578 expectedValues.put(first.getKey(), "new-value");
579 ecMap.put(first.getKey(), "new-value");
580
581 // Check the values collection is still correct
582 assertEquals(expectedValues.values().size(), ecMap.values().size());
583 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
584
585 // Remove a key
586 String removeKey = expectedValues.keySet().iterator().next();
587 expectedValues.remove(removeKey);
588 ecMap.remove(removeKey);
589
590 // Check the values collection is still correct
591 assertEquals(expectedValues.values().size(), ecMap.values().size());
592 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
593 }
594
595 @Test
596 public void testEntrySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800597 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800598
599 assertTrue(ecMap.entrySet().isEmpty());
600
601 // Generate some values
602 Map<String, String> expectedValues = new HashMap<>();
603 for (int i = 1; i <= 10; i++) {
604 expectedValues.put("" + i, "value" + i);
605 }
606
607 // Add them into the map
608 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
609
610 // Check the entry set is correct
611 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
612
613 // Update the value for one of the keys
614 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
615 expectedValues.put(first.getKey(), "new-value");
616 ecMap.put(first.getKey(), "new-value");
617
618 // Check the entry set is still correct
619 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
620
621 // Remove a key
622 String removeKey = expectedValues.keySet().iterator().next();
623 expectedValues.remove(removeKey);
624 ecMap.remove(removeKey);
625
626 // Check the entry set is still correct
627 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
628 }
629
630 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
631 if (expectedMap.entrySet().size() != actual.size()) {
632 return false;
633 }
634
635 for (Map.Entry<String, String> e : actual) {
636 if (!expectedMap.containsKey(e.getKey())) {
637 return false;
638 }
639 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
640 return false;
641 }
642 }
643 return true;
644 }
645
646 @Test
647 public void testDestroy() throws Exception {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800648 clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800649 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
650
651 replay(clusterCommunicator);
652
653 ecMap.destroy();
654
655 verify(clusterCommunicator);
656
657 try {
658 ecMap.get(KEY1);
659 fail("get after destroy should throw exception");
660 } catch (IllegalStateException e) {
661 assertTrue(true);
662 }
663
664 try {
665 ecMap.put(KEY1, VALUE1);
666 fail("put after destroy should throw exception");
667 } catch (IllegalStateException e) {
668 assertTrue(true);
669 }
670 }
671
Madan Jampani3d76c942015-06-29 23:37:10 -0700672 private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
673 return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800674 }
675
Madan Jampani3d76c942015-06-29 23:37:10 -0700676 private List<UpdateEntry<String, String>> generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700677 String key1, String value1, String key2, String value2) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700678 List<UpdateEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800679
680 Timestamp timestamp1 = clockService.peek(1);
681 Timestamp timestamp2 = clockService.peek(2);
682
Madan Jampani3d76c942015-06-29 23:37:10 -0700683 list.add(generatePutMessage(key1, value1, timestamp1));
684 list.add(generatePutMessage(key2, value2, timestamp2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800685
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700686 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800687 }
688
Madan Jampani3d76c942015-06-29 23:37:10 -0700689 private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
690 return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800691 }
692
Madan Jampani3d76c942015-06-29 23:37:10 -0700693 private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
694 List<UpdateEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800695
696 Timestamp timestamp1 = clockService.peek(1);
697 Timestamp timestamp2 = clockService.peek(2);
698
Madan Jampani3d76c942015-06-29 23:37:10 -0700699 list.add(generateRemoveMessage(key1, timestamp1));
700 list.add(generateRemoveMessage(key2, timestamp2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800701
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700702 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800703 }
704
705 /**
706 * Sets up a mock ClusterCommunicationService to expect a specific cluster
707 * message to be broadcast to the cluster.
708 *
Ray Milkeyb3c5ce22015-08-10 09:07:36 -0700709 * @param message message we expect to be sent
Jonathan Hart584d2f32015-01-27 19:46:14 -0800710 * @param clusterCommunicator a mock ClusterCommunicationService to set up
711 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800712 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700713 private static <T> void expectSpecificBroadcastMessage(
714 T message,
715 MessageSubject subject,
716 ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800717 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700718 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
719 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800720 replay(clusterCommunicator);
721 }
722
723 /**
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800724 * Sets up a mock ClusterCommunicationService to expect a specific cluster
725 * message to be multicast to the cluster.
726 *
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700727 * @param message message we expect to be sent
728 * @param subject subject we expect to be sent to
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800729 * @param clusterCommunicator a mock ClusterCommunicationService to set up
730 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800731 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700732 private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800733 ClusterCommunicationService clusterCommunicator) {
734 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700735 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
736 expectLastCall().anyTimes();
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800737 replay(clusterCommunicator);
738 }
739
740
741 /**
742 * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
Jonathan Hart584d2f32015-01-27 19:46:14 -0800743 * that is sent to it. This is useful for unit tests where we aren't
744 * interested in testing the messaging component.
745 *
746 * @param clusterCommunicator a mock ClusterCommunicationService to set up
747 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800748 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700749 private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800750 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800751// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
752// anyObject(Iterable.class)))
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700753 expect(clusterCommunicator.<T>unicast(
754 anyObject(),
755 anyObject(MessageSubject.class),
756 anyObject(Function.class),
757 anyObject(NodeId.class)))
Madan Jampani175e8fd2015-05-20 14:10:45 -0700758 .andReturn(CompletableFuture.completedFuture(null))
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800759 .anyTimes();
760 replay(clusterCommunicator);
761 }
762
763 /**
764 * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
765 * that is sent to it. This is useful for unit tests where we aren't
766 * interested in testing the messaging component.
767 *
768 * @param clusterCommunicator a mock ClusterCommunicationService to set up
769 */
770 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800771 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700772 clusterCommunicator.<AbstractEvent>multicast(
773 anyObject(AbstractEvent.class),
774 anyObject(MessageSubject.class),
775 anyObject(Function.class),
776 anyObject(Set.class));
777 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800778 replay(clusterCommunicator);
779 }
780
781 /**
782 * ClusterCommunicationService implementation that the map's addSubscriber
783 * call will delegate to. This means we can get a reference to the
784 * internal cluster message handler used by the map, so that we can simulate
785 * events coming in from other instances.
786 */
787 private final class TestClusterCommunicationService
Ray Milkeyb3c5ce22015-08-10 09:07:36 -0700788 extends ClusterCommunicationServiceAdapter {
Madan Jampani27b69c62015-05-15 15:49:02 -0700789
790 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700791 public <M> void addSubscriber(MessageSubject subject,
792 Function<byte[], M> decoder, Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700793 Executor executor) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700794 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
795 updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
796 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
797 antiEntropyHandler = (Consumer<AntiEntropyAdvertisement<String>>) handler;
798 } else {
799 throw new RuntimeException("Unexpected message subject " + subject.toString());
800 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700801 }
Jonathan Hart584d2f32015-01-27 19:46:14 -0800802 }
803
804 /**
805 * ClockService implementation that gives out timestamps based on a
806 * sequential counter. This clock service enables more control over the
807 * timestamps that are given out, including being able to "turn back time"
808 * to give out timestamps from the past.
809 *
810 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800811 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800812 */
Madan Jampanibcf1a482015-06-24 19:05:56 -0700813 private class SequentialClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800814
815 private static final long INITIAL_VALUE = 1;
816 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
817
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800818 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800819 return new TestTimestamp(counter.getAndIncrement());
820 }
821
822 /**
823 * Returns what the next timestamp will be without consuming the
824 * timestamp. This allows test code to set expectations correctly while
825 * still allowing the CUT to get the same timestamp.
826 *
827 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800828 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800829 */
830 public Timestamp peekAtNextTimestamp() {
831 return peek(1);
832 }
833
834 /**
835 * Returns the ith timestamp to be given out in the future without
836 * consuming the timestamp. For example, i=1 returns the next timestamp,
837 * i=2 returns the timestamp after that, and so on.
838 *
839 * @param i number of the timestamp to peek at
840 * @return the ith timestamp that will be given out
841 */
842 public Timestamp peek(int i) {
843 checkArgument(i > 0, "i must be a positive integer");
844
845 return new TestTimestamp(counter.get() + i - 1);
846 }
847
848 /**
849 * Turns the clock back two ticks, so the next call to getTimestamp will
850 * return an older timestamp than the previous call to getTimestamp.
851 */
852 public void turnBackTime() {
853 // Not atomic, but should be OK for these tests.
854 counter.decrementAndGet();
855 counter.decrementAndGet();
856 }
857
858 }
859
860 /**
861 * Timestamp implementation where the value of the timestamp can be
862 * specified explicitly at creation time.
863 */
864 private class TestTimestamp implements Timestamp {
865
866 private final long timestamp;
867
868 /**
869 * Creates a new timestamp that has the specified value.
870 *
871 * @param timestamp value of the timestamp
872 */
873 public TestTimestamp(long timestamp) {
874 this.timestamp = timestamp;
875 }
876
877 @Override
878 public int compareTo(Timestamp o) {
879 checkArgument(o instanceof TestTimestamp);
880 TestTimestamp otherTimestamp = (TestTimestamp) o;
881 return ComparisonChain.start()
882 .compare(this.timestamp, otherTimestamp.timestamp)
883 .result();
884 }
885 }
886
887 /**
888 * EventuallyConsistentMapListener implementation which triggers a latch
889 * when it receives an event.
890 */
891 private class TestListener implements EventuallyConsistentMapListener<String, String> {
892 private CountDownLatch latch;
893
894 /**
895 * Creates a new listener that will trigger the specified latch when it
896 * receives and event.
897 *
898 * @param latch the latch to trigger on events
899 */
900 public TestListener(CountDownLatch latch) {
901 this.latch = latch;
902 }
903
904 @Override
905 public void event(EventuallyConsistentMapEvent<String, String> event) {
906 latch.countDown();
907 }
908 }
909}