blob: 0a3d5e6c70928fa7794cee2d06b38553a84bc9d6 [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hart584d2f32015-01-27 19:46:14 -080017
18import com.google.common.collect.ComparisonChain;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080019import com.google.common.collect.ImmutableSet;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080020import com.google.common.collect.Lists;
Jonathan Hart584d2f32015-01-27 19:46:14 -080021import com.google.common.util.concurrent.ListenableFuture;
Madan Jampanib28e4ad2015-02-19 12:31:37 -080022import com.google.common.util.concurrent.MoreExecutors;
Madan Jampani3e033bd2015-04-08 13:03:49 -070023
Jonathan Hart584d2f32015-01-27 19:46:14 -080024import org.junit.After;
25import org.junit.Before;
26import org.junit.Test;
27import org.onlab.packet.IpAddress;
28import org.onlab.util.KryoNamespace;
29import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.ControllerNode;
31import org.onosproject.cluster.DefaultControllerNode;
32import org.onosproject.cluster.NodeId;
33import org.onosproject.store.Timestamp;
34import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
35import org.onosproject.store.cluster.messaging.ClusterMessage;
36import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
37import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070038import org.onosproject.store.service.ClockService;
Madan Jampani3e033bd2015-04-08 13:03:49 -070039import org.onosproject.store.impl.LogicalTimestamp;
Jonathan Hart77bdd262015-02-03 09:07:48 -080040import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hart584d2f32015-01-27 19:46:14 -080041import org.onosproject.store.serializers.KryoNamespaces;
42import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070043import org.onosproject.store.service.EventuallyConsistentMap;
44import org.onosproject.store.service.EventuallyConsistentMapEvent;
45import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hart584d2f32015-01-27 19:46:14 -080046
47import java.io.IOException;
48import java.util.ArrayList;
49import java.util.HashMap;
50import java.util.HashSet;
51import java.util.Map;
52import java.util.Objects;
53import java.util.Set;
54import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080055import java.util.concurrent.ExecutorService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080056import java.util.concurrent.TimeUnit;
57import java.util.concurrent.atomic.AtomicLong;
58
59import static com.google.common.base.Preconditions.checkArgument;
60import static junit.framework.TestCase.assertFalse;
61import static org.easymock.EasyMock.*;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080062import static org.junit.Assert.*;
Jonathan Hart584d2f32015-01-27 19:46:14 -080063
64/**
65 * Unit tests for EventuallyConsistentMapImpl.
66 */
67public class EventuallyConsistentMapImplTest {
68
69 private EventuallyConsistentMap<String, String> ecMap;
70
71 private ClusterService clusterService;
72 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080073 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080074
75 private static final String MAP_NAME = "test";
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080076 private static final MessageSubject UPDATE_MESSAGE_SUBJECT
Jonathan Hart584d2f32015-01-27 19:46:14 -080077 = new MessageSubject("ecm-" + MAP_NAME + "-update");
Jonathan Hart584d2f32015-01-27 19:46:14 -080078 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
79 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
80
81 private static final String KEY1 = "one";
82 private static final String KEY2 = "two";
83 private static final String VALUE1 = "oneValue";
84 private static final String VALUE2 = "twoValue";
85
86 private final ControllerNode self =
87 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
88
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080089 private ClusterMessageHandler updateHandler;
Jonathan Hart584d2f32015-01-27 19:46:14 -080090 private ClusterMessageHandler antiEntropyHandler;
91
/*
 * Serialization is a bit tricky here. The tests serialize messages with
 * this serializer in order to set up their expectations, while the
 * EventuallyConsistentMap uses its own internal serializer. This
 * serializer must therefore be set up exactly the same way as the map's
 * internal serializer, including the order of the registrations.
 */
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
    @Override
    protected void setupKryoPool() {
        serializerPool = KryoNamespace.newBuilder()
                // Classes we give to the map
                .register(KryoNamespaces.API)
                .register(TestTimestamp.class)
                // Below are the classes that the map registers internally
                .register(LogicalTimestamp.class)
                .register(WallClockTimestamp.class)
                .register(PutEntry.class)
                .register(RemoveEntry.class)
                .register(ArrayList.class)
                .register(AntiEntropyAdvertisement.class)
                .register(HashMap.class)
                .build();
    }
};
117
118 @Before
119 public void setUp() throws Exception {
120 clusterService = createMock(ClusterService.class);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800121 expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
122 expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800123 replay(clusterService);
124
125 clusterCommunicator = createMock(ClusterCommunicationService.class);
126
127 // Add expectation for adding cluster message subscribers which
128 // delegate to our ClusterCommunicationService implementation. This
129 // allows us to get a reference to the map's internal cluster message
130 // handlers so we can induce events coming in from a peer.
131 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
Madan Jampani2af244a2015-02-22 13:12:01 -0800132 anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800133 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
134
135 replay(clusterCommunicator);
136
137 clockService = new SequentialClockService<>();
138
139 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
140 .register(KryoNamespaces.API)
141 .register(TestTimestamp.class);
142
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700143 ecMap = new EventuallyConsistentMapBuilderImpl<>(
144 clusterService, clusterCommunicator)
145 .withName(MAP_NAME)
146 .withSerializer(serializer)
147 .withClockService(clockService)
148 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
149 .build();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800150
151 // Reset ready for tests to add their own expectations
152 reset(clusterCommunicator);
153 }
154
155 @After
156 public void tearDown() {
157 reset(clusterCommunicator);
158 ecMap.destroy();
159 }
160
Ray Milkey8dc82082015-02-20 16:22:38 -0800161 @SuppressWarnings("unchecked")
162 private EventuallyConsistentMapListener<String, String> getListener() {
163 return createMock(EventuallyConsistentMapListener.class);
164 }
165
Jonathan Hart584d2f32015-01-27 19:46:14 -0800166 @Test
167 public void testSize() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800168 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800169
170 assertEquals(0, ecMap.size());
171 ecMap.put(KEY1, VALUE1);
172 assertEquals(1, ecMap.size());
173 ecMap.put(KEY1, VALUE2);
174 assertEquals(1, ecMap.size());
175 ecMap.put(KEY2, VALUE2);
176 assertEquals(2, ecMap.size());
177 for (int i = 0; i < 10; i++) {
178 ecMap.put("" + i, "" + i);
179 }
180 assertEquals(12, ecMap.size());
181 ecMap.remove(KEY1);
182 assertEquals(11, ecMap.size());
183 ecMap.remove(KEY1);
184 assertEquals(11, ecMap.size());
185 }
186
187 @Test
188 public void testIsEmpty() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800189 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800190
191 assertTrue(ecMap.isEmpty());
192 ecMap.put(KEY1, VALUE1);
193 assertFalse(ecMap.isEmpty());
194 ecMap.remove(KEY1);
195 assertTrue(ecMap.isEmpty());
196 }
197
198 @Test
199 public void testContainsKey() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800200 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800201
202 assertFalse(ecMap.containsKey(KEY1));
203 ecMap.put(KEY1, VALUE1);
204 assertTrue(ecMap.containsKey(KEY1));
205 assertFalse(ecMap.containsKey(KEY2));
206 ecMap.remove(KEY1);
207 assertFalse(ecMap.containsKey(KEY1));
208 }
209
210 @Test
211 public void testContainsValue() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800212 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800213
214 assertFalse(ecMap.containsValue(VALUE1));
215 ecMap.put(KEY1, VALUE1);
216 assertTrue(ecMap.containsValue(VALUE1));
217 assertFalse(ecMap.containsValue(VALUE2));
218 ecMap.put(KEY1, VALUE2);
219 assertFalse(ecMap.containsValue(VALUE1));
220 assertTrue(ecMap.containsValue(VALUE2));
221 ecMap.remove(KEY1);
222 assertFalse(ecMap.containsValue(VALUE2));
223 }
224
225 @Test
226 public void testGet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800227 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800228
229 CountDownLatch latch;
230
231 // Local put
232 assertNull(ecMap.get(KEY1));
233 ecMap.put(KEY1, VALUE1);
234 assertEquals(VALUE1, ecMap.get(KEY1));
235
236 // Remote put
237 ClusterMessage message
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800238 = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800239
240 // Create a latch so we know when the put operation has finished
241 latch = new CountDownLatch(1);
242 ecMap.addListener(new TestListener(latch));
243
244 assertNull(ecMap.get(KEY2));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800245 updateHandler.handle(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800246 assertTrue("External listener never got notified of internal event",
247 latch.await(100, TimeUnit.MILLISECONDS));
248 assertEquals(VALUE2, ecMap.get(KEY2));
249
250 // Local remove
251 ecMap.remove(KEY2);
252 assertNull(ecMap.get(KEY2));
253
254 // Remote remove
255 ClusterMessage removeMessage
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800256 = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800257
258 // Create a latch so we know when the remove operation has finished
259 latch = new CountDownLatch(1);
260 ecMap.addListener(new TestListener(latch));
261
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800262 updateHandler.handle(removeMessage);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800263 assertTrue("External listener never got notified of internal event",
264 latch.await(100, TimeUnit.MILLISECONDS));
265 assertNull(ecMap.get(KEY1));
266 }
267
268 @Test
269 public void testPut() throws Exception {
270 // Set up expectations of external events to be sent to listeners during
271 // the test. These don't use timestamps so we can set them all up at once.
272 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800273 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800274 listener.event(new EventuallyConsistentMapEvent<>(
275 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
276 listener.event(new EventuallyConsistentMapEvent<>(
277 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
278 replay(listener);
279
280 ecMap.addListener(listener);
281
282 // Set up expected internal message to be broadcast to peers on first put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800283 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
Jonathan Hart584d2f32015-01-27 19:46:14 -0800284 .peekAtNextTimestamp()), clusterCommunicator);
285
286 // Put first value
287 assertNull(ecMap.get(KEY1));
288 ecMap.put(KEY1, VALUE1);
289 assertEquals(VALUE1, ecMap.get(KEY1));
290
291 verify(clusterCommunicator);
292
293 // Set up expected internal message to be broadcast to peers on second put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800294 expectSpecificMulticastMessage(generatePutMessage(
Jonathan Hart584d2f32015-01-27 19:46:14 -0800295 KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
296
297 // Update same key to a new value
298 ecMap.put(KEY1, VALUE2);
299 assertEquals(VALUE2, ecMap.get(KEY1));
300
301 verify(clusterCommunicator);
302
303 // Do a put with a older timestamp than the value already there.
304 // The map data should not be changed and no notifications should be sent.
305 reset(clusterCommunicator);
306 replay(clusterCommunicator);
307
308 clockService.turnBackTime();
309 ecMap.put(KEY1, VALUE1);
310 // Value should not have changed.
311 assertEquals(VALUE2, ecMap.get(KEY1));
312
313 verify(clusterCommunicator);
314
315 // Check that our listener received the correct events during the test
316 verify(listener);
317 }
318
319 @Test
320 public void testRemove() throws Exception {
321 // Set up expectations of external events to be sent to listeners during
322 // the test. These don't use timestamps so we can set them all up at once.
323 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800324 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800325 listener.event(new EventuallyConsistentMapEvent<>(
326 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
327 expectLastCall().times(2);
328 listener.event(new EventuallyConsistentMapEvent<>(
329 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
330 listener.event(new EventuallyConsistentMapEvent<>(
331 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
332 replay(listener);
333
334 ecMap.addListener(listener);
335
336 // Put in an initial value
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800337 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800338 ecMap.put(KEY1, VALUE1);
339 assertEquals(VALUE1, ecMap.get(KEY1));
340
341 // Remove the value and check the correct internal cluster messages
342 // are sent
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800343 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
344 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800345
346 ecMap.remove(KEY1);
347 assertNull(ecMap.get(KEY1));
348
349 verify(clusterCommunicator);
350
351 // Remove the same value again. Even though the value is no longer in
352 // the map, we expect that the tombstone is updated and another remove
353 // event is sent to the cluster and external listeners.
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800354 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
355 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800356
357 ecMap.remove(KEY1);
358 assertNull(ecMap.get(KEY1));
359
360 verify(clusterCommunicator);
361
362
363 // Put in a new value for us to try and remove
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800364 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800365
366 ecMap.put(KEY2, VALUE2);
367
368 clockService.turnBackTime();
369
370 // Remove should have no effect, since it has an older timestamp than
371 // the put. Expect no notifications to be sent out
372 reset(clusterCommunicator);
373 replay(clusterCommunicator);
374
375 ecMap.remove(KEY2);
376
377 verify(clusterCommunicator);
378
379 // Check that our listener received the correct events during the test
380 verify(listener);
381 }
382
383 @Test
384 public void testPutAll() throws Exception {
385 // putAll() with an empty map is a no-op - no messages will be sent
386 reset(clusterCommunicator);
387 replay(clusterCommunicator);
388
389 ecMap.putAll(new HashMap<>());
390
391 verify(clusterCommunicator);
392
393 // Set up the listener with our expected events
394 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800395 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800396 listener.event(new EventuallyConsistentMapEvent<>(
397 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
398 listener.event(new EventuallyConsistentMapEvent<>(
399 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
400 replay(listener);
401
402 ecMap.addListener(listener);
403
404 // Expect a multi-update inter-instance message
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800405 expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
406 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800407
408 Map<String, String> putAllValues = new HashMap<>();
409 putAllValues.put(KEY1, VALUE1);
410 putAllValues.put(KEY2, VALUE2);
411
412 // Put the values in the map
413 ecMap.putAll(putAllValues);
414
415 // Check the correct messages and events were sent
416 verify(clusterCommunicator);
417 verify(listener);
418 }
419
420 @Test
421 public void testClear() throws Exception {
422 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800423 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800424 listener.event(new EventuallyConsistentMapEvent<>(
425 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
426 listener.event(new EventuallyConsistentMapEvent<>(
427 EventuallyConsistentMapEvent.Type.REMOVE, KEY2, null));
428 replay(listener);
429
430 // clear() on an empty map is a no-op - no messages will be sent
431 reset(clusterCommunicator);
432 replay(clusterCommunicator);
433
434 assertTrue(ecMap.isEmpty());
435 ecMap.clear();
436 verify(clusterCommunicator);
437
438 // Put some items in the map
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800439 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800440 ecMap.put(KEY1, VALUE1);
441 ecMap.put(KEY2, VALUE2);
442
443 ecMap.addListener(listener);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800444 expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800445
446 ecMap.clear();
447
448 verify(clusterCommunicator);
449 verify(listener);
450 }
451
452 @Test
453 public void testKeySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800454 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800455
456 assertTrue(ecMap.keySet().isEmpty());
457
458 // Generate some keys
459 Set<String> keys = new HashSet<>();
460 for (int i = 1; i <= 10; i++) {
461 keys.add("" + i);
462 }
463
464 // Put each key in the map
465 keys.forEach(k -> ecMap.put(k, "value" + k));
466
467 // Check keySet() returns the correct value
468 assertEquals(keys, ecMap.keySet());
469
470 // Update the value for one of the keys
471 ecMap.put(keys.iterator().next(), "new-value");
472
473 // Check the key set is still the same
474 assertEquals(keys, ecMap.keySet());
475
476 // Remove a key
477 String removeKey = keys.iterator().next();
478 keys.remove(removeKey);
479 ecMap.remove(removeKey);
480
481 // Check the key set is still correct
482 assertEquals(keys, ecMap.keySet());
483 }
484
485 @Test
486 public void testValues() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800487 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800488
489 assertTrue(ecMap.values().isEmpty());
490
491 // Generate some values
492 Map<String, String> expectedValues = new HashMap<>();
493 for (int i = 1; i <= 10; i++) {
494 expectedValues.put("" + i, "value" + i);
495 }
496
497 // Add them into the map
498 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
499
500 // Check the values collection is correct
501 assertEquals(expectedValues.values().size(), ecMap.values().size());
502 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
503
504 // Update the value for one of the keys
505 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
506 expectedValues.put(first.getKey(), "new-value");
507 ecMap.put(first.getKey(), "new-value");
508
509 // Check the values collection is still correct
510 assertEquals(expectedValues.values().size(), ecMap.values().size());
511 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
512
513 // Remove a key
514 String removeKey = expectedValues.keySet().iterator().next();
515 expectedValues.remove(removeKey);
516 ecMap.remove(removeKey);
517
518 // Check the values collection is still correct
519 assertEquals(expectedValues.values().size(), ecMap.values().size());
520 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
521 }
522
523 @Test
524 public void testEntrySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800525 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800526
527 assertTrue(ecMap.entrySet().isEmpty());
528
529 // Generate some values
530 Map<String, String> expectedValues = new HashMap<>();
531 for (int i = 1; i <= 10; i++) {
532 expectedValues.put("" + i, "value" + i);
533 }
534
535 // Add them into the map
536 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
537
538 // Check the entry set is correct
539 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
540
541 // Update the value for one of the keys
542 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
543 expectedValues.put(first.getKey(), "new-value");
544 ecMap.put(first.getKey(), "new-value");
545
546 // Check the entry set is still correct
547 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
548
549 // Remove a key
550 String removeKey = expectedValues.keySet().iterator().next();
551 expectedValues.remove(removeKey);
552 ecMap.remove(removeKey);
553
554 // Check the entry set is still correct
555 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
556 }
557
558 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
559 if (expectedMap.entrySet().size() != actual.size()) {
560 return false;
561 }
562
563 for (Map.Entry<String, String> e : actual) {
564 if (!expectedMap.containsKey(e.getKey())) {
565 return false;
566 }
567 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
568 return false;
569 }
570 }
571 return true;
572 }
573
574 @Test
575 public void testDestroy() throws Exception {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800576 clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800577 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
578
579 replay(clusterCommunicator);
580
581 ecMap.destroy();
582
583 verify(clusterCommunicator);
584
585 try {
586 ecMap.get(KEY1);
587 fail("get after destroy should throw exception");
588 } catch (IllegalStateException e) {
589 assertTrue(true);
590 }
591
592 try {
593 ecMap.put(KEY1, VALUE1);
594 fail("put after destroy should throw exception");
595 } catch (IllegalStateException e) {
596 assertTrue(true);
597 }
598 }
599
600 private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800601 PutEntry<String, String> event = new PutEntry<>(key, value, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800602
603 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800604 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
605 SERIALIZER.encode(Lists.newArrayList(event)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800606 }
607
608 private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800609 ArrayList<PutEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800610
611 Timestamp timestamp1 = clockService.peek(1);
612 Timestamp timestamp2 = clockService.peek(2);
613
Jonathan Hartf9108232015-02-02 16:37:35 -0800614 PutEntry<String, String> pe1 = new PutEntry<>(key1, value1, timestamp1);
615 PutEntry<String, String> pe2 = new PutEntry<>(key2, value2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800616
617 list.add(pe1);
618 list.add(pe2);
619
Jonathan Hart584d2f32015-01-27 19:46:14 -0800620
621 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800622 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
623 SERIALIZER.encode(list));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800624 }
625
626 private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800627 RemoveEntry<String, String> event = new RemoveEntry<>(key, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800628
629 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800630 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
631 SERIALIZER.encode(Lists.newArrayList(event)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800632 }
633
634 private ClusterMessage generateRemoveMessage(String key1, String key2) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800635 ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800636
637 Timestamp timestamp1 = clockService.peek(1);
638 Timestamp timestamp2 = clockService.peek(2);
639
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800640 RemoveEntry<String, String> re1 = new RemoveEntry<>(key1, timestamp1);
641 RemoveEntry<String, String> re2 = new RemoveEntry<>(key2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800642
643 list.add(re1);
644 list.add(re2);
645
Jonathan Hart584d2f32015-01-27 19:46:14 -0800646 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800647 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
648 SERIALIZER.encode(list));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800649 }
650
651 /**
652 * Sets up a mock ClusterCommunicationService to expect a specific cluster
653 * message to be broadcast to the cluster.
654 *
655 * @param m message we expect to be sent
656 * @param clusterCommunicator a mock ClusterCommunicationService to set up
657 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800658 //FIXME rename
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800659 private static void expectSpecificBroadcastMessage(ClusterMessage m,
660 ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800661 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800662// expect(clusterCommunicator.broadcast(m)).andReturn(true);
663 expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
664 .andReturn(true)
665 .anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800666 replay(clusterCommunicator);
667 }
668
669 /**
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800670 * Sets up a mock ClusterCommunicationService to expect a specific cluster
671 * message to be multicast to the cluster.
672 *
673 * @param m message we expect to be sent
674 * @param clusterCommunicator a mock ClusterCommunicationService to set up
675 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800676 //FIXME rename
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800677 private static void expectSpecificMulticastMessage(ClusterMessage m,
678 ClusterCommunicationService clusterCommunicator) {
679 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800680// expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
681 expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
682 .andReturn(true)
683 .anyTimes();
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800684 replay(clusterCommunicator);
685 }
686
687
688 /**
689 * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
Jonathan Hart584d2f32015-01-27 19:46:14 -0800690 * that is sent to it. This is useful for unit tests where we aren't
691 * interested in testing the messaging component.
692 *
693 * @param clusterCommunicator a mock ClusterCommunicationService to set up
694 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800695 //FIXME rename
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800696 private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
697 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800698// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
699// anyObject(Iterable.class)))
700 expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class),
701 anyObject(NodeId.class)))
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800702 .andReturn(true)
703 .anyTimes();
704 replay(clusterCommunicator);
705 }
706
707 /**
708 * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
709 * that is sent to it. This is useful for unit tests where we aren't
710 * interested in testing the messaging component.
711 *
712 * @param clusterCommunicator a mock ClusterCommunicationService to set up
713 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800714 //FIXME rename
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800715 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800716 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800717// expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
718// .andReturn(true)
719// .anyTimes();
720 expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class), anyObject(NodeId.class)))
Jonathan Hart584d2f32015-01-27 19:46:14 -0800721 .andReturn(true)
722 .anyTimes();
723 replay(clusterCommunicator);
724 }
725
726 /**
727 * ClusterCommunicationService implementation that the map's addSubscriber
728 * call will delegate to. This means we can get a reference to the
729 * internal cluster message handler used by the map, so that we can simulate
730 * events coming in from other instances.
731 */
732 private final class TestClusterCommunicationService
733 implements ClusterCommunicationService {
734
735 @Override
736 public boolean broadcast(ClusterMessage message) {
737 return false;
738 }
739
740 @Override
741 public boolean broadcastIncludeSelf(ClusterMessage message) {
742 return false;
743 }
744
745 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800746 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800747 return false;
748 }
749
750 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800751 public boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800752 return false;
753 }
754
755 @Override
756 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
757 NodeId toNodeId)
758 throws IOException {
759 return null;
760 }
761
762 @Override
763 public void addSubscriber(MessageSubject subject,
764 ClusterMessageHandler subscriber) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800765 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
766 updateHandler = subscriber;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800767 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
768 antiEntropyHandler = subscriber;
769 } else {
770 throw new RuntimeException("Unexpected message subject " + subject.toString());
771 }
772 }
773
774 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800775 public void addSubscriber(MessageSubject subject,
776 ClusterMessageHandler subscriber,
777 ExecutorService executor) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800778 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
779 updateHandler = subscriber;
Madan Jampani2af244a2015-02-22 13:12:01 -0800780 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
781 antiEntropyHandler = subscriber;
782 } else {
783 throw new RuntimeException("Unexpected message subject " + subject.toString());
784 }
785 }
786
787 @Override
Jonathan Hart584d2f32015-01-27 19:46:14 -0800788 public void removeSubscriber(MessageSubject subject) {}
789 }
790
791 /**
792 * ClockService implementation that gives out timestamps based on a
793 * sequential counter. This clock service enables more control over the
794 * timestamps that are given out, including being able to "turn back time"
795 * to give out timestamps from the past.
796 *
797 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800798 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800799 */
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800800 private class SequentialClockService<T, U> implements ClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800801
802 private static final long INITIAL_VALUE = 1;
803 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
804
805 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800806 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800807 return new TestTimestamp(counter.getAndIncrement());
808 }
809
810 /**
811 * Returns what the next timestamp will be without consuming the
812 * timestamp. This allows test code to set expectations correctly while
813 * still allowing the CUT to get the same timestamp.
814 *
815 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800816 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800817 */
818 public Timestamp peekAtNextTimestamp() {
819 return peek(1);
820 }
821
822 /**
823 * Returns the ith timestamp to be given out in the future without
824 * consuming the timestamp. For example, i=1 returns the next timestamp,
825 * i=2 returns the timestamp after that, and so on.
826 *
827 * @param i number of the timestamp to peek at
828 * @return the ith timestamp that will be given out
829 */
830 public Timestamp peek(int i) {
831 checkArgument(i > 0, "i must be a positive integer");
832
833 return new TestTimestamp(counter.get() + i - 1);
834 }
835
836 /**
837 * Turns the clock back two ticks, so the next call to getTimestamp will
838 * return an older timestamp than the previous call to getTimestamp.
839 */
840 public void turnBackTime() {
841 // Not atomic, but should be OK for these tests.
842 counter.decrementAndGet();
843 counter.decrementAndGet();
844 }
845
846 }
847
848 /**
849 * Timestamp implementation where the value of the timestamp can be
850 * specified explicitly at creation time.
851 */
852 private class TestTimestamp implements Timestamp {
853
854 private final long timestamp;
855
856 /**
857 * Creates a new timestamp that has the specified value.
858 *
859 * @param timestamp value of the timestamp
860 */
861 public TestTimestamp(long timestamp) {
862 this.timestamp = timestamp;
863 }
864
865 @Override
866 public int compareTo(Timestamp o) {
867 checkArgument(o instanceof TestTimestamp);
868 TestTimestamp otherTimestamp = (TestTimestamp) o;
869 return ComparisonChain.start()
870 .compare(this.timestamp, otherTimestamp.timestamp)
871 .result();
872 }
873 }
874
875 /**
876 * EventuallyConsistentMapListener implementation which triggers a latch
877 * when it receives an event.
878 */
879 private class TestListener implements EventuallyConsistentMapListener<String, String> {
880 private CountDownLatch latch;
881
882 /**
883 * Creates a new listener that will trigger the specified latch when it
884 * receives and event.
885 *
886 * @param latch the latch to trigger on events
887 */
888 public TestListener(CountDownLatch latch) {
889 this.latch = latch;
890 }
891
892 @Override
893 public void event(EventuallyConsistentMapEvent<String, String> event) {
894 latch.countDown();
895 }
896 }
897}