blob: 5269d3931c9401a690a430a529ebdae83041fecf [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.impl;
17
18import com.google.common.collect.ComparisonChain;
19import com.google.common.util.concurrent.ListenableFuture;
20import org.junit.After;
21import org.junit.Before;
22import org.junit.Test;
23import org.onlab.packet.IpAddress;
24import org.onlab.util.KryoNamespace;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.ControllerNode;
27import org.onosproject.cluster.DefaultControllerNode;
28import org.onosproject.cluster.NodeId;
29import org.onosproject.store.Timestamp;
30import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
31import org.onosproject.store.cluster.messaging.ClusterMessage;
32import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
33import org.onosproject.store.cluster.messaging.MessageSubject;
34import org.onosproject.store.serializers.KryoNamespaces;
35import org.onosproject.store.serializers.KryoSerializer;
36
37import java.io.IOException;
38import java.util.ArrayList;
39import java.util.HashMap;
40import java.util.HashSet;
41import java.util.Map;
42import java.util.Objects;
43import java.util.Set;
44import java.util.concurrent.CountDownLatch;
45import java.util.concurrent.TimeUnit;
46import java.util.concurrent.atomic.AtomicLong;
47
48import static com.google.common.base.Preconditions.checkArgument;
49import static junit.framework.TestCase.assertFalse;
50import static org.easymock.EasyMock.*;
51import static org.junit.Assert.assertEquals;
52import static org.junit.Assert.assertNull;
53import static org.junit.Assert.assertTrue;
54import static org.junit.Assert.fail;
55
56/**
57 * Unit tests for EventuallyConsistentMapImpl.
58 */
59public class EventuallyConsistentMapImplTest {
60
61 private EventuallyConsistentMap<String, String> ecMap;
62
63 private ClusterService clusterService;
64 private ClusterCommunicationService clusterCommunicator;
65 private SequentialClockService<String> clockService;
66
67 private static final String MAP_NAME = "test";
68 private static final MessageSubject PUT_MESSAGE_SUBJECT
69 = new MessageSubject("ecm-" + MAP_NAME + "-update");
70 private static final MessageSubject REMOVE_MESSAGE_SUBJECT
71 = new MessageSubject("ecm-" + MAP_NAME + "-remove");
72 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
73 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
74
75 private static final String KEY1 = "one";
76 private static final String KEY2 = "two";
77 private static final String VALUE1 = "oneValue";
78 private static final String VALUE2 = "twoValue";
79
80 private final ControllerNode self =
81 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
82
83 private ClusterMessageHandler putHandler;
84 private ClusterMessageHandler removeHandler;
85 private ClusterMessageHandler antiEntropyHandler;
86
87 /*
88 * Serialization is a bit tricky here. We need to serialize in the tests
89 * to set the expectations, which will use this serializer here, but the
90 * EventuallyConsistentMap will use its own internal serializer. This means
91 * this serializer must be set up exactly the same as map's internal
92 * serializer.
93 */
94 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
95 @Override
96 protected void setupKryoPool() {
97 serializerPool = KryoNamespace.newBuilder()
98 // Classes we give to the map
99 .register(KryoNamespaces.API)
100 .register(TestTimestamp.class)
101 // Below is the classes that the map internally registers
102 .register(WallClockTimestamp.class)
103 .register(EventuallyConsistentMapImpl.PutEntry.class)
104 .register(EventuallyConsistentMapImpl.RemoveEntry.class)
105 .register(ArrayList.class)
106 .register(EventuallyConsistentMapImpl.InternalPutEvent.class)
107 .register(EventuallyConsistentMapImpl.InternalRemoveEvent.class)
108 .register(AntiEntropyAdvertisement.class)
109 .register(HashMap.class)
110 .build();
111 }
112 };
113
114 @Before
115 public void setUp() throws Exception {
116 clusterService = createMock(ClusterService.class);
117 expect(clusterService.getLocalNode()).andReturn(self)
118 .anyTimes();
119 replay(clusterService);
120
121 clusterCommunicator = createMock(ClusterCommunicationService.class);
122
123 // Add expectation for adding cluster message subscribers which
124 // delegate to our ClusterCommunicationService implementation. This
125 // allows us to get a reference to the map's internal cluster message
126 // handlers so we can induce events coming in from a peer.
127 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
128 anyObject(ClusterMessageHandler.class));
129 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
130
131 replay(clusterCommunicator);
132
133 clockService = new SequentialClockService<>();
134
135 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
136 .register(KryoNamespaces.API)
137 .register(TestTimestamp.class);
138
139 ecMap = new EventuallyConsistentMapImpl<>(MAP_NAME, clusterService,
140 clusterCommunicator,
141 serializer, clockService);
142
143 // Reset ready for tests to add their own expectations
144 reset(clusterCommunicator);
145 }
146
147 @After
148 public void tearDown() {
149 reset(clusterCommunicator);
150 ecMap.destroy();
151 }
152
153 @Test
154 public void testSize() throws Exception {
155 expectAnyMessage(clusterCommunicator);
156
157 assertEquals(0, ecMap.size());
158 ecMap.put(KEY1, VALUE1);
159 assertEquals(1, ecMap.size());
160 ecMap.put(KEY1, VALUE2);
161 assertEquals(1, ecMap.size());
162 ecMap.put(KEY2, VALUE2);
163 assertEquals(2, ecMap.size());
164 for (int i = 0; i < 10; i++) {
165 ecMap.put("" + i, "" + i);
166 }
167 assertEquals(12, ecMap.size());
168 ecMap.remove(KEY1);
169 assertEquals(11, ecMap.size());
170 ecMap.remove(KEY1);
171 assertEquals(11, ecMap.size());
172 }
173
174 @Test
175 public void testIsEmpty() throws Exception {
176 expectAnyMessage(clusterCommunicator);
177
178 assertTrue(ecMap.isEmpty());
179 ecMap.put(KEY1, VALUE1);
180 assertFalse(ecMap.isEmpty());
181 ecMap.remove(KEY1);
182 assertTrue(ecMap.isEmpty());
183 }
184
185 @Test
186 public void testContainsKey() throws Exception {
187 expectAnyMessage(clusterCommunicator);
188
189 assertFalse(ecMap.containsKey(KEY1));
190 ecMap.put(KEY1, VALUE1);
191 assertTrue(ecMap.containsKey(KEY1));
192 assertFalse(ecMap.containsKey(KEY2));
193 ecMap.remove(KEY1);
194 assertFalse(ecMap.containsKey(KEY1));
195 }
196
197 @Test
198 public void testContainsValue() throws Exception {
199 expectAnyMessage(clusterCommunicator);
200
201 assertFalse(ecMap.containsValue(VALUE1));
202 ecMap.put(KEY1, VALUE1);
203 assertTrue(ecMap.containsValue(VALUE1));
204 assertFalse(ecMap.containsValue(VALUE2));
205 ecMap.put(KEY1, VALUE2);
206 assertFalse(ecMap.containsValue(VALUE1));
207 assertTrue(ecMap.containsValue(VALUE2));
208 ecMap.remove(KEY1);
209 assertFalse(ecMap.containsValue(VALUE2));
210 }
211
212 @Test
213 public void testGet() throws Exception {
214 expectAnyMessage(clusterCommunicator);
215
216 CountDownLatch latch;
217
218 // Local put
219 assertNull(ecMap.get(KEY1));
220 ecMap.put(KEY1, VALUE1);
221 assertEquals(VALUE1, ecMap.get(KEY1));
222
223 // Remote put
224 ClusterMessage message
225 = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2));
226
227 // Create a latch so we know when the put operation has finished
228 latch = new CountDownLatch(1);
229 ecMap.addListener(new TestListener(latch));
230
231 assertNull(ecMap.get(KEY2));
232 putHandler.handle(message);
233 assertTrue("External listener never got notified of internal event",
234 latch.await(100, TimeUnit.MILLISECONDS));
235 assertEquals(VALUE2, ecMap.get(KEY2));
236
237 // Local remove
238 ecMap.remove(KEY2);
239 assertNull(ecMap.get(KEY2));
240
241 // Remote remove
242 ClusterMessage removeMessage
243 = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1));
244
245 // Create a latch so we know when the remove operation has finished
246 latch = new CountDownLatch(1);
247 ecMap.addListener(new TestListener(latch));
248
249 removeHandler.handle(removeMessage);
250 assertTrue("External listener never got notified of internal event",
251 latch.await(100, TimeUnit.MILLISECONDS));
252 assertNull(ecMap.get(KEY1));
253 }
254
255 @Test
256 public void testPut() throws Exception {
257 // Set up expectations of external events to be sent to listeners during
258 // the test. These don't use timestamps so we can set them all up at once.
259 EventuallyConsistentMapListener<String, String> listener
260 = createMock(EventuallyConsistentMapListener.class);
261 listener.event(new EventuallyConsistentMapEvent<>(
262 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
263 listener.event(new EventuallyConsistentMapEvent<>(
264 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
265 replay(listener);
266
267 ecMap.addListener(listener);
268
269 // Set up expected internal message to be broadcast to peers on first put
270 expectSpecificMessage(generatePutMessage(KEY1, VALUE1, clockService
271 .peekAtNextTimestamp()), clusterCommunicator);
272
273 // Put first value
274 assertNull(ecMap.get(KEY1));
275 ecMap.put(KEY1, VALUE1);
276 assertEquals(VALUE1, ecMap.get(KEY1));
277
278 verify(clusterCommunicator);
279
280 // Set up expected internal message to be broadcast to peers on second put
281 expectSpecificMessage(generatePutMessage(
282 KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
283
284 // Update same key to a new value
285 ecMap.put(KEY1, VALUE2);
286 assertEquals(VALUE2, ecMap.get(KEY1));
287
288 verify(clusterCommunicator);
289
290 // Do a put with a older timestamp than the value already there.
291 // The map data should not be changed and no notifications should be sent.
292 reset(clusterCommunicator);
293 replay(clusterCommunicator);
294
295 clockService.turnBackTime();
296 ecMap.put(KEY1, VALUE1);
297 // Value should not have changed.
298 assertEquals(VALUE2, ecMap.get(KEY1));
299
300 verify(clusterCommunicator);
301
302 // Check that our listener received the correct events during the test
303 verify(listener);
304 }
305
306 @Test
307 public void testRemove() throws Exception {
308 // Set up expectations of external events to be sent to listeners during
309 // the test. These don't use timestamps so we can set them all up at once.
310 EventuallyConsistentMapListener<String, String> listener
311 = createMock(EventuallyConsistentMapListener.class);
312 listener.event(new EventuallyConsistentMapEvent<>(
313 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
314 expectLastCall().times(2);
315 listener.event(new EventuallyConsistentMapEvent<>(
316 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
317 listener.event(new EventuallyConsistentMapEvent<>(
318 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
319 replay(listener);
320
321 ecMap.addListener(listener);
322
323 // Put in an initial value
324 expectAnyMessage(clusterCommunicator);
325 ecMap.put(KEY1, VALUE1);
326 assertEquals(VALUE1, ecMap.get(KEY1));
327
328 // Remove the value and check the correct internal cluster messages
329 // are sent
330 expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
331 clusterCommunicator);
332
333 ecMap.remove(KEY1);
334 assertNull(ecMap.get(KEY1));
335
336 verify(clusterCommunicator);
337
338 // Remove the same value again. Even though the value is no longer in
339 // the map, we expect that the tombstone is updated and another remove
340 // event is sent to the cluster and external listeners.
341 expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
342 clusterCommunicator);
343
344 ecMap.remove(KEY1);
345 assertNull(ecMap.get(KEY1));
346
347 verify(clusterCommunicator);
348
349
350 // Put in a new value for us to try and remove
351 expectAnyMessage(clusterCommunicator);
352
353 ecMap.put(KEY2, VALUE2);
354
355 clockService.turnBackTime();
356
357 // Remove should have no effect, since it has an older timestamp than
358 // the put. Expect no notifications to be sent out
359 reset(clusterCommunicator);
360 replay(clusterCommunicator);
361
362 ecMap.remove(KEY2);
363
364 verify(clusterCommunicator);
365
366 // Check that our listener received the correct events during the test
367 verify(listener);
368 }
369
370 @Test
371 public void testPutAll() throws Exception {
372 // putAll() with an empty map is a no-op - no messages will be sent
373 reset(clusterCommunicator);
374 replay(clusterCommunicator);
375
376 ecMap.putAll(new HashMap<>());
377
378 verify(clusterCommunicator);
379
380 // Set up the listener with our expected events
381 EventuallyConsistentMapListener<String, String> listener
382 = createMock(EventuallyConsistentMapListener.class);
383 listener.event(new EventuallyConsistentMapEvent<>(
384 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
385 listener.event(new EventuallyConsistentMapEvent<>(
386 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
387 replay(listener);
388
389 ecMap.addListener(listener);
390
391 // Expect a multi-update inter-instance message
392 expectSpecificMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
393 clusterCommunicator);
394
395 Map<String, String> putAllValues = new HashMap<>();
396 putAllValues.put(KEY1, VALUE1);
397 putAllValues.put(KEY2, VALUE2);
398
399 // Put the values in the map
400 ecMap.putAll(putAllValues);
401
402 // Check the correct messages and events were sent
403 verify(clusterCommunicator);
404 verify(listener);
405 }
406
407 @Test
408 public void testClear() throws Exception {
409 EventuallyConsistentMapListener<String, String> listener
410 = createMock(EventuallyConsistentMapListener.class);
411 listener.event(new EventuallyConsistentMapEvent<>(
412 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
413 listener.event(new EventuallyConsistentMapEvent<>(
414 EventuallyConsistentMapEvent.Type.REMOVE, KEY2, null));
415 replay(listener);
416
417 // clear() on an empty map is a no-op - no messages will be sent
418 reset(clusterCommunicator);
419 replay(clusterCommunicator);
420
421 assertTrue(ecMap.isEmpty());
422 ecMap.clear();
423 verify(clusterCommunicator);
424
425 // Put some items in the map
426 expectAnyMessage(clusterCommunicator);
427 ecMap.put(KEY1, VALUE1);
428 ecMap.put(KEY2, VALUE2);
429
430 ecMap.addListener(listener);
431 expectSpecificMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
432
433 ecMap.clear();
434
435 verify(clusterCommunicator);
436 verify(listener);
437 }
438
439 @Test
440 public void testKeySet() throws Exception {
441 expectAnyMessage(clusterCommunicator);
442
443 assertTrue(ecMap.keySet().isEmpty());
444
445 // Generate some keys
446 Set<String> keys = new HashSet<>();
447 for (int i = 1; i <= 10; i++) {
448 keys.add("" + i);
449 }
450
451 // Put each key in the map
452 keys.forEach(k -> ecMap.put(k, "value" + k));
453
454 // Check keySet() returns the correct value
455 assertEquals(keys, ecMap.keySet());
456
457 // Update the value for one of the keys
458 ecMap.put(keys.iterator().next(), "new-value");
459
460 // Check the key set is still the same
461 assertEquals(keys, ecMap.keySet());
462
463 // Remove a key
464 String removeKey = keys.iterator().next();
465 keys.remove(removeKey);
466 ecMap.remove(removeKey);
467
468 // Check the key set is still correct
469 assertEquals(keys, ecMap.keySet());
470 }
471
472 @Test
473 public void testValues() throws Exception {
474 expectAnyMessage(clusterCommunicator);
475
476 assertTrue(ecMap.values().isEmpty());
477
478 // Generate some values
479 Map<String, String> expectedValues = new HashMap<>();
480 for (int i = 1; i <= 10; i++) {
481 expectedValues.put("" + i, "value" + i);
482 }
483
484 // Add them into the map
485 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
486
487 // Check the values collection is correct
488 assertEquals(expectedValues.values().size(), ecMap.values().size());
489 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
490
491 // Update the value for one of the keys
492 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
493 expectedValues.put(first.getKey(), "new-value");
494 ecMap.put(first.getKey(), "new-value");
495
496 // Check the values collection is still correct
497 assertEquals(expectedValues.values().size(), ecMap.values().size());
498 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
499
500 // Remove a key
501 String removeKey = expectedValues.keySet().iterator().next();
502 expectedValues.remove(removeKey);
503 ecMap.remove(removeKey);
504
505 // Check the values collection is still correct
506 assertEquals(expectedValues.values().size(), ecMap.values().size());
507 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
508 }
509
510 @Test
511 public void testEntrySet() throws Exception {
512 expectAnyMessage(clusterCommunicator);
513
514 assertTrue(ecMap.entrySet().isEmpty());
515
516 // Generate some values
517 Map<String, String> expectedValues = new HashMap<>();
518 for (int i = 1; i <= 10; i++) {
519 expectedValues.put("" + i, "value" + i);
520 }
521
522 // Add them into the map
523 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
524
525 // Check the entry set is correct
526 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
527
528 // Update the value for one of the keys
529 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
530 expectedValues.put(first.getKey(), "new-value");
531 ecMap.put(first.getKey(), "new-value");
532
533 // Check the entry set is still correct
534 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
535
536 // Remove a key
537 String removeKey = expectedValues.keySet().iterator().next();
538 expectedValues.remove(removeKey);
539 ecMap.remove(removeKey);
540
541 // Check the entry set is still correct
542 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
543 }
544
545 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
546 if (expectedMap.entrySet().size() != actual.size()) {
547 return false;
548 }
549
550 for (Map.Entry<String, String> e : actual) {
551 if (!expectedMap.containsKey(e.getKey())) {
552 return false;
553 }
554 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
555 return false;
556 }
557 }
558 return true;
559 }
560
561 @Test
562 public void testDestroy() throws Exception {
563 clusterCommunicator.removeSubscriber(PUT_MESSAGE_SUBJECT);
564 clusterCommunicator.removeSubscriber(REMOVE_MESSAGE_SUBJECT);
565 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
566
567 replay(clusterCommunicator);
568
569 ecMap.destroy();
570
571 verify(clusterCommunicator);
572
573 try {
574 ecMap.get(KEY1);
575 fail("get after destroy should throw exception");
576 } catch (IllegalStateException e) {
577 assertTrue(true);
578 }
579
580 try {
581 ecMap.put(KEY1, VALUE1);
582 fail("put after destroy should throw exception");
583 } catch (IllegalStateException e) {
584 assertTrue(true);
585 }
586 }
587
588 private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
589 EventuallyConsistentMapImpl.InternalPutEvent<String, String> event =
590 new EventuallyConsistentMapImpl.InternalPutEvent<>(
591 key, value, timestamp);
592
593 return new ClusterMessage(
594 clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
595 SERIALIZER.encode(event));
596 }
597
598 private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
599 ArrayList<EventuallyConsistentMapImpl.PutEntry<String, String>> list = new ArrayList<>();
600
601 Timestamp timestamp1 = clockService.peek(1);
602 Timestamp timestamp2 = clockService.peek(2);
603
604 EventuallyConsistentMapImpl.PutEntry<String, String> pe1
605 = new EventuallyConsistentMapImpl.PutEntry<>(key1, value1, timestamp1);
606 EventuallyConsistentMapImpl.PutEntry<String, String> pe2
607 = new EventuallyConsistentMapImpl.PutEntry<>(key2, value2, timestamp2);
608
609 list.add(pe1);
610 list.add(pe2);
611
612 EventuallyConsistentMapImpl.InternalPutEvent<String, String> event
613 = new EventuallyConsistentMapImpl.InternalPutEvent<>(list);
614
615 return new ClusterMessage(
616 clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
617 SERIALIZER.encode(event));
618 }
619
620 private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
621 EventuallyConsistentMapImpl.InternalRemoveEvent<String> event =
622 new EventuallyConsistentMapImpl.InternalRemoveEvent<>(
623 key, timestamp);
624
625 return new ClusterMessage(
626 clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
627 SERIALIZER.encode(event));
628 }
629
630 private ClusterMessage generateRemoveMessage(String key1, String key2) {
631 ArrayList<EventuallyConsistentMapImpl.RemoveEntry<String>> list = new ArrayList<>();
632
633 Timestamp timestamp1 = clockService.peek(1);
634 Timestamp timestamp2 = clockService.peek(2);
635
636 EventuallyConsistentMapImpl.RemoveEntry<String> re1
637 = new EventuallyConsistentMapImpl.RemoveEntry<>(key1, timestamp1);
638 EventuallyConsistentMapImpl.RemoveEntry<String> re2
639 = new EventuallyConsistentMapImpl.RemoveEntry<>(key2, timestamp2);
640
641 list.add(re1);
642 list.add(re2);
643
644 EventuallyConsistentMapImpl.InternalRemoveEvent<String> event
645 = new EventuallyConsistentMapImpl.InternalRemoveEvent<>(list);
646
647 return new ClusterMessage(
648 clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
649 SERIALIZER.encode(event));
650 }
651
652 /**
653 * Sets up a mock ClusterCommunicationService to expect a specific cluster
654 * message to be broadcast to the cluster.
655 *
656 * @param m message we expect to be sent
657 * @param clusterCommunicator a mock ClusterCommunicationService to set up
658 */
659 private static void expectSpecificMessage(ClusterMessage m,
660 ClusterCommunicationService clusterCommunicator) {
661 reset(clusterCommunicator);
662 expect(clusterCommunicator.broadcast(m)).andReturn(true);
663 replay(clusterCommunicator);
664 }
665
666 /**
667 * Sets up a mock ClusterCommunicationService to expect any cluster message
668 * that is sent to it. This is useful for unit tests where we aren't
669 * interested in testing the messaging component.
670 *
671 * @param clusterCommunicator a mock ClusterCommunicationService to set up
672 */
673 private void expectAnyMessage(ClusterCommunicationService clusterCommunicator) {
674 reset(clusterCommunicator);
675 expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
676 .andReturn(true)
677 .anyTimes();
678 replay(clusterCommunicator);
679 }
680
681 /**
682 * ClusterCommunicationService implementation that the map's addSubscriber
683 * call will delegate to. This means we can get a reference to the
684 * internal cluster message handler used by the map, so that we can simulate
685 * events coming in from other instances.
686 */
687 private final class TestClusterCommunicationService
688 implements ClusterCommunicationService {
689
690 @Override
691 public boolean broadcast(ClusterMessage message) {
692 return false;
693 }
694
695 @Override
696 public boolean broadcastIncludeSelf(ClusterMessage message) {
697 return false;
698 }
699
700 @Override
701 public boolean unicast(ClusterMessage message, NodeId toNodeId)
702 throws IOException {
703 return false;
704 }
705
706 @Override
707 public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) {
708 return false;
709 }
710
711 @Override
712 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
713 NodeId toNodeId)
714 throws IOException {
715 return null;
716 }
717
718 @Override
719 public void addSubscriber(MessageSubject subject,
720 ClusterMessageHandler subscriber) {
721 if (subject.equals(PUT_MESSAGE_SUBJECT)) {
722 putHandler = subscriber;
723 } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
724 removeHandler = subscriber;
725 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
726 antiEntropyHandler = subscriber;
727 } else {
728 throw new RuntimeException("Unexpected message subject " + subject.toString());
729 }
730 }
731
732 @Override
733 public void removeSubscriber(MessageSubject subject) {}
734 }
735
736 /**
737 * ClockService implementation that gives out timestamps based on a
738 * sequential counter. This clock service enables more control over the
739 * timestamps that are given out, including being able to "turn back time"
740 * to give out timestamps from the past.
741 *
742 * @param <T> Type that the clock service will give out timestamps for
743 */
744 private class SequentialClockService<T> implements ClockService<T> {
745
746 private static final long INITIAL_VALUE = 1;
747 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
748
749 @Override
750 public Timestamp getTimestamp(T object) {
751 return new TestTimestamp(counter.getAndIncrement());
752 }
753
754 /**
755 * Returns what the next timestamp will be without consuming the
756 * timestamp. This allows test code to set expectations correctly while
757 * still allowing the CUT to get the same timestamp.
758 *
759 * @return timestamp equal to the timestamp that will be returned by the
760 * next call to {@link #getTimestamp(T)}.
761 */
762 public Timestamp peekAtNextTimestamp() {
763 return peek(1);
764 }
765
766 /**
767 * Returns the ith timestamp to be given out in the future without
768 * consuming the timestamp. For example, i=1 returns the next timestamp,
769 * i=2 returns the timestamp after that, and so on.
770 *
771 * @param i number of the timestamp to peek at
772 * @return the ith timestamp that will be given out
773 */
774 public Timestamp peek(int i) {
775 checkArgument(i > 0, "i must be a positive integer");
776
777 return new TestTimestamp(counter.get() + i - 1);
778 }
779
780 /**
781 * Turns the clock back two ticks, so the next call to getTimestamp will
782 * return an older timestamp than the previous call to getTimestamp.
783 */
784 public void turnBackTime() {
785 // Not atomic, but should be OK for these tests.
786 counter.decrementAndGet();
787 counter.decrementAndGet();
788 }
789
790 }
791
792 /**
793 * Timestamp implementation where the value of the timestamp can be
794 * specified explicitly at creation time.
795 */
796 private class TestTimestamp implements Timestamp {
797
798 private final long timestamp;
799
800 /**
801 * Creates a new timestamp that has the specified value.
802 *
803 * @param timestamp value of the timestamp
804 */
805 public TestTimestamp(long timestamp) {
806 this.timestamp = timestamp;
807 }
808
809 @Override
810 public int compareTo(Timestamp o) {
811 checkArgument(o instanceof TestTimestamp);
812 TestTimestamp otherTimestamp = (TestTimestamp) o;
813 return ComparisonChain.start()
814 .compare(this.timestamp, otherTimestamp.timestamp)
815 .result();
816 }
817 }
818
819 /**
820 * EventuallyConsistentMapListener implementation which triggers a latch
821 * when it receives an event.
822 */
823 private class TestListener implements EventuallyConsistentMapListener<String, String> {
824 private CountDownLatch latch;
825
826 /**
827 * Creates a new listener that will trigger the specified latch when it
828 * receives and event.
829 *
830 * @param latch the latch to trigger on events
831 */
832 public TestListener(CountDownLatch latch) {
833 this.latch = latch;
834 }
835
836 @Override
837 public void event(EventuallyConsistentMapEvent<String, String> event) {
838 latch.countDown();
839 }
840 }
841}