blob: 5ed638406b04d8033228f2db2254e740b8a76e10 [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hart584d2f32015-01-27 19:46:14 -080017
18import com.google.common.collect.ComparisonChain;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080019import com.google.common.collect.ImmutableSet;
Jonathan Hart584d2f32015-01-27 19:46:14 -080020import com.google.common.util.concurrent.ListenableFuture;
Madan Jampanib28e4ad2015-02-19 12:31:37 -080021import com.google.common.util.concurrent.MoreExecutors;
Jonathan Hart584d2f32015-01-27 19:46:14 -080022import org.junit.After;
23import org.junit.Before;
24import org.junit.Test;
25import org.onlab.packet.IpAddress;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.ControllerNode;
29import org.onosproject.cluster.DefaultControllerNode;
30import org.onosproject.cluster.NodeId;
31import org.onosproject.store.Timestamp;
32import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
33import org.onosproject.store.cluster.messaging.ClusterMessage;
34import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
35import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080036import org.onosproject.store.impl.ClockService;
37import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hart584d2f32015-01-27 19:46:14 -080038import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.serializers.KryoSerializer;
40
41import java.io.IOException;
42import java.util.ArrayList;
43import java.util.HashMap;
44import java.util.HashSet;
45import java.util.Map;
46import java.util.Objects;
47import java.util.Set;
48import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080049import java.util.concurrent.ExecutorService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080050import java.util.concurrent.TimeUnit;
51import java.util.concurrent.atomic.AtomicLong;
52
53import static com.google.common.base.Preconditions.checkArgument;
54import static junit.framework.TestCase.assertFalse;
55import static org.easymock.EasyMock.*;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080056import static org.junit.Assert.*;
Jonathan Hart584d2f32015-01-27 19:46:14 -080057
58/**
59 * Unit tests for EventuallyConsistentMapImpl.
60 */
61public class EventuallyConsistentMapImplTest {
62
63 private EventuallyConsistentMap<String, String> ecMap;
64
65 private ClusterService clusterService;
66 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080067 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080068
69 private static final String MAP_NAME = "test";
70 private static final MessageSubject PUT_MESSAGE_SUBJECT
71 = new MessageSubject("ecm-" + MAP_NAME + "-update");
72 private static final MessageSubject REMOVE_MESSAGE_SUBJECT
73 = new MessageSubject("ecm-" + MAP_NAME + "-remove");
74 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
75 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
76
77 private static final String KEY1 = "one";
78 private static final String KEY2 = "two";
79 private static final String VALUE1 = "oneValue";
80 private static final String VALUE2 = "twoValue";
81
82 private final ControllerNode self =
83 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
84
85 private ClusterMessageHandler putHandler;
86 private ClusterMessageHandler removeHandler;
87 private ClusterMessageHandler antiEntropyHandler;
88
89 /*
90 * Serialization is a bit tricky here. We need to serialize in the tests
91 * to set the expectations, which will use this serializer here, but the
92 * EventuallyConsistentMap will use its own internal serializer. This means
93 * this serializer must be set up exactly the same as map's internal
94 * serializer.
95 */
96 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
97 @Override
98 protected void setupKryoPool() {
99 serializerPool = KryoNamespace.newBuilder()
100 // Classes we give to the map
101 .register(KryoNamespaces.API)
102 .register(TestTimestamp.class)
103 // Below is the classes that the map internally registers
104 .register(WallClockTimestamp.class)
Jonathan Hartf9108232015-02-02 16:37:35 -0800105 .register(PutEntry.class)
106 .register(RemoveEntry.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800107 .register(ArrayList.class)
Jonathan Hartf9108232015-02-02 16:37:35 -0800108 .register(InternalPutEvent.class)
109 .register(InternalRemoveEvent.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800110 .register(AntiEntropyAdvertisement.class)
111 .register(HashMap.class)
112 .build();
113 }
114 };
115
116 @Before
117 public void setUp() throws Exception {
118 clusterService = createMock(ClusterService.class);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800119 expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
120 expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800121 replay(clusterService);
122
123 clusterCommunicator = createMock(ClusterCommunicationService.class);
124
125 // Add expectation for adding cluster message subscribers which
126 // delegate to our ClusterCommunicationService implementation. This
127 // allows us to get a reference to the map's internal cluster message
128 // handlers so we can induce events coming in from a peer.
129 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
Madan Jampani2af244a2015-02-22 13:12:01 -0800130 anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800131 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
132
133 replay(clusterCommunicator);
134
135 clockService = new SequentialClockService<>();
136
137 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
138 .register(KryoNamespaces.API)
139 .register(TestTimestamp.class);
140
141 ecMap = new EventuallyConsistentMapImpl<>(MAP_NAME, clusterService,
142 clusterCommunicator,
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800143 serializer, clockService)
144 .withBroadcastMessageExecutor(MoreExecutors.newDirectExecutorService());
Jonathan Hart584d2f32015-01-27 19:46:14 -0800145
146 // Reset ready for tests to add their own expectations
147 reset(clusterCommunicator);
148 }
149
150 @After
151 public void tearDown() {
152 reset(clusterCommunicator);
153 ecMap.destroy();
154 }
155
Ray Milkey8dc82082015-02-20 16:22:38 -0800156 @SuppressWarnings("unchecked")
157 private EventuallyConsistentMapListener<String, String> getListener() {
158 return createMock(EventuallyConsistentMapListener.class);
159 }
160
Jonathan Hart584d2f32015-01-27 19:46:14 -0800161 @Test
162 public void testSize() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800163 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800164
165 assertEquals(0, ecMap.size());
166 ecMap.put(KEY1, VALUE1);
167 assertEquals(1, ecMap.size());
168 ecMap.put(KEY1, VALUE2);
169 assertEquals(1, ecMap.size());
170 ecMap.put(KEY2, VALUE2);
171 assertEquals(2, ecMap.size());
172 for (int i = 0; i < 10; i++) {
173 ecMap.put("" + i, "" + i);
174 }
175 assertEquals(12, ecMap.size());
176 ecMap.remove(KEY1);
177 assertEquals(11, ecMap.size());
178 ecMap.remove(KEY1);
179 assertEquals(11, ecMap.size());
180 }
181
182 @Test
183 public void testIsEmpty() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800184 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800185
186 assertTrue(ecMap.isEmpty());
187 ecMap.put(KEY1, VALUE1);
188 assertFalse(ecMap.isEmpty());
189 ecMap.remove(KEY1);
190 assertTrue(ecMap.isEmpty());
191 }
192
193 @Test
194 public void testContainsKey() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800195 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800196
197 assertFalse(ecMap.containsKey(KEY1));
198 ecMap.put(KEY1, VALUE1);
199 assertTrue(ecMap.containsKey(KEY1));
200 assertFalse(ecMap.containsKey(KEY2));
201 ecMap.remove(KEY1);
202 assertFalse(ecMap.containsKey(KEY1));
203 }
204
205 @Test
206 public void testContainsValue() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800207 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800208
209 assertFalse(ecMap.containsValue(VALUE1));
210 ecMap.put(KEY1, VALUE1);
211 assertTrue(ecMap.containsValue(VALUE1));
212 assertFalse(ecMap.containsValue(VALUE2));
213 ecMap.put(KEY1, VALUE2);
214 assertFalse(ecMap.containsValue(VALUE1));
215 assertTrue(ecMap.containsValue(VALUE2));
216 ecMap.remove(KEY1);
217 assertFalse(ecMap.containsValue(VALUE2));
218 }
219
220 @Test
221 public void testGet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800222 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800223
224 CountDownLatch latch;
225
226 // Local put
227 assertNull(ecMap.get(KEY1));
228 ecMap.put(KEY1, VALUE1);
229 assertEquals(VALUE1, ecMap.get(KEY1));
230
231 // Remote put
232 ClusterMessage message
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800233 = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800234
235 // Create a latch so we know when the put operation has finished
236 latch = new CountDownLatch(1);
237 ecMap.addListener(new TestListener(latch));
238
239 assertNull(ecMap.get(KEY2));
240 putHandler.handle(message);
241 assertTrue("External listener never got notified of internal event",
242 latch.await(100, TimeUnit.MILLISECONDS));
243 assertEquals(VALUE2, ecMap.get(KEY2));
244
245 // Local remove
246 ecMap.remove(KEY2);
247 assertNull(ecMap.get(KEY2));
248
249 // Remote remove
250 ClusterMessage removeMessage
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800251 = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800252
253 // Create a latch so we know when the remove operation has finished
254 latch = new CountDownLatch(1);
255 ecMap.addListener(new TestListener(latch));
256
257 removeHandler.handle(removeMessage);
258 assertTrue("External listener never got notified of internal event",
259 latch.await(100, TimeUnit.MILLISECONDS));
260 assertNull(ecMap.get(KEY1));
261 }
262
263 @Test
264 public void testPut() throws Exception {
265 // Set up expectations of external events to be sent to listeners during
266 // the test. These don't use timestamps so we can set them all up at once.
267 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800268 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800269 listener.event(new EventuallyConsistentMapEvent<>(
270 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
271 listener.event(new EventuallyConsistentMapEvent<>(
272 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
273 replay(listener);
274
275 ecMap.addListener(listener);
276
277 // Set up expected internal message to be broadcast to peers on first put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800278 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
Jonathan Hart584d2f32015-01-27 19:46:14 -0800279 .peekAtNextTimestamp()), clusterCommunicator);
280
281 // Put first value
282 assertNull(ecMap.get(KEY1));
283 ecMap.put(KEY1, VALUE1);
284 assertEquals(VALUE1, ecMap.get(KEY1));
285
286 verify(clusterCommunicator);
287
288 // Set up expected internal message to be broadcast to peers on second put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800289 expectSpecificMulticastMessage(generatePutMessage(
Jonathan Hart584d2f32015-01-27 19:46:14 -0800290 KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
291
292 // Update same key to a new value
293 ecMap.put(KEY1, VALUE2);
294 assertEquals(VALUE2, ecMap.get(KEY1));
295
296 verify(clusterCommunicator);
297
298 // Do a put with a older timestamp than the value already there.
299 // The map data should not be changed and no notifications should be sent.
300 reset(clusterCommunicator);
301 replay(clusterCommunicator);
302
303 clockService.turnBackTime();
304 ecMap.put(KEY1, VALUE1);
305 // Value should not have changed.
306 assertEquals(VALUE2, ecMap.get(KEY1));
307
308 verify(clusterCommunicator);
309
310 // Check that our listener received the correct events during the test
311 verify(listener);
312 }
313
314 @Test
315 public void testRemove() throws Exception {
316 // Set up expectations of external events to be sent to listeners during
317 // the test. These don't use timestamps so we can set them all up at once.
318 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800319 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800320 listener.event(new EventuallyConsistentMapEvent<>(
321 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
322 expectLastCall().times(2);
323 listener.event(new EventuallyConsistentMapEvent<>(
324 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
325 listener.event(new EventuallyConsistentMapEvent<>(
326 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
327 replay(listener);
328
329 ecMap.addListener(listener);
330
331 // Put in an initial value
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800332 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800333 ecMap.put(KEY1, VALUE1);
334 assertEquals(VALUE1, ecMap.get(KEY1));
335
336 // Remove the value and check the correct internal cluster messages
337 // are sent
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800338 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
339 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800340
341 ecMap.remove(KEY1);
342 assertNull(ecMap.get(KEY1));
343
344 verify(clusterCommunicator);
345
346 // Remove the same value again. Even though the value is no longer in
347 // the map, we expect that the tombstone is updated and another remove
348 // event is sent to the cluster and external listeners.
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800349 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
350 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800351
352 ecMap.remove(KEY1);
353 assertNull(ecMap.get(KEY1));
354
355 verify(clusterCommunicator);
356
357
358 // Put in a new value for us to try and remove
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800359 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800360
361 ecMap.put(KEY2, VALUE2);
362
363 clockService.turnBackTime();
364
365 // Remove should have no effect, since it has an older timestamp than
366 // the put. Expect no notifications to be sent out
367 reset(clusterCommunicator);
368 replay(clusterCommunicator);
369
370 ecMap.remove(KEY2);
371
372 verify(clusterCommunicator);
373
374 // Check that our listener received the correct events during the test
375 verify(listener);
376 }
377
378 @Test
379 public void testPutAll() throws Exception {
380 // putAll() with an empty map is a no-op - no messages will be sent
381 reset(clusterCommunicator);
382 replay(clusterCommunicator);
383
384 ecMap.putAll(new HashMap<>());
385
386 verify(clusterCommunicator);
387
388 // Set up the listener with our expected events
389 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800390 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800391 listener.event(new EventuallyConsistentMapEvent<>(
392 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
393 listener.event(new EventuallyConsistentMapEvent<>(
394 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
395 replay(listener);
396
397 ecMap.addListener(listener);
398
399 // Expect a multi-update inter-instance message
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800400 expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
401 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800402
403 Map<String, String> putAllValues = new HashMap<>();
404 putAllValues.put(KEY1, VALUE1);
405 putAllValues.put(KEY2, VALUE2);
406
407 // Put the values in the map
408 ecMap.putAll(putAllValues);
409
410 // Check the correct messages and events were sent
411 verify(clusterCommunicator);
412 verify(listener);
413 }
414
415 @Test
416 public void testClear() throws Exception {
417 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800418 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800419 listener.event(new EventuallyConsistentMapEvent<>(
420 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
421 listener.event(new EventuallyConsistentMapEvent<>(
422 EventuallyConsistentMapEvent.Type.REMOVE, KEY2, null));
423 replay(listener);
424
425 // clear() on an empty map is a no-op - no messages will be sent
426 reset(clusterCommunicator);
427 replay(clusterCommunicator);
428
429 assertTrue(ecMap.isEmpty());
430 ecMap.clear();
431 verify(clusterCommunicator);
432
433 // Put some items in the map
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800434 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800435 ecMap.put(KEY1, VALUE1);
436 ecMap.put(KEY2, VALUE2);
437
438 ecMap.addListener(listener);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800439 expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800440
441 ecMap.clear();
442
443 verify(clusterCommunicator);
444 verify(listener);
445 }
446
447 @Test
448 public void testKeySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800449 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800450
451 assertTrue(ecMap.keySet().isEmpty());
452
453 // Generate some keys
454 Set<String> keys = new HashSet<>();
455 for (int i = 1; i <= 10; i++) {
456 keys.add("" + i);
457 }
458
459 // Put each key in the map
460 keys.forEach(k -> ecMap.put(k, "value" + k));
461
462 // Check keySet() returns the correct value
463 assertEquals(keys, ecMap.keySet());
464
465 // Update the value for one of the keys
466 ecMap.put(keys.iterator().next(), "new-value");
467
468 // Check the key set is still the same
469 assertEquals(keys, ecMap.keySet());
470
471 // Remove a key
472 String removeKey = keys.iterator().next();
473 keys.remove(removeKey);
474 ecMap.remove(removeKey);
475
476 // Check the key set is still correct
477 assertEquals(keys, ecMap.keySet());
478 }
479
480 @Test
481 public void testValues() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800482 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800483
484 assertTrue(ecMap.values().isEmpty());
485
486 // Generate some values
487 Map<String, String> expectedValues = new HashMap<>();
488 for (int i = 1; i <= 10; i++) {
489 expectedValues.put("" + i, "value" + i);
490 }
491
492 // Add them into the map
493 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
494
495 // Check the values collection is correct
496 assertEquals(expectedValues.values().size(), ecMap.values().size());
497 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
498
499 // Update the value for one of the keys
500 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
501 expectedValues.put(first.getKey(), "new-value");
502 ecMap.put(first.getKey(), "new-value");
503
504 // Check the values collection is still correct
505 assertEquals(expectedValues.values().size(), ecMap.values().size());
506 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
507
508 // Remove a key
509 String removeKey = expectedValues.keySet().iterator().next();
510 expectedValues.remove(removeKey);
511 ecMap.remove(removeKey);
512
513 // Check the values collection is still correct
514 assertEquals(expectedValues.values().size(), ecMap.values().size());
515 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
516 }
517
518 @Test
519 public void testEntrySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800520 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800521
522 assertTrue(ecMap.entrySet().isEmpty());
523
524 // Generate some values
525 Map<String, String> expectedValues = new HashMap<>();
526 for (int i = 1; i <= 10; i++) {
527 expectedValues.put("" + i, "value" + i);
528 }
529
530 // Add them into the map
531 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
532
533 // Check the entry set is correct
534 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
535
536 // Update the value for one of the keys
537 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
538 expectedValues.put(first.getKey(), "new-value");
539 ecMap.put(first.getKey(), "new-value");
540
541 // Check the entry set is still correct
542 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
543
544 // Remove a key
545 String removeKey = expectedValues.keySet().iterator().next();
546 expectedValues.remove(removeKey);
547 ecMap.remove(removeKey);
548
549 // Check the entry set is still correct
550 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
551 }
552
553 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
554 if (expectedMap.entrySet().size() != actual.size()) {
555 return false;
556 }
557
558 for (Map.Entry<String, String> e : actual) {
559 if (!expectedMap.containsKey(e.getKey())) {
560 return false;
561 }
562 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
563 return false;
564 }
565 }
566 return true;
567 }
568
569 @Test
570 public void testDestroy() throws Exception {
571 clusterCommunicator.removeSubscriber(PUT_MESSAGE_SUBJECT);
572 clusterCommunicator.removeSubscriber(REMOVE_MESSAGE_SUBJECT);
573 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
574
575 replay(clusterCommunicator);
576
577 ecMap.destroy();
578
579 verify(clusterCommunicator);
580
581 try {
582 ecMap.get(KEY1);
583 fail("get after destroy should throw exception");
584 } catch (IllegalStateException e) {
585 assertTrue(true);
586 }
587
588 try {
589 ecMap.put(KEY1, VALUE1);
590 fail("put after destroy should throw exception");
591 } catch (IllegalStateException e) {
592 assertTrue(true);
593 }
594 }
595
596 private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800597 InternalPutEvent<String, String> event =
598 new InternalPutEvent<>(key, value, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800599
600 return new ClusterMessage(
601 clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
602 SERIALIZER.encode(event));
603 }
604
605 private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800606 ArrayList<PutEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800607
608 Timestamp timestamp1 = clockService.peek(1);
609 Timestamp timestamp2 = clockService.peek(2);
610
Jonathan Hartf9108232015-02-02 16:37:35 -0800611 PutEntry<String, String> pe1 = new PutEntry<>(key1, value1, timestamp1);
612 PutEntry<String, String> pe2 = new PutEntry<>(key2, value2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800613
614 list.add(pe1);
615 list.add(pe2);
616
Jonathan Hartf9108232015-02-02 16:37:35 -0800617 InternalPutEvent<String, String> event = new InternalPutEvent<>(list);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800618
619 return new ClusterMessage(
620 clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
621 SERIALIZER.encode(event));
622 }
623
624 private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800625 InternalRemoveEvent<String> event = new InternalRemoveEvent<>(key, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800626
627 return new ClusterMessage(
628 clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
629 SERIALIZER.encode(event));
630 }
631
632 private ClusterMessage generateRemoveMessage(String key1, String key2) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800633 ArrayList<RemoveEntry<String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800634
635 Timestamp timestamp1 = clockService.peek(1);
636 Timestamp timestamp2 = clockService.peek(2);
637
Jonathan Hartf9108232015-02-02 16:37:35 -0800638 RemoveEntry<String> re1 = new RemoveEntry<>(key1, timestamp1);
639 RemoveEntry<String> re2 = new RemoveEntry<>(key2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800640
641 list.add(re1);
642 list.add(re2);
643
Jonathan Hartf9108232015-02-02 16:37:35 -0800644 InternalRemoveEvent<String> event = new InternalRemoveEvent<>(list);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800645
646 return new ClusterMessage(
647 clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
648 SERIALIZER.encode(event));
649 }
650
651 /**
652 * Sets up a mock ClusterCommunicationService to expect a specific cluster
653 * message to be broadcast to the cluster.
654 *
655 * @param m message we expect to be sent
656 * @param clusterCommunicator a mock ClusterCommunicationService to set up
657 */
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800658 private static void expectSpecificBroadcastMessage(ClusterMessage m,
659 ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800660 reset(clusterCommunicator);
661 expect(clusterCommunicator.broadcast(m)).andReturn(true);
662 replay(clusterCommunicator);
663 }
664
665 /**
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800666 * Sets up a mock ClusterCommunicationService to expect a specific cluster
667 * message to be multicast to the cluster.
668 *
669 * @param m message we expect to be sent
670 * @param clusterCommunicator a mock ClusterCommunicationService to set up
671 */
672 private static void expectSpecificMulticastMessage(ClusterMessage m,
673 ClusterCommunicationService clusterCommunicator) {
674 reset(clusterCommunicator);
675 expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
676 replay(clusterCommunicator);
677 }
678
679
680 /**
681 * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
Jonathan Hart584d2f32015-01-27 19:46:14 -0800682 * that is sent to it. This is useful for unit tests where we aren't
683 * interested in testing the messaging component.
684 *
685 * @param clusterCommunicator a mock ClusterCommunicationService to set up
686 */
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800687 private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
688 reset(clusterCommunicator);
689 expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
690 anyObject(Iterable.class)))
691 .andReturn(true)
692 .anyTimes();
693 replay(clusterCommunicator);
694 }
695
696 /**
697 * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
698 * that is sent to it. This is useful for unit tests where we aren't
699 * interested in testing the messaging component.
700 *
701 * @param clusterCommunicator a mock ClusterCommunicationService to set up
702 */
703 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800704 reset(clusterCommunicator);
705 expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
706 .andReturn(true)
707 .anyTimes();
708 replay(clusterCommunicator);
709 }
710
711 /**
712 * ClusterCommunicationService implementation that the map's addSubscriber
713 * call will delegate to. This means we can get a reference to the
714 * internal cluster message handler used by the map, so that we can simulate
715 * events coming in from other instances.
716 */
717 private final class TestClusterCommunicationService
718 implements ClusterCommunicationService {
719
720 @Override
721 public boolean broadcast(ClusterMessage message) {
722 return false;
723 }
724
725 @Override
726 public boolean broadcastIncludeSelf(ClusterMessage message) {
727 return false;
728 }
729
730 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800731 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800732 return false;
733 }
734
735 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800736 public boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800737 return false;
738 }
739
740 @Override
741 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
742 NodeId toNodeId)
743 throws IOException {
744 return null;
745 }
746
747 @Override
748 public void addSubscriber(MessageSubject subject,
749 ClusterMessageHandler subscriber) {
750 if (subject.equals(PUT_MESSAGE_SUBJECT)) {
751 putHandler = subscriber;
752 } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
753 removeHandler = subscriber;
754 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
755 antiEntropyHandler = subscriber;
756 } else {
757 throw new RuntimeException("Unexpected message subject " + subject.toString());
758 }
759 }
760
761 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800762 public void addSubscriber(MessageSubject subject,
763 ClusterMessageHandler subscriber,
764 ExecutorService executor) {
765 if (subject.equals(PUT_MESSAGE_SUBJECT)) {
766 putHandler = subscriber;
767 } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
768 removeHandler = subscriber;
769 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
770 antiEntropyHandler = subscriber;
771 } else {
772 throw new RuntimeException("Unexpected message subject " + subject.toString());
773 }
774 }
775
776 @Override
Jonathan Hart584d2f32015-01-27 19:46:14 -0800777 public void removeSubscriber(MessageSubject subject) {}
778 }
779
780 /**
781 * ClockService implementation that gives out timestamps based on a
782 * sequential counter. This clock service enables more control over the
783 * timestamps that are given out, including being able to "turn back time"
784 * to give out timestamps from the past.
785 *
786 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800787 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800788 */
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800789 private class SequentialClockService<T, U> implements ClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800790
791 private static final long INITIAL_VALUE = 1;
792 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
793
794 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800795 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800796 return new TestTimestamp(counter.getAndIncrement());
797 }
798
799 /**
800 * Returns what the next timestamp will be without consuming the
801 * timestamp. This allows test code to set expectations correctly while
802 * still allowing the CUT to get the same timestamp.
803 *
804 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800805 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800806 */
807 public Timestamp peekAtNextTimestamp() {
808 return peek(1);
809 }
810
811 /**
812 * Returns the ith timestamp to be given out in the future without
813 * consuming the timestamp. For example, i=1 returns the next timestamp,
814 * i=2 returns the timestamp after that, and so on.
815 *
816 * @param i number of the timestamp to peek at
817 * @return the ith timestamp that will be given out
818 */
819 public Timestamp peek(int i) {
820 checkArgument(i > 0, "i must be a positive integer");
821
822 return new TestTimestamp(counter.get() + i - 1);
823 }
824
825 /**
826 * Turns the clock back two ticks, so the next call to getTimestamp will
827 * return an older timestamp than the previous call to getTimestamp.
828 */
829 public void turnBackTime() {
830 // Not atomic, but should be OK for these tests.
831 counter.decrementAndGet();
832 counter.decrementAndGet();
833 }
834
835 }
836
837 /**
838 * Timestamp implementation where the value of the timestamp can be
839 * specified explicitly at creation time.
840 */
841 private class TestTimestamp implements Timestamp {
842
843 private final long timestamp;
844
845 /**
846 * Creates a new timestamp that has the specified value.
847 *
848 * @param timestamp value of the timestamp
849 */
850 public TestTimestamp(long timestamp) {
851 this.timestamp = timestamp;
852 }
853
854 @Override
855 public int compareTo(Timestamp o) {
856 checkArgument(o instanceof TestTimestamp);
857 TestTimestamp otherTimestamp = (TestTimestamp) o;
858 return ComparisonChain.start()
859 .compare(this.timestamp, otherTimestamp.timestamp)
860 .result();
861 }
862 }
863
864 /**
865 * EventuallyConsistentMapListener implementation which triggers a latch
866 * when it receives an event.
867 */
868 private class TestListener implements EventuallyConsistentMapListener<String, String> {
869 private CountDownLatch latch;
870
871 /**
872 * Creates a new listener that will trigger the specified latch when it
873 * receives and event.
874 *
875 * @param latch the latch to trigger on events
876 */
877 public TestListener(CountDownLatch latch) {
878 this.latch = latch;
879 }
880
881 @Override
882 public void event(EventuallyConsistentMapEvent<String, String> event) {
883 latch.countDown();
884 }
885 }
886}