blob: bc0942eb531096834568f6018bdae4d33217e0eb [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.impl;
17
18import com.google.common.collect.ComparisonChain;
19import com.google.common.util.concurrent.ListenableFuture;
20import org.junit.After;
21import org.junit.Before;
22import org.junit.Test;
23import org.onlab.packet.IpAddress;
24import org.onlab.util.KryoNamespace;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.ControllerNode;
27import org.onosproject.cluster.DefaultControllerNode;
28import org.onosproject.cluster.NodeId;
29import org.onosproject.store.Timestamp;
30import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
31import org.onosproject.store.cluster.messaging.ClusterMessage;
32import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
33import org.onosproject.store.cluster.messaging.MessageSubject;
34import org.onosproject.store.serializers.KryoNamespaces;
35import org.onosproject.store.serializers.KryoSerializer;
36
37import java.io.IOException;
38import java.util.ArrayList;
39import java.util.HashMap;
40import java.util.HashSet;
41import java.util.Map;
42import java.util.Objects;
43import java.util.Set;
44import java.util.concurrent.CountDownLatch;
45import java.util.concurrent.TimeUnit;
46import java.util.concurrent.atomic.AtomicLong;
47
48import static com.google.common.base.Preconditions.checkArgument;
49import static junit.framework.TestCase.assertFalse;
50import static org.easymock.EasyMock.*;
51import static org.junit.Assert.assertEquals;
52import static org.junit.Assert.assertNull;
53import static org.junit.Assert.assertTrue;
54import static org.junit.Assert.fail;
55
56/**
57 * Unit tests for EventuallyConsistentMapImpl.
58 */
59public class EventuallyConsistentMapImplTest {
60
61 private EventuallyConsistentMap<String, String> ecMap;
62
63 private ClusterService clusterService;
64 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080065 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080066
67 private static final String MAP_NAME = "test";
68 private static final MessageSubject PUT_MESSAGE_SUBJECT
69 = new MessageSubject("ecm-" + MAP_NAME + "-update");
70 private static final MessageSubject REMOVE_MESSAGE_SUBJECT
71 = new MessageSubject("ecm-" + MAP_NAME + "-remove");
72 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
73 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
74
75 private static final String KEY1 = "one";
76 private static final String KEY2 = "two";
77 private static final String VALUE1 = "oneValue";
78 private static final String VALUE2 = "twoValue";
79
80 private final ControllerNode self =
81 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
82
83 private ClusterMessageHandler putHandler;
84 private ClusterMessageHandler removeHandler;
85 private ClusterMessageHandler antiEntropyHandler;
86
87 /*
88 * Serialization is a bit tricky here. We need to serialize in the tests
89 * to set the expectations, which will use this serializer here, but the
90 * EventuallyConsistentMap will use its own internal serializer. This means
91 * this serializer must be set up exactly the same as map's internal
92 * serializer.
93 */
94 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
95 @Override
96 protected void setupKryoPool() {
97 serializerPool = KryoNamespace.newBuilder()
98 // Classes we give to the map
99 .register(KryoNamespaces.API)
100 .register(TestTimestamp.class)
101 // Below is the classes that the map internally registers
102 .register(WallClockTimestamp.class)
Jonathan Hartf9108232015-02-02 16:37:35 -0800103 .register(PutEntry.class)
104 .register(RemoveEntry.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800105 .register(ArrayList.class)
Jonathan Hartf9108232015-02-02 16:37:35 -0800106 .register(InternalPutEvent.class)
107 .register(InternalRemoveEvent.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800108 .register(AntiEntropyAdvertisement.class)
109 .register(HashMap.class)
110 .build();
111 }
112 };
113
114 @Before
115 public void setUp() throws Exception {
116 clusterService = createMock(ClusterService.class);
117 expect(clusterService.getLocalNode()).andReturn(self)
118 .anyTimes();
119 replay(clusterService);
120
121 clusterCommunicator = createMock(ClusterCommunicationService.class);
122
123 // Add expectation for adding cluster message subscribers which
124 // delegate to our ClusterCommunicationService implementation. This
125 // allows us to get a reference to the map's internal cluster message
126 // handlers so we can induce events coming in from a peer.
127 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
128 anyObject(ClusterMessageHandler.class));
129 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
130
131 replay(clusterCommunicator);
132
133 clockService = new SequentialClockService<>();
134
135 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
136 .register(KryoNamespaces.API)
137 .register(TestTimestamp.class);
138
139 ecMap = new EventuallyConsistentMapImpl<>(MAP_NAME, clusterService,
140 clusterCommunicator,
141 serializer, clockService);
142
143 // Reset ready for tests to add their own expectations
144 reset(clusterCommunicator);
145 }
146
147 @After
148 public void tearDown() {
149 reset(clusterCommunicator);
150 ecMap.destroy();
151 }
152
153 @Test
154 public void testSize() throws Exception {
155 expectAnyMessage(clusterCommunicator);
156
157 assertEquals(0, ecMap.size());
158 ecMap.put(KEY1, VALUE1);
159 assertEquals(1, ecMap.size());
160 ecMap.put(KEY1, VALUE2);
161 assertEquals(1, ecMap.size());
162 ecMap.put(KEY2, VALUE2);
163 assertEquals(2, ecMap.size());
164 for (int i = 0; i < 10; i++) {
165 ecMap.put("" + i, "" + i);
166 }
167 assertEquals(12, ecMap.size());
168 ecMap.remove(KEY1);
169 assertEquals(11, ecMap.size());
170 ecMap.remove(KEY1);
171 assertEquals(11, ecMap.size());
172 }
173
174 @Test
175 public void testIsEmpty() throws Exception {
176 expectAnyMessage(clusterCommunicator);
177
178 assertTrue(ecMap.isEmpty());
179 ecMap.put(KEY1, VALUE1);
180 assertFalse(ecMap.isEmpty());
181 ecMap.remove(KEY1);
182 assertTrue(ecMap.isEmpty());
183 }
184
185 @Test
186 public void testContainsKey() throws Exception {
187 expectAnyMessage(clusterCommunicator);
188
189 assertFalse(ecMap.containsKey(KEY1));
190 ecMap.put(KEY1, VALUE1);
191 assertTrue(ecMap.containsKey(KEY1));
192 assertFalse(ecMap.containsKey(KEY2));
193 ecMap.remove(KEY1);
194 assertFalse(ecMap.containsKey(KEY1));
195 }
196
197 @Test
198 public void testContainsValue() throws Exception {
199 expectAnyMessage(clusterCommunicator);
200
201 assertFalse(ecMap.containsValue(VALUE1));
202 ecMap.put(KEY1, VALUE1);
203 assertTrue(ecMap.containsValue(VALUE1));
204 assertFalse(ecMap.containsValue(VALUE2));
205 ecMap.put(KEY1, VALUE2);
206 assertFalse(ecMap.containsValue(VALUE1));
207 assertTrue(ecMap.containsValue(VALUE2));
208 ecMap.remove(KEY1);
209 assertFalse(ecMap.containsValue(VALUE2));
210 }
211
212 @Test
213 public void testGet() throws Exception {
214 expectAnyMessage(clusterCommunicator);
215
216 CountDownLatch latch;
217
218 // Local put
219 assertNull(ecMap.get(KEY1));
220 ecMap.put(KEY1, VALUE1);
221 assertEquals(VALUE1, ecMap.get(KEY1));
222
223 // Remote put
224 ClusterMessage message
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800225 = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800226
227 // Create a latch so we know when the put operation has finished
228 latch = new CountDownLatch(1);
229 ecMap.addListener(new TestListener(latch));
230
231 assertNull(ecMap.get(KEY2));
232 putHandler.handle(message);
233 assertTrue("External listener never got notified of internal event",
234 latch.await(100, TimeUnit.MILLISECONDS));
235 assertEquals(VALUE2, ecMap.get(KEY2));
236
237 // Local remove
238 ecMap.remove(KEY2);
239 assertNull(ecMap.get(KEY2));
240
241 // Remote remove
242 ClusterMessage removeMessage
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800243 = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800244
245 // Create a latch so we know when the remove operation has finished
246 latch = new CountDownLatch(1);
247 ecMap.addListener(new TestListener(latch));
248
249 removeHandler.handle(removeMessage);
250 assertTrue("External listener never got notified of internal event",
251 latch.await(100, TimeUnit.MILLISECONDS));
252 assertNull(ecMap.get(KEY1));
253 }
254
255 @Test
256 public void testPut() throws Exception {
257 // Set up expectations of external events to be sent to listeners during
258 // the test. These don't use timestamps so we can set them all up at once.
259 EventuallyConsistentMapListener<String, String> listener
260 = createMock(EventuallyConsistentMapListener.class);
261 listener.event(new EventuallyConsistentMapEvent<>(
262 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
263 listener.event(new EventuallyConsistentMapEvent<>(
264 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
265 replay(listener);
266
267 ecMap.addListener(listener);
268
269 // Set up expected internal message to be broadcast to peers on first put
270 expectSpecificMessage(generatePutMessage(KEY1, VALUE1, clockService
271 .peekAtNextTimestamp()), clusterCommunicator);
272
273 // Put first value
274 assertNull(ecMap.get(KEY1));
275 ecMap.put(KEY1, VALUE1);
276 assertEquals(VALUE1, ecMap.get(KEY1));
277
278 verify(clusterCommunicator);
279
280 // Set up expected internal message to be broadcast to peers on second put
281 expectSpecificMessage(generatePutMessage(
282 KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
283
284 // Update same key to a new value
285 ecMap.put(KEY1, VALUE2);
286 assertEquals(VALUE2, ecMap.get(KEY1));
287
288 verify(clusterCommunicator);
289
290 // Do a put with a older timestamp than the value already there.
291 // The map data should not be changed and no notifications should be sent.
292 reset(clusterCommunicator);
293 replay(clusterCommunicator);
294
295 clockService.turnBackTime();
296 ecMap.put(KEY1, VALUE1);
297 // Value should not have changed.
298 assertEquals(VALUE2, ecMap.get(KEY1));
299
300 verify(clusterCommunicator);
301
302 // Check that our listener received the correct events during the test
303 verify(listener);
304 }
305
306 @Test
307 public void testRemove() throws Exception {
308 // Set up expectations of external events to be sent to listeners during
309 // the test. These don't use timestamps so we can set them all up at once.
310 EventuallyConsistentMapListener<String, String> listener
311 = createMock(EventuallyConsistentMapListener.class);
312 listener.event(new EventuallyConsistentMapEvent<>(
313 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
314 expectLastCall().times(2);
315 listener.event(new EventuallyConsistentMapEvent<>(
316 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
317 listener.event(new EventuallyConsistentMapEvent<>(
318 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
319 replay(listener);
320
321 ecMap.addListener(listener);
322
323 // Put in an initial value
324 expectAnyMessage(clusterCommunicator);
325 ecMap.put(KEY1, VALUE1);
326 assertEquals(VALUE1, ecMap.get(KEY1));
327
328 // Remove the value and check the correct internal cluster messages
329 // are sent
330 expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
331 clusterCommunicator);
332
333 ecMap.remove(KEY1);
334 assertNull(ecMap.get(KEY1));
335
336 verify(clusterCommunicator);
337
338 // Remove the same value again. Even though the value is no longer in
339 // the map, we expect that the tombstone is updated and another remove
340 // event is sent to the cluster and external listeners.
341 expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
342 clusterCommunicator);
343
344 ecMap.remove(KEY1);
345 assertNull(ecMap.get(KEY1));
346
347 verify(clusterCommunicator);
348
349
350 // Put in a new value for us to try and remove
351 expectAnyMessage(clusterCommunicator);
352
353 ecMap.put(KEY2, VALUE2);
354
355 clockService.turnBackTime();
356
357 // Remove should have no effect, since it has an older timestamp than
358 // the put. Expect no notifications to be sent out
359 reset(clusterCommunicator);
360 replay(clusterCommunicator);
361
362 ecMap.remove(KEY2);
363
364 verify(clusterCommunicator);
365
366 // Check that our listener received the correct events during the test
367 verify(listener);
368 }
369
370 @Test
371 public void testPutAll() throws Exception {
372 // putAll() with an empty map is a no-op - no messages will be sent
373 reset(clusterCommunicator);
374 replay(clusterCommunicator);
375
376 ecMap.putAll(new HashMap<>());
377
378 verify(clusterCommunicator);
379
380 // Set up the listener with our expected events
381 EventuallyConsistentMapListener<String, String> listener
382 = createMock(EventuallyConsistentMapListener.class);
383 listener.event(new EventuallyConsistentMapEvent<>(
384 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
385 listener.event(new EventuallyConsistentMapEvent<>(
386 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
387 replay(listener);
388
389 ecMap.addListener(listener);
390
391 // Expect a multi-update inter-instance message
392 expectSpecificMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
393 clusterCommunicator);
394
395 Map<String, String> putAllValues = new HashMap<>();
396 putAllValues.put(KEY1, VALUE1);
397 putAllValues.put(KEY2, VALUE2);
398
399 // Put the values in the map
400 ecMap.putAll(putAllValues);
401
402 // Check the correct messages and events were sent
403 verify(clusterCommunicator);
404 verify(listener);
405 }
406
407 @Test
408 public void testClear() throws Exception {
409 EventuallyConsistentMapListener<String, String> listener
410 = createMock(EventuallyConsistentMapListener.class);
411 listener.event(new EventuallyConsistentMapEvent<>(
412 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
413 listener.event(new EventuallyConsistentMapEvent<>(
414 EventuallyConsistentMapEvent.Type.REMOVE, KEY2, null));
415 replay(listener);
416
417 // clear() on an empty map is a no-op - no messages will be sent
418 reset(clusterCommunicator);
419 replay(clusterCommunicator);
420
421 assertTrue(ecMap.isEmpty());
422 ecMap.clear();
423 verify(clusterCommunicator);
424
425 // Put some items in the map
426 expectAnyMessage(clusterCommunicator);
427 ecMap.put(KEY1, VALUE1);
428 ecMap.put(KEY2, VALUE2);
429
430 ecMap.addListener(listener);
431 expectSpecificMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
432
433 ecMap.clear();
434
435 verify(clusterCommunicator);
436 verify(listener);
437 }
438
439 @Test
440 public void testKeySet() throws Exception {
441 expectAnyMessage(clusterCommunicator);
442
443 assertTrue(ecMap.keySet().isEmpty());
444
445 // Generate some keys
446 Set<String> keys = new HashSet<>();
447 for (int i = 1; i <= 10; i++) {
448 keys.add("" + i);
449 }
450
451 // Put each key in the map
452 keys.forEach(k -> ecMap.put(k, "value" + k));
453
454 // Check keySet() returns the correct value
455 assertEquals(keys, ecMap.keySet());
456
457 // Update the value for one of the keys
458 ecMap.put(keys.iterator().next(), "new-value");
459
460 // Check the key set is still the same
461 assertEquals(keys, ecMap.keySet());
462
463 // Remove a key
464 String removeKey = keys.iterator().next();
465 keys.remove(removeKey);
466 ecMap.remove(removeKey);
467
468 // Check the key set is still correct
469 assertEquals(keys, ecMap.keySet());
470 }
471
472 @Test
473 public void testValues() throws Exception {
474 expectAnyMessage(clusterCommunicator);
475
476 assertTrue(ecMap.values().isEmpty());
477
478 // Generate some values
479 Map<String, String> expectedValues = new HashMap<>();
480 for (int i = 1; i <= 10; i++) {
481 expectedValues.put("" + i, "value" + i);
482 }
483
484 // Add them into the map
485 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
486
487 // Check the values collection is correct
488 assertEquals(expectedValues.values().size(), ecMap.values().size());
489 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
490
491 // Update the value for one of the keys
492 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
493 expectedValues.put(first.getKey(), "new-value");
494 ecMap.put(first.getKey(), "new-value");
495
496 // Check the values collection is still correct
497 assertEquals(expectedValues.values().size(), ecMap.values().size());
498 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
499
500 // Remove a key
501 String removeKey = expectedValues.keySet().iterator().next();
502 expectedValues.remove(removeKey);
503 ecMap.remove(removeKey);
504
505 // Check the values collection is still correct
506 assertEquals(expectedValues.values().size(), ecMap.values().size());
507 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
508 }
509
510 @Test
511 public void testEntrySet() throws Exception {
512 expectAnyMessage(clusterCommunicator);
513
514 assertTrue(ecMap.entrySet().isEmpty());
515
516 // Generate some values
517 Map<String, String> expectedValues = new HashMap<>();
518 for (int i = 1; i <= 10; i++) {
519 expectedValues.put("" + i, "value" + i);
520 }
521
522 // Add them into the map
523 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
524
525 // Check the entry set is correct
526 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
527
528 // Update the value for one of the keys
529 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
530 expectedValues.put(first.getKey(), "new-value");
531 ecMap.put(first.getKey(), "new-value");
532
533 // Check the entry set is still correct
534 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
535
536 // Remove a key
537 String removeKey = expectedValues.keySet().iterator().next();
538 expectedValues.remove(removeKey);
539 ecMap.remove(removeKey);
540
541 // Check the entry set is still correct
542 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
543 }
544
545 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
546 if (expectedMap.entrySet().size() != actual.size()) {
547 return false;
548 }
549
550 for (Map.Entry<String, String> e : actual) {
551 if (!expectedMap.containsKey(e.getKey())) {
552 return false;
553 }
554 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
555 return false;
556 }
557 }
558 return true;
559 }
560
561 @Test
562 public void testDestroy() throws Exception {
563 clusterCommunicator.removeSubscriber(PUT_MESSAGE_SUBJECT);
564 clusterCommunicator.removeSubscriber(REMOVE_MESSAGE_SUBJECT);
565 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
566
567 replay(clusterCommunicator);
568
569 ecMap.destroy();
570
571 verify(clusterCommunicator);
572
573 try {
574 ecMap.get(KEY1);
575 fail("get after destroy should throw exception");
576 } catch (IllegalStateException e) {
577 assertTrue(true);
578 }
579
580 try {
581 ecMap.put(KEY1, VALUE1);
582 fail("put after destroy should throw exception");
583 } catch (IllegalStateException e) {
584 assertTrue(true);
585 }
586 }
587
588 private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800589 InternalPutEvent<String, String> event =
590 new InternalPutEvent<>(key, value, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800591
592 return new ClusterMessage(
593 clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
594 SERIALIZER.encode(event));
595 }
596
597 private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800598 ArrayList<PutEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800599
600 Timestamp timestamp1 = clockService.peek(1);
601 Timestamp timestamp2 = clockService.peek(2);
602
Jonathan Hartf9108232015-02-02 16:37:35 -0800603 PutEntry<String, String> pe1 = new PutEntry<>(key1, value1, timestamp1);
604 PutEntry<String, String> pe2 = new PutEntry<>(key2, value2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800605
606 list.add(pe1);
607 list.add(pe2);
608
Jonathan Hartf9108232015-02-02 16:37:35 -0800609 InternalPutEvent<String, String> event = new InternalPutEvent<>(list);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800610
611 return new ClusterMessage(
612 clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
613 SERIALIZER.encode(event));
614 }
615
616 private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800617 InternalRemoveEvent<String> event = new InternalRemoveEvent<>(key, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800618
619 return new ClusterMessage(
620 clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
621 SERIALIZER.encode(event));
622 }
623
624 private ClusterMessage generateRemoveMessage(String key1, String key2) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800625 ArrayList<RemoveEntry<String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800626
627 Timestamp timestamp1 = clockService.peek(1);
628 Timestamp timestamp2 = clockService.peek(2);
629
Jonathan Hartf9108232015-02-02 16:37:35 -0800630 RemoveEntry<String> re1 = new RemoveEntry<>(key1, timestamp1);
631 RemoveEntry<String> re2 = new RemoveEntry<>(key2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800632
633 list.add(re1);
634 list.add(re2);
635
Jonathan Hartf9108232015-02-02 16:37:35 -0800636 InternalRemoveEvent<String> event = new InternalRemoveEvent<>(list);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800637
638 return new ClusterMessage(
639 clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
640 SERIALIZER.encode(event));
641 }
642
643 /**
644 * Sets up a mock ClusterCommunicationService to expect a specific cluster
645 * message to be broadcast to the cluster.
646 *
647 * @param m message we expect to be sent
648 * @param clusterCommunicator a mock ClusterCommunicationService to set up
649 */
650 private static void expectSpecificMessage(ClusterMessage m,
651 ClusterCommunicationService clusterCommunicator) {
652 reset(clusterCommunicator);
653 expect(clusterCommunicator.broadcast(m)).andReturn(true);
654 replay(clusterCommunicator);
655 }
656
657 /**
658 * Sets up a mock ClusterCommunicationService to expect any cluster message
659 * that is sent to it. This is useful for unit tests where we aren't
660 * interested in testing the messaging component.
661 *
662 * @param clusterCommunicator a mock ClusterCommunicationService to set up
663 */
664 private void expectAnyMessage(ClusterCommunicationService clusterCommunicator) {
665 reset(clusterCommunicator);
666 expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
667 .andReturn(true)
668 .anyTimes();
669 replay(clusterCommunicator);
670 }
671
672 /**
673 * ClusterCommunicationService implementation that the map's addSubscriber
674 * call will delegate to. This means we can get a reference to the
675 * internal cluster message handler used by the map, so that we can simulate
676 * events coming in from other instances.
677 */
678 private final class TestClusterCommunicationService
679 implements ClusterCommunicationService {
680
681 @Override
682 public boolean broadcast(ClusterMessage message) {
683 return false;
684 }
685
686 @Override
687 public boolean broadcastIncludeSelf(ClusterMessage message) {
688 return false;
689 }
690
691 @Override
692 public boolean unicast(ClusterMessage message, NodeId toNodeId)
693 throws IOException {
694 return false;
695 }
696
697 @Override
698 public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) {
699 return false;
700 }
701
702 @Override
703 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
704 NodeId toNodeId)
705 throws IOException {
706 return null;
707 }
708
709 @Override
710 public void addSubscriber(MessageSubject subject,
711 ClusterMessageHandler subscriber) {
712 if (subject.equals(PUT_MESSAGE_SUBJECT)) {
713 putHandler = subscriber;
714 } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
715 removeHandler = subscriber;
716 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
717 antiEntropyHandler = subscriber;
718 } else {
719 throw new RuntimeException("Unexpected message subject " + subject.toString());
720 }
721 }
722
723 @Override
724 public void removeSubscriber(MessageSubject subject) {}
725 }
726
727 /**
728 * ClockService implementation that gives out timestamps based on a
729 * sequential counter. This clock service enables more control over the
730 * timestamps that are given out, including being able to "turn back time"
731 * to give out timestamps from the past.
732 *
733 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800734 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800735 */
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800736 private class SequentialClockService<T, U> implements ClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800737
738 private static final long INITIAL_VALUE = 1;
739 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
740
741 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800742 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800743 return new TestTimestamp(counter.getAndIncrement());
744 }
745
746 /**
747 * Returns what the next timestamp will be without consuming the
748 * timestamp. This allows test code to set expectations correctly while
749 * still allowing the CUT to get the same timestamp.
750 *
751 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800752 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800753 */
754 public Timestamp peekAtNextTimestamp() {
755 return peek(1);
756 }
757
758 /**
759 * Returns the ith timestamp to be given out in the future without
760 * consuming the timestamp. For example, i=1 returns the next timestamp,
761 * i=2 returns the timestamp after that, and so on.
762 *
763 * @param i number of the timestamp to peek at
764 * @return the ith timestamp that will be given out
765 */
766 public Timestamp peek(int i) {
767 checkArgument(i > 0, "i must be a positive integer");
768
769 return new TestTimestamp(counter.get() + i - 1);
770 }
771
772 /**
773 * Turns the clock back two ticks, so the next call to getTimestamp will
774 * return an older timestamp than the previous call to getTimestamp.
775 */
776 public void turnBackTime() {
777 // Not atomic, but should be OK for these tests.
778 counter.decrementAndGet();
779 counter.decrementAndGet();
780 }
781
782 }
783
784 /**
785 * Timestamp implementation where the value of the timestamp can be
786 * specified explicitly at creation time.
787 */
788 private class TestTimestamp implements Timestamp {
789
790 private final long timestamp;
791
792 /**
793 * Creates a new timestamp that has the specified value.
794 *
795 * @param timestamp value of the timestamp
796 */
797 public TestTimestamp(long timestamp) {
798 this.timestamp = timestamp;
799 }
800
801 @Override
802 public int compareTo(Timestamp o) {
803 checkArgument(o instanceof TestTimestamp);
804 TestTimestamp otherTimestamp = (TestTimestamp) o;
805 return ComparisonChain.start()
806 .compare(this.timestamp, otherTimestamp.timestamp)
807 .result();
808 }
809 }
810
811 /**
812 * EventuallyConsistentMapListener implementation which triggers a latch
813 * when it receives an event.
814 */
815 private class TestListener implements EventuallyConsistentMapListener<String, String> {
816 private CountDownLatch latch;
817
818 /**
819 * Creates a new listener that will trigger the specified latch when it
820 * receives and event.
821 *
822 * @param latch the latch to trigger on events
823 */
824 public TestListener(CountDownLatch latch) {
825 this.latch = latch;
826 }
827
828 @Override
829 public void event(EventuallyConsistentMapEvent<String, String> event) {
830 latch.countDown();
831 }
832 }
833}