blob: fc857953ac8c4cf83244711689d6922d45798b9f [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hart584d2f32015-01-27 19:46:14 -080017
18import com.google.common.collect.ComparisonChain;
19import com.google.common.util.concurrent.ListenableFuture;
Madan Jampanib28e4ad2015-02-19 12:31:37 -080020import com.google.common.util.concurrent.MoreExecutors;
Madan Jampani28726282015-02-19 11:40:23 -080021
Jonathan Hart584d2f32015-01-27 19:46:14 -080022import org.junit.After;
23import org.junit.Before;
24import org.junit.Test;
25import org.onlab.packet.IpAddress;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.ControllerNode;
29import org.onosproject.cluster.DefaultControllerNode;
30import org.onosproject.cluster.NodeId;
31import org.onosproject.store.Timestamp;
32import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
33import org.onosproject.store.cluster.messaging.ClusterMessage;
34import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
35import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080036import org.onosproject.store.impl.ClockService;
37import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hart584d2f32015-01-27 19:46:14 -080038import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.serializers.KryoSerializer;
40
41import java.io.IOException;
42import java.util.ArrayList;
43import java.util.HashMap;
44import java.util.HashSet;
45import java.util.Map;
46import java.util.Objects;
47import java.util.Set;
48import java.util.concurrent.CountDownLatch;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.atomic.AtomicLong;
51
52import static com.google.common.base.Preconditions.checkArgument;
53import static junit.framework.TestCase.assertFalse;
54import static org.easymock.EasyMock.*;
55import static org.junit.Assert.assertEquals;
56import static org.junit.Assert.assertNull;
57import static org.junit.Assert.assertTrue;
58import static org.junit.Assert.fail;
59
60/**
61 * Unit tests for EventuallyConsistentMapImpl.
62 */
63public class EventuallyConsistentMapImplTest {
64
65 private EventuallyConsistentMap<String, String> ecMap;
66
67 private ClusterService clusterService;
68 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080069 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080070
71 private static final String MAP_NAME = "test";
72 private static final MessageSubject PUT_MESSAGE_SUBJECT
73 = new MessageSubject("ecm-" + MAP_NAME + "-update");
74 private static final MessageSubject REMOVE_MESSAGE_SUBJECT
75 = new MessageSubject("ecm-" + MAP_NAME + "-remove");
76 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
77 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
78
79 private static final String KEY1 = "one";
80 private static final String KEY2 = "two";
81 private static final String VALUE1 = "oneValue";
82 private static final String VALUE2 = "twoValue";
83
84 private final ControllerNode self =
85 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
86
87 private ClusterMessageHandler putHandler;
88 private ClusterMessageHandler removeHandler;
89 private ClusterMessageHandler antiEntropyHandler;
90
91 /*
92 * Serialization is a bit tricky here. We need to serialize in the tests
93 * to set the expectations, which will use this serializer here, but the
94 * EventuallyConsistentMap will use its own internal serializer. This means
95 * this serializer must be set up exactly the same as map's internal
96 * serializer.
97 */
98 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
99 @Override
100 protected void setupKryoPool() {
101 serializerPool = KryoNamespace.newBuilder()
102 // Classes we give to the map
103 .register(KryoNamespaces.API)
104 .register(TestTimestamp.class)
105 // Below is the classes that the map internally registers
106 .register(WallClockTimestamp.class)
Jonathan Hartf9108232015-02-02 16:37:35 -0800107 .register(PutEntry.class)
108 .register(RemoveEntry.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800109 .register(ArrayList.class)
Jonathan Hartf9108232015-02-02 16:37:35 -0800110 .register(InternalPutEvent.class)
111 .register(InternalRemoveEvent.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800112 .register(AntiEntropyAdvertisement.class)
113 .register(HashMap.class)
114 .build();
115 }
116 };
117
118 @Before
119 public void setUp() throws Exception {
120 clusterService = createMock(ClusterService.class);
121 expect(clusterService.getLocalNode()).andReturn(self)
122 .anyTimes();
123 replay(clusterService);
124
125 clusterCommunicator = createMock(ClusterCommunicationService.class);
126
127 // Add expectation for adding cluster message subscribers which
128 // delegate to our ClusterCommunicationService implementation. This
129 // allows us to get a reference to the map's internal cluster message
130 // handlers so we can induce events coming in from a peer.
131 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
132 anyObject(ClusterMessageHandler.class));
133 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
134
135 replay(clusterCommunicator);
136
137 clockService = new SequentialClockService<>();
138
139 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
140 .register(KryoNamespaces.API)
141 .register(TestTimestamp.class);
142
143 ecMap = new EventuallyConsistentMapImpl<>(MAP_NAME, clusterService,
144 clusterCommunicator,
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800145 serializer, clockService)
146 .withBroadcastMessageExecutor(MoreExecutors.newDirectExecutorService());
Jonathan Hart584d2f32015-01-27 19:46:14 -0800147
148 // Reset ready for tests to add their own expectations
149 reset(clusterCommunicator);
150 }
151
152 @After
153 public void tearDown() {
154 reset(clusterCommunicator);
155 ecMap.destroy();
156 }
157
Ray Milkey8dc82082015-02-20 16:22:38 -0800158 @SuppressWarnings("unchecked")
159 private EventuallyConsistentMapListener<String, String> getListener() {
160 return createMock(EventuallyConsistentMapListener.class);
161 }
162
Jonathan Hart584d2f32015-01-27 19:46:14 -0800163 @Test
164 public void testSize() throws Exception {
165 expectAnyMessage(clusterCommunicator);
166
167 assertEquals(0, ecMap.size());
168 ecMap.put(KEY1, VALUE1);
169 assertEquals(1, ecMap.size());
170 ecMap.put(KEY1, VALUE2);
171 assertEquals(1, ecMap.size());
172 ecMap.put(KEY2, VALUE2);
173 assertEquals(2, ecMap.size());
174 for (int i = 0; i < 10; i++) {
175 ecMap.put("" + i, "" + i);
176 }
177 assertEquals(12, ecMap.size());
178 ecMap.remove(KEY1);
179 assertEquals(11, ecMap.size());
180 ecMap.remove(KEY1);
181 assertEquals(11, ecMap.size());
182 }
183
184 @Test
185 public void testIsEmpty() throws Exception {
186 expectAnyMessage(clusterCommunicator);
187
188 assertTrue(ecMap.isEmpty());
189 ecMap.put(KEY1, VALUE1);
190 assertFalse(ecMap.isEmpty());
191 ecMap.remove(KEY1);
192 assertTrue(ecMap.isEmpty());
193 }
194
195 @Test
196 public void testContainsKey() throws Exception {
197 expectAnyMessage(clusterCommunicator);
198
199 assertFalse(ecMap.containsKey(KEY1));
200 ecMap.put(KEY1, VALUE1);
201 assertTrue(ecMap.containsKey(KEY1));
202 assertFalse(ecMap.containsKey(KEY2));
203 ecMap.remove(KEY1);
204 assertFalse(ecMap.containsKey(KEY1));
205 }
206
207 @Test
208 public void testContainsValue() throws Exception {
209 expectAnyMessage(clusterCommunicator);
210
211 assertFalse(ecMap.containsValue(VALUE1));
212 ecMap.put(KEY1, VALUE1);
213 assertTrue(ecMap.containsValue(VALUE1));
214 assertFalse(ecMap.containsValue(VALUE2));
215 ecMap.put(KEY1, VALUE2);
216 assertFalse(ecMap.containsValue(VALUE1));
217 assertTrue(ecMap.containsValue(VALUE2));
218 ecMap.remove(KEY1);
219 assertFalse(ecMap.containsValue(VALUE2));
220 }
221
222 @Test
223 public void testGet() throws Exception {
224 expectAnyMessage(clusterCommunicator);
225
226 CountDownLatch latch;
227
228 // Local put
229 assertNull(ecMap.get(KEY1));
230 ecMap.put(KEY1, VALUE1);
231 assertEquals(VALUE1, ecMap.get(KEY1));
232
233 // Remote put
234 ClusterMessage message
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800235 = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800236
237 // Create a latch so we know when the put operation has finished
238 latch = new CountDownLatch(1);
239 ecMap.addListener(new TestListener(latch));
240
241 assertNull(ecMap.get(KEY2));
242 putHandler.handle(message);
243 assertTrue("External listener never got notified of internal event",
244 latch.await(100, TimeUnit.MILLISECONDS));
245 assertEquals(VALUE2, ecMap.get(KEY2));
246
247 // Local remove
248 ecMap.remove(KEY2);
249 assertNull(ecMap.get(KEY2));
250
251 // Remote remove
252 ClusterMessage removeMessage
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800253 = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800254
255 // Create a latch so we know when the remove operation has finished
256 latch = new CountDownLatch(1);
257 ecMap.addListener(new TestListener(latch));
258
259 removeHandler.handle(removeMessage);
260 assertTrue("External listener never got notified of internal event",
261 latch.await(100, TimeUnit.MILLISECONDS));
262 assertNull(ecMap.get(KEY1));
263 }
264
265 @Test
266 public void testPut() throws Exception {
267 // Set up expectations of external events to be sent to listeners during
268 // the test. These don't use timestamps so we can set them all up at once.
269 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800270 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800271 listener.event(new EventuallyConsistentMapEvent<>(
272 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
273 listener.event(new EventuallyConsistentMapEvent<>(
274 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
275 replay(listener);
276
277 ecMap.addListener(listener);
278
279 // Set up expected internal message to be broadcast to peers on first put
280 expectSpecificMessage(generatePutMessage(KEY1, VALUE1, clockService
281 .peekAtNextTimestamp()), clusterCommunicator);
282
283 // Put first value
284 assertNull(ecMap.get(KEY1));
285 ecMap.put(KEY1, VALUE1);
286 assertEquals(VALUE1, ecMap.get(KEY1));
287
288 verify(clusterCommunicator);
289
290 // Set up expected internal message to be broadcast to peers on second put
291 expectSpecificMessage(generatePutMessage(
292 KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
293
294 // Update same key to a new value
295 ecMap.put(KEY1, VALUE2);
296 assertEquals(VALUE2, ecMap.get(KEY1));
297
298 verify(clusterCommunicator);
299
300 // Do a put with a older timestamp than the value already there.
301 // The map data should not be changed and no notifications should be sent.
302 reset(clusterCommunicator);
303 replay(clusterCommunicator);
304
305 clockService.turnBackTime();
306 ecMap.put(KEY1, VALUE1);
307 // Value should not have changed.
308 assertEquals(VALUE2, ecMap.get(KEY1));
309
310 verify(clusterCommunicator);
311
312 // Check that our listener received the correct events during the test
313 verify(listener);
314 }
315
316 @Test
317 public void testRemove() throws Exception {
318 // Set up expectations of external events to be sent to listeners during
319 // the test. These don't use timestamps so we can set them all up at once.
320 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800321 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800322 listener.event(new EventuallyConsistentMapEvent<>(
323 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
324 expectLastCall().times(2);
325 listener.event(new EventuallyConsistentMapEvent<>(
326 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
327 listener.event(new EventuallyConsistentMapEvent<>(
328 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
329 replay(listener);
330
331 ecMap.addListener(listener);
332
333 // Put in an initial value
334 expectAnyMessage(clusterCommunicator);
335 ecMap.put(KEY1, VALUE1);
336 assertEquals(VALUE1, ecMap.get(KEY1));
337
338 // Remove the value and check the correct internal cluster messages
339 // are sent
340 expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
341 clusterCommunicator);
342
343 ecMap.remove(KEY1);
344 assertNull(ecMap.get(KEY1));
345
346 verify(clusterCommunicator);
347
348 // Remove the same value again. Even though the value is no longer in
349 // the map, we expect that the tombstone is updated and another remove
350 // event is sent to the cluster and external listeners.
351 expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
352 clusterCommunicator);
353
354 ecMap.remove(KEY1);
355 assertNull(ecMap.get(KEY1));
356
357 verify(clusterCommunicator);
358
359
360 // Put in a new value for us to try and remove
361 expectAnyMessage(clusterCommunicator);
362
363 ecMap.put(KEY2, VALUE2);
364
365 clockService.turnBackTime();
366
367 // Remove should have no effect, since it has an older timestamp than
368 // the put. Expect no notifications to be sent out
369 reset(clusterCommunicator);
370 replay(clusterCommunicator);
371
372 ecMap.remove(KEY2);
373
374 verify(clusterCommunicator);
375
376 // Check that our listener received the correct events during the test
377 verify(listener);
378 }
379
380 @Test
381 public void testPutAll() throws Exception {
382 // putAll() with an empty map is a no-op - no messages will be sent
383 reset(clusterCommunicator);
384 replay(clusterCommunicator);
385
386 ecMap.putAll(new HashMap<>());
387
388 verify(clusterCommunicator);
389
390 // Set up the listener with our expected events
391 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800392 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800393 listener.event(new EventuallyConsistentMapEvent<>(
394 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
395 listener.event(new EventuallyConsistentMapEvent<>(
396 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
397 replay(listener);
398
399 ecMap.addListener(listener);
400
401 // Expect a multi-update inter-instance message
402 expectSpecificMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
403 clusterCommunicator);
404
405 Map<String, String> putAllValues = new HashMap<>();
406 putAllValues.put(KEY1, VALUE1);
407 putAllValues.put(KEY2, VALUE2);
408
409 // Put the values in the map
410 ecMap.putAll(putAllValues);
411
412 // Check the correct messages and events were sent
413 verify(clusterCommunicator);
414 verify(listener);
415 }
416
417 @Test
418 public void testClear() throws Exception {
419 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800420 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800421 listener.event(new EventuallyConsistentMapEvent<>(
422 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
423 listener.event(new EventuallyConsistentMapEvent<>(
424 EventuallyConsistentMapEvent.Type.REMOVE, KEY2, null));
425 replay(listener);
426
427 // clear() on an empty map is a no-op - no messages will be sent
428 reset(clusterCommunicator);
429 replay(clusterCommunicator);
430
431 assertTrue(ecMap.isEmpty());
432 ecMap.clear();
433 verify(clusterCommunicator);
434
435 // Put some items in the map
436 expectAnyMessage(clusterCommunicator);
437 ecMap.put(KEY1, VALUE1);
438 ecMap.put(KEY2, VALUE2);
439
440 ecMap.addListener(listener);
441 expectSpecificMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
442
443 ecMap.clear();
444
445 verify(clusterCommunicator);
446 verify(listener);
447 }
448
449 @Test
450 public void testKeySet() throws Exception {
451 expectAnyMessage(clusterCommunicator);
452
453 assertTrue(ecMap.keySet().isEmpty());
454
455 // Generate some keys
456 Set<String> keys = new HashSet<>();
457 for (int i = 1; i <= 10; i++) {
458 keys.add("" + i);
459 }
460
461 // Put each key in the map
462 keys.forEach(k -> ecMap.put(k, "value" + k));
463
464 // Check keySet() returns the correct value
465 assertEquals(keys, ecMap.keySet());
466
467 // Update the value for one of the keys
468 ecMap.put(keys.iterator().next(), "new-value");
469
470 // Check the key set is still the same
471 assertEquals(keys, ecMap.keySet());
472
473 // Remove a key
474 String removeKey = keys.iterator().next();
475 keys.remove(removeKey);
476 ecMap.remove(removeKey);
477
478 // Check the key set is still correct
479 assertEquals(keys, ecMap.keySet());
480 }
481
482 @Test
483 public void testValues() throws Exception {
484 expectAnyMessage(clusterCommunicator);
485
486 assertTrue(ecMap.values().isEmpty());
487
488 // Generate some values
489 Map<String, String> expectedValues = new HashMap<>();
490 for (int i = 1; i <= 10; i++) {
491 expectedValues.put("" + i, "value" + i);
492 }
493
494 // Add them into the map
495 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
496
497 // Check the values collection is correct
498 assertEquals(expectedValues.values().size(), ecMap.values().size());
499 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
500
501 // Update the value for one of the keys
502 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
503 expectedValues.put(first.getKey(), "new-value");
504 ecMap.put(first.getKey(), "new-value");
505
506 // Check the values collection is still correct
507 assertEquals(expectedValues.values().size(), ecMap.values().size());
508 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
509
510 // Remove a key
511 String removeKey = expectedValues.keySet().iterator().next();
512 expectedValues.remove(removeKey);
513 ecMap.remove(removeKey);
514
515 // Check the values collection is still correct
516 assertEquals(expectedValues.values().size(), ecMap.values().size());
517 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
518 }
519
520 @Test
521 public void testEntrySet() throws Exception {
522 expectAnyMessage(clusterCommunicator);
523
524 assertTrue(ecMap.entrySet().isEmpty());
525
526 // Generate some values
527 Map<String, String> expectedValues = new HashMap<>();
528 for (int i = 1; i <= 10; i++) {
529 expectedValues.put("" + i, "value" + i);
530 }
531
532 // Add them into the map
533 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
534
535 // Check the entry set is correct
536 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
537
538 // Update the value for one of the keys
539 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
540 expectedValues.put(first.getKey(), "new-value");
541 ecMap.put(first.getKey(), "new-value");
542
543 // Check the entry set is still correct
544 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
545
546 // Remove a key
547 String removeKey = expectedValues.keySet().iterator().next();
548 expectedValues.remove(removeKey);
549 ecMap.remove(removeKey);
550
551 // Check the entry set is still correct
552 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
553 }
554
555 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
556 if (expectedMap.entrySet().size() != actual.size()) {
557 return false;
558 }
559
560 for (Map.Entry<String, String> e : actual) {
561 if (!expectedMap.containsKey(e.getKey())) {
562 return false;
563 }
564 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
565 return false;
566 }
567 }
568 return true;
569 }
570
571 @Test
572 public void testDestroy() throws Exception {
573 clusterCommunicator.removeSubscriber(PUT_MESSAGE_SUBJECT);
574 clusterCommunicator.removeSubscriber(REMOVE_MESSAGE_SUBJECT);
575 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
576
577 replay(clusterCommunicator);
578
579 ecMap.destroy();
580
581 verify(clusterCommunicator);
582
583 try {
584 ecMap.get(KEY1);
585 fail("get after destroy should throw exception");
586 } catch (IllegalStateException e) {
587 assertTrue(true);
588 }
589
590 try {
591 ecMap.put(KEY1, VALUE1);
592 fail("put after destroy should throw exception");
593 } catch (IllegalStateException e) {
594 assertTrue(true);
595 }
596 }
597
598 private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800599 InternalPutEvent<String, String> event =
600 new InternalPutEvent<>(key, value, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800601
602 return new ClusterMessage(
603 clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
604 SERIALIZER.encode(event));
605 }
606
607 private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800608 ArrayList<PutEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800609
610 Timestamp timestamp1 = clockService.peek(1);
611 Timestamp timestamp2 = clockService.peek(2);
612
Jonathan Hartf9108232015-02-02 16:37:35 -0800613 PutEntry<String, String> pe1 = new PutEntry<>(key1, value1, timestamp1);
614 PutEntry<String, String> pe2 = new PutEntry<>(key2, value2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800615
616 list.add(pe1);
617 list.add(pe2);
618
Jonathan Hartf9108232015-02-02 16:37:35 -0800619 InternalPutEvent<String, String> event = new InternalPutEvent<>(list);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800620
621 return new ClusterMessage(
622 clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
623 SERIALIZER.encode(event));
624 }
625
626 private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800627 InternalRemoveEvent<String> event = new InternalRemoveEvent<>(key, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800628
629 return new ClusterMessage(
630 clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
631 SERIALIZER.encode(event));
632 }
633
634 private ClusterMessage generateRemoveMessage(String key1, String key2) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800635 ArrayList<RemoveEntry<String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800636
637 Timestamp timestamp1 = clockService.peek(1);
638 Timestamp timestamp2 = clockService.peek(2);
639
Jonathan Hartf9108232015-02-02 16:37:35 -0800640 RemoveEntry<String> re1 = new RemoveEntry<>(key1, timestamp1);
641 RemoveEntry<String> re2 = new RemoveEntry<>(key2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800642
643 list.add(re1);
644 list.add(re2);
645
Jonathan Hartf9108232015-02-02 16:37:35 -0800646 InternalRemoveEvent<String> event = new InternalRemoveEvent<>(list);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800647
648 return new ClusterMessage(
649 clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
650 SERIALIZER.encode(event));
651 }
652
653 /**
654 * Sets up a mock ClusterCommunicationService to expect a specific cluster
655 * message to be broadcast to the cluster.
656 *
657 * @param m message we expect to be sent
658 * @param clusterCommunicator a mock ClusterCommunicationService to set up
659 */
660 private static void expectSpecificMessage(ClusterMessage m,
661 ClusterCommunicationService clusterCommunicator) {
662 reset(clusterCommunicator);
663 expect(clusterCommunicator.broadcast(m)).andReturn(true);
664 replay(clusterCommunicator);
665 }
666
667 /**
668 * Sets up a mock ClusterCommunicationService to expect any cluster message
669 * that is sent to it. This is useful for unit tests where we aren't
670 * interested in testing the messaging component.
671 *
672 * @param clusterCommunicator a mock ClusterCommunicationService to set up
673 */
674 private void expectAnyMessage(ClusterCommunicationService clusterCommunicator) {
675 reset(clusterCommunicator);
676 expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
677 .andReturn(true)
678 .anyTimes();
679 replay(clusterCommunicator);
680 }
681
682 /**
683 * ClusterCommunicationService implementation that the map's addSubscriber
684 * call will delegate to. This means we can get a reference to the
685 * internal cluster message handler used by the map, so that we can simulate
686 * events coming in from other instances.
687 */
688 private final class TestClusterCommunicationService
689 implements ClusterCommunicationService {
690
691 @Override
692 public boolean broadcast(ClusterMessage message) {
693 return false;
694 }
695
696 @Override
697 public boolean broadcastIncludeSelf(ClusterMessage message) {
698 return false;
699 }
700
701 @Override
702 public boolean unicast(ClusterMessage message, NodeId toNodeId)
703 throws IOException {
704 return false;
705 }
706
707 @Override
708 public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) {
709 return false;
710 }
711
712 @Override
713 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
714 NodeId toNodeId)
715 throws IOException {
716 return null;
717 }
718
719 @Override
720 public void addSubscriber(MessageSubject subject,
721 ClusterMessageHandler subscriber) {
722 if (subject.equals(PUT_MESSAGE_SUBJECT)) {
723 putHandler = subscriber;
724 } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
725 removeHandler = subscriber;
726 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
727 antiEntropyHandler = subscriber;
728 } else {
729 throw new RuntimeException("Unexpected message subject " + subject.toString());
730 }
731 }
732
733 @Override
734 public void removeSubscriber(MessageSubject subject) {}
735 }
736
737 /**
738 * ClockService implementation that gives out timestamps based on a
739 * sequential counter. This clock service enables more control over the
740 * timestamps that are given out, including being able to "turn back time"
741 * to give out timestamps from the past.
742 *
743 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800744 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800745 */
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800746 private class SequentialClockService<T, U> implements ClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800747
748 private static final long INITIAL_VALUE = 1;
749 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
750
751 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800752 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800753 return new TestTimestamp(counter.getAndIncrement());
754 }
755
756 /**
757 * Returns what the next timestamp will be without consuming the
758 * timestamp. This allows test code to set expectations correctly while
759 * still allowing the CUT to get the same timestamp.
760 *
761 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800762 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800763 */
764 public Timestamp peekAtNextTimestamp() {
765 return peek(1);
766 }
767
768 /**
769 * Returns the ith timestamp to be given out in the future without
770 * consuming the timestamp. For example, i=1 returns the next timestamp,
771 * i=2 returns the timestamp after that, and so on.
772 *
773 * @param i number of the timestamp to peek at
774 * @return the ith timestamp that will be given out
775 */
776 public Timestamp peek(int i) {
777 checkArgument(i > 0, "i must be a positive integer");
778
779 return new TestTimestamp(counter.get() + i - 1);
780 }
781
782 /**
783 * Turns the clock back two ticks, so the next call to getTimestamp will
784 * return an older timestamp than the previous call to getTimestamp.
785 */
786 public void turnBackTime() {
787 // Not atomic, but should be OK for these tests.
788 counter.decrementAndGet();
789 counter.decrementAndGet();
790 }
791
792 }
793
794 /**
795 * Timestamp implementation where the value of the timestamp can be
796 * specified explicitly at creation time.
797 */
798 private class TestTimestamp implements Timestamp {
799
800 private final long timestamp;
801
802 /**
803 * Creates a new timestamp that has the specified value.
804 *
805 * @param timestamp value of the timestamp
806 */
807 public TestTimestamp(long timestamp) {
808 this.timestamp = timestamp;
809 }
810
811 @Override
812 public int compareTo(Timestamp o) {
813 checkArgument(o instanceof TestTimestamp);
814 TestTimestamp otherTimestamp = (TestTimestamp) o;
815 return ComparisonChain.start()
816 .compare(this.timestamp, otherTimestamp.timestamp)
817 .result();
818 }
819 }
820
821 /**
822 * EventuallyConsistentMapListener implementation which triggers a latch
823 * when it receives an event.
824 */
825 private class TestListener implements EventuallyConsistentMapListener<String, String> {
826 private CountDownLatch latch;
827
828 /**
829 * Creates a new listener that will trigger the specified latch when it
830 * receives and event.
831 *
832 * @param latch the latch to trigger on events
833 */
834 public TestListener(CountDownLatch latch) {
835 this.latch = latch;
836 }
837
838 @Override
839 public void event(EventuallyConsistentMapEvent<String, String> event) {
840 latch.countDown();
841 }
842 }
843}