blob: e6670de5ef55d6fb9917c90968587243db252f2e [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hart584d2f32015-01-27 19:46:14 -080017
18import com.google.common.collect.ComparisonChain;
19import com.google.common.util.concurrent.ListenableFuture;
Madan Jampanib28e4ad2015-02-19 12:31:37 -080020import com.google.common.util.concurrent.MoreExecutors;
Madan Jampani28726282015-02-19 11:40:23 -080021
Jonathan Hart584d2f32015-01-27 19:46:14 -080022import org.junit.After;
23import org.junit.Before;
24import org.junit.Test;
25import org.onlab.packet.IpAddress;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.ControllerNode;
29import org.onosproject.cluster.DefaultControllerNode;
30import org.onosproject.cluster.NodeId;
31import org.onosproject.store.Timestamp;
32import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
33import org.onosproject.store.cluster.messaging.ClusterMessage;
34import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
35import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080036import org.onosproject.store.impl.ClockService;
37import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hart584d2f32015-01-27 19:46:14 -080038import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.serializers.KryoSerializer;
40
41import java.io.IOException;
42import java.util.ArrayList;
43import java.util.HashMap;
44import java.util.HashSet;
45import java.util.Map;
46import java.util.Objects;
47import java.util.Set;
48import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080049import java.util.concurrent.ExecutorService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080050import java.util.concurrent.TimeUnit;
51import java.util.concurrent.atomic.AtomicLong;
52
53import static com.google.common.base.Preconditions.checkArgument;
54import static junit.framework.TestCase.assertFalse;
55import static org.easymock.EasyMock.*;
56import static org.junit.Assert.assertEquals;
57import static org.junit.Assert.assertNull;
58import static org.junit.Assert.assertTrue;
59import static org.junit.Assert.fail;
60
61/**
62 * Unit tests for EventuallyConsistentMapImpl.
63 */
64public class EventuallyConsistentMapImplTest {
65
66 private EventuallyConsistentMap<String, String> ecMap;
67
68 private ClusterService clusterService;
69 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080070 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080071
72 private static final String MAP_NAME = "test";
73 private static final MessageSubject PUT_MESSAGE_SUBJECT
74 = new MessageSubject("ecm-" + MAP_NAME + "-update");
75 private static final MessageSubject REMOVE_MESSAGE_SUBJECT
76 = new MessageSubject("ecm-" + MAP_NAME + "-remove");
77 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
78 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
79
80 private static final String KEY1 = "one";
81 private static final String KEY2 = "two";
82 private static final String VALUE1 = "oneValue";
83 private static final String VALUE2 = "twoValue";
84
85 private final ControllerNode self =
86 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
87
88 private ClusterMessageHandler putHandler;
89 private ClusterMessageHandler removeHandler;
90 private ClusterMessageHandler antiEntropyHandler;
91
92 /*
93 * Serialization is a bit tricky here. We need to serialize in the tests
94 * to set the expectations, which will use this serializer here, but the
95 * EventuallyConsistentMap will use its own internal serializer. This means
96 * this serializer must be set up exactly the same as map's internal
97 * serializer.
98 */
99 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
100 @Override
101 protected void setupKryoPool() {
102 serializerPool = KryoNamespace.newBuilder()
103 // Classes we give to the map
104 .register(KryoNamespaces.API)
105 .register(TestTimestamp.class)
106 // Below is the classes that the map internally registers
107 .register(WallClockTimestamp.class)
Jonathan Hartf9108232015-02-02 16:37:35 -0800108 .register(PutEntry.class)
109 .register(RemoveEntry.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800110 .register(ArrayList.class)
Jonathan Hartf9108232015-02-02 16:37:35 -0800111 .register(InternalPutEvent.class)
112 .register(InternalRemoveEvent.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800113 .register(AntiEntropyAdvertisement.class)
114 .register(HashMap.class)
115 .build();
116 }
117 };
118
119 @Before
120 public void setUp() throws Exception {
121 clusterService = createMock(ClusterService.class);
122 expect(clusterService.getLocalNode()).andReturn(self)
123 .anyTimes();
124 replay(clusterService);
125
126 clusterCommunicator = createMock(ClusterCommunicationService.class);
127
128 // Add expectation for adding cluster message subscribers which
129 // delegate to our ClusterCommunicationService implementation. This
130 // allows us to get a reference to the map's internal cluster message
131 // handlers so we can induce events coming in from a peer.
132 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
Madan Jampani2af244a2015-02-22 13:12:01 -0800133 anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800134 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
135
136 replay(clusterCommunicator);
137
138 clockService = new SequentialClockService<>();
139
140 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
141 .register(KryoNamespaces.API)
142 .register(TestTimestamp.class);
143
144 ecMap = new EventuallyConsistentMapImpl<>(MAP_NAME, clusterService,
145 clusterCommunicator,
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800146 serializer, clockService)
147 .withBroadcastMessageExecutor(MoreExecutors.newDirectExecutorService());
Jonathan Hart584d2f32015-01-27 19:46:14 -0800148
149 // Reset ready for tests to add their own expectations
150 reset(clusterCommunicator);
151 }
152
153 @After
154 public void tearDown() {
155 reset(clusterCommunicator);
156 ecMap.destroy();
157 }
158
Ray Milkey8dc82082015-02-20 16:22:38 -0800159 @SuppressWarnings("unchecked")
160 private EventuallyConsistentMapListener<String, String> getListener() {
161 return createMock(EventuallyConsistentMapListener.class);
162 }
163
Jonathan Hart584d2f32015-01-27 19:46:14 -0800164 @Test
165 public void testSize() throws Exception {
166 expectAnyMessage(clusterCommunicator);
167
168 assertEquals(0, ecMap.size());
169 ecMap.put(KEY1, VALUE1);
170 assertEquals(1, ecMap.size());
171 ecMap.put(KEY1, VALUE2);
172 assertEquals(1, ecMap.size());
173 ecMap.put(KEY2, VALUE2);
174 assertEquals(2, ecMap.size());
175 for (int i = 0; i < 10; i++) {
176 ecMap.put("" + i, "" + i);
177 }
178 assertEquals(12, ecMap.size());
179 ecMap.remove(KEY1);
180 assertEquals(11, ecMap.size());
181 ecMap.remove(KEY1);
182 assertEquals(11, ecMap.size());
183 }
184
185 @Test
186 public void testIsEmpty() throws Exception {
187 expectAnyMessage(clusterCommunicator);
188
189 assertTrue(ecMap.isEmpty());
190 ecMap.put(KEY1, VALUE1);
191 assertFalse(ecMap.isEmpty());
192 ecMap.remove(KEY1);
193 assertTrue(ecMap.isEmpty());
194 }
195
196 @Test
197 public void testContainsKey() throws Exception {
198 expectAnyMessage(clusterCommunicator);
199
200 assertFalse(ecMap.containsKey(KEY1));
201 ecMap.put(KEY1, VALUE1);
202 assertTrue(ecMap.containsKey(KEY1));
203 assertFalse(ecMap.containsKey(KEY2));
204 ecMap.remove(KEY1);
205 assertFalse(ecMap.containsKey(KEY1));
206 }
207
208 @Test
209 public void testContainsValue() throws Exception {
210 expectAnyMessage(clusterCommunicator);
211
212 assertFalse(ecMap.containsValue(VALUE1));
213 ecMap.put(KEY1, VALUE1);
214 assertTrue(ecMap.containsValue(VALUE1));
215 assertFalse(ecMap.containsValue(VALUE2));
216 ecMap.put(KEY1, VALUE2);
217 assertFalse(ecMap.containsValue(VALUE1));
218 assertTrue(ecMap.containsValue(VALUE2));
219 ecMap.remove(KEY1);
220 assertFalse(ecMap.containsValue(VALUE2));
221 }
222
223 @Test
224 public void testGet() throws Exception {
225 expectAnyMessage(clusterCommunicator);
226
227 CountDownLatch latch;
228
229 // Local put
230 assertNull(ecMap.get(KEY1));
231 ecMap.put(KEY1, VALUE1);
232 assertEquals(VALUE1, ecMap.get(KEY1));
233
234 // Remote put
235 ClusterMessage message
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800236 = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800237
238 // Create a latch so we know when the put operation has finished
239 latch = new CountDownLatch(1);
240 ecMap.addListener(new TestListener(latch));
241
242 assertNull(ecMap.get(KEY2));
243 putHandler.handle(message);
244 assertTrue("External listener never got notified of internal event",
245 latch.await(100, TimeUnit.MILLISECONDS));
246 assertEquals(VALUE2, ecMap.get(KEY2));
247
248 // Local remove
249 ecMap.remove(KEY2);
250 assertNull(ecMap.get(KEY2));
251
252 // Remote remove
253 ClusterMessage removeMessage
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800254 = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800255
256 // Create a latch so we know when the remove operation has finished
257 latch = new CountDownLatch(1);
258 ecMap.addListener(new TestListener(latch));
259
260 removeHandler.handle(removeMessage);
261 assertTrue("External listener never got notified of internal event",
262 latch.await(100, TimeUnit.MILLISECONDS));
263 assertNull(ecMap.get(KEY1));
264 }
265
266 @Test
267 public void testPut() throws Exception {
268 // Set up expectations of external events to be sent to listeners during
269 // the test. These don't use timestamps so we can set them all up at once.
270 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800271 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800272 listener.event(new EventuallyConsistentMapEvent<>(
273 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
274 listener.event(new EventuallyConsistentMapEvent<>(
275 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
276 replay(listener);
277
278 ecMap.addListener(listener);
279
280 // Set up expected internal message to be broadcast to peers on first put
281 expectSpecificMessage(generatePutMessage(KEY1, VALUE1, clockService
282 .peekAtNextTimestamp()), clusterCommunicator);
283
284 // Put first value
285 assertNull(ecMap.get(KEY1));
286 ecMap.put(KEY1, VALUE1);
287 assertEquals(VALUE1, ecMap.get(KEY1));
288
289 verify(clusterCommunicator);
290
291 // Set up expected internal message to be broadcast to peers on second put
292 expectSpecificMessage(generatePutMessage(
293 KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
294
295 // Update same key to a new value
296 ecMap.put(KEY1, VALUE2);
297 assertEquals(VALUE2, ecMap.get(KEY1));
298
299 verify(clusterCommunicator);
300
301 // Do a put with a older timestamp than the value already there.
302 // The map data should not be changed and no notifications should be sent.
303 reset(clusterCommunicator);
304 replay(clusterCommunicator);
305
306 clockService.turnBackTime();
307 ecMap.put(KEY1, VALUE1);
308 // Value should not have changed.
309 assertEquals(VALUE2, ecMap.get(KEY1));
310
311 verify(clusterCommunicator);
312
313 // Check that our listener received the correct events during the test
314 verify(listener);
315 }
316
317 @Test
318 public void testRemove() throws Exception {
319 // Set up expectations of external events to be sent to listeners during
320 // the test. These don't use timestamps so we can set them all up at once.
321 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800322 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800323 listener.event(new EventuallyConsistentMapEvent<>(
324 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
325 expectLastCall().times(2);
326 listener.event(new EventuallyConsistentMapEvent<>(
327 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
328 listener.event(new EventuallyConsistentMapEvent<>(
329 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
330 replay(listener);
331
332 ecMap.addListener(listener);
333
334 // Put in an initial value
335 expectAnyMessage(clusterCommunicator);
336 ecMap.put(KEY1, VALUE1);
337 assertEquals(VALUE1, ecMap.get(KEY1));
338
339 // Remove the value and check the correct internal cluster messages
340 // are sent
341 expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
342 clusterCommunicator);
343
344 ecMap.remove(KEY1);
345 assertNull(ecMap.get(KEY1));
346
347 verify(clusterCommunicator);
348
349 // Remove the same value again. Even though the value is no longer in
350 // the map, we expect that the tombstone is updated and another remove
351 // event is sent to the cluster and external listeners.
352 expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
353 clusterCommunicator);
354
355 ecMap.remove(KEY1);
356 assertNull(ecMap.get(KEY1));
357
358 verify(clusterCommunicator);
359
360
361 // Put in a new value for us to try and remove
362 expectAnyMessage(clusterCommunicator);
363
364 ecMap.put(KEY2, VALUE2);
365
366 clockService.turnBackTime();
367
368 // Remove should have no effect, since it has an older timestamp than
369 // the put. Expect no notifications to be sent out
370 reset(clusterCommunicator);
371 replay(clusterCommunicator);
372
373 ecMap.remove(KEY2);
374
375 verify(clusterCommunicator);
376
377 // Check that our listener received the correct events during the test
378 verify(listener);
379 }
380
381 @Test
382 public void testPutAll() throws Exception {
383 // putAll() with an empty map is a no-op - no messages will be sent
384 reset(clusterCommunicator);
385 replay(clusterCommunicator);
386
387 ecMap.putAll(new HashMap<>());
388
389 verify(clusterCommunicator);
390
391 // Set up the listener with our expected events
392 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800393 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800394 listener.event(new EventuallyConsistentMapEvent<>(
395 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
396 listener.event(new EventuallyConsistentMapEvent<>(
397 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
398 replay(listener);
399
400 ecMap.addListener(listener);
401
402 // Expect a multi-update inter-instance message
403 expectSpecificMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
404 clusterCommunicator);
405
406 Map<String, String> putAllValues = new HashMap<>();
407 putAllValues.put(KEY1, VALUE1);
408 putAllValues.put(KEY2, VALUE2);
409
410 // Put the values in the map
411 ecMap.putAll(putAllValues);
412
413 // Check the correct messages and events were sent
414 verify(clusterCommunicator);
415 verify(listener);
416 }
417
418 @Test
419 public void testClear() throws Exception {
420 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800421 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800422 listener.event(new EventuallyConsistentMapEvent<>(
423 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
424 listener.event(new EventuallyConsistentMapEvent<>(
425 EventuallyConsistentMapEvent.Type.REMOVE, KEY2, null));
426 replay(listener);
427
428 // clear() on an empty map is a no-op - no messages will be sent
429 reset(clusterCommunicator);
430 replay(clusterCommunicator);
431
432 assertTrue(ecMap.isEmpty());
433 ecMap.clear();
434 verify(clusterCommunicator);
435
436 // Put some items in the map
437 expectAnyMessage(clusterCommunicator);
438 ecMap.put(KEY1, VALUE1);
439 ecMap.put(KEY2, VALUE2);
440
441 ecMap.addListener(listener);
442 expectSpecificMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
443
444 ecMap.clear();
445
446 verify(clusterCommunicator);
447 verify(listener);
448 }
449
450 @Test
451 public void testKeySet() throws Exception {
452 expectAnyMessage(clusterCommunicator);
453
454 assertTrue(ecMap.keySet().isEmpty());
455
456 // Generate some keys
457 Set<String> keys = new HashSet<>();
458 for (int i = 1; i <= 10; i++) {
459 keys.add("" + i);
460 }
461
462 // Put each key in the map
463 keys.forEach(k -> ecMap.put(k, "value" + k));
464
465 // Check keySet() returns the correct value
466 assertEquals(keys, ecMap.keySet());
467
468 // Update the value for one of the keys
469 ecMap.put(keys.iterator().next(), "new-value");
470
471 // Check the key set is still the same
472 assertEquals(keys, ecMap.keySet());
473
474 // Remove a key
475 String removeKey = keys.iterator().next();
476 keys.remove(removeKey);
477 ecMap.remove(removeKey);
478
479 // Check the key set is still correct
480 assertEquals(keys, ecMap.keySet());
481 }
482
483 @Test
484 public void testValues() throws Exception {
485 expectAnyMessage(clusterCommunicator);
486
487 assertTrue(ecMap.values().isEmpty());
488
489 // Generate some values
490 Map<String, String> expectedValues = new HashMap<>();
491 for (int i = 1; i <= 10; i++) {
492 expectedValues.put("" + i, "value" + i);
493 }
494
495 // Add them into the map
496 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
497
498 // Check the values collection is correct
499 assertEquals(expectedValues.values().size(), ecMap.values().size());
500 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
501
502 // Update the value for one of the keys
503 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
504 expectedValues.put(first.getKey(), "new-value");
505 ecMap.put(first.getKey(), "new-value");
506
507 // Check the values collection is still correct
508 assertEquals(expectedValues.values().size(), ecMap.values().size());
509 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
510
511 // Remove a key
512 String removeKey = expectedValues.keySet().iterator().next();
513 expectedValues.remove(removeKey);
514 ecMap.remove(removeKey);
515
516 // Check the values collection is still correct
517 assertEquals(expectedValues.values().size(), ecMap.values().size());
518 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
519 }
520
521 @Test
522 public void testEntrySet() throws Exception {
523 expectAnyMessage(clusterCommunicator);
524
525 assertTrue(ecMap.entrySet().isEmpty());
526
527 // Generate some values
528 Map<String, String> expectedValues = new HashMap<>();
529 for (int i = 1; i <= 10; i++) {
530 expectedValues.put("" + i, "value" + i);
531 }
532
533 // Add them into the map
534 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
535
536 // Check the entry set is correct
537 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
538
539 // Update the value for one of the keys
540 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
541 expectedValues.put(first.getKey(), "new-value");
542 ecMap.put(first.getKey(), "new-value");
543
544 // Check the entry set is still correct
545 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
546
547 // Remove a key
548 String removeKey = expectedValues.keySet().iterator().next();
549 expectedValues.remove(removeKey);
550 ecMap.remove(removeKey);
551
552 // Check the entry set is still correct
553 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
554 }
555
556 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
557 if (expectedMap.entrySet().size() != actual.size()) {
558 return false;
559 }
560
561 for (Map.Entry<String, String> e : actual) {
562 if (!expectedMap.containsKey(e.getKey())) {
563 return false;
564 }
565 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
566 return false;
567 }
568 }
569 return true;
570 }
571
572 @Test
573 public void testDestroy() throws Exception {
574 clusterCommunicator.removeSubscriber(PUT_MESSAGE_SUBJECT);
575 clusterCommunicator.removeSubscriber(REMOVE_MESSAGE_SUBJECT);
576 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
577
578 replay(clusterCommunicator);
579
580 ecMap.destroy();
581
582 verify(clusterCommunicator);
583
584 try {
585 ecMap.get(KEY1);
586 fail("get after destroy should throw exception");
587 } catch (IllegalStateException e) {
588 assertTrue(true);
589 }
590
591 try {
592 ecMap.put(KEY1, VALUE1);
593 fail("put after destroy should throw exception");
594 } catch (IllegalStateException e) {
595 assertTrue(true);
596 }
597 }
598
599 private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800600 InternalPutEvent<String, String> event =
601 new InternalPutEvent<>(key, value, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800602
603 return new ClusterMessage(
604 clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
605 SERIALIZER.encode(event));
606 }
607
608 private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800609 ArrayList<PutEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800610
611 Timestamp timestamp1 = clockService.peek(1);
612 Timestamp timestamp2 = clockService.peek(2);
613
Jonathan Hartf9108232015-02-02 16:37:35 -0800614 PutEntry<String, String> pe1 = new PutEntry<>(key1, value1, timestamp1);
615 PutEntry<String, String> pe2 = new PutEntry<>(key2, value2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800616
617 list.add(pe1);
618 list.add(pe2);
619
Jonathan Hartf9108232015-02-02 16:37:35 -0800620 InternalPutEvent<String, String> event = new InternalPutEvent<>(list);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800621
622 return new ClusterMessage(
623 clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
624 SERIALIZER.encode(event));
625 }
626
627 private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800628 InternalRemoveEvent<String> event = new InternalRemoveEvent<>(key, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800629
630 return new ClusterMessage(
631 clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
632 SERIALIZER.encode(event));
633 }
634
635 private ClusterMessage generateRemoveMessage(String key1, String key2) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800636 ArrayList<RemoveEntry<String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800637
638 Timestamp timestamp1 = clockService.peek(1);
639 Timestamp timestamp2 = clockService.peek(2);
640
Jonathan Hartf9108232015-02-02 16:37:35 -0800641 RemoveEntry<String> re1 = new RemoveEntry<>(key1, timestamp1);
642 RemoveEntry<String> re2 = new RemoveEntry<>(key2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800643
644 list.add(re1);
645 list.add(re2);
646
Jonathan Hartf9108232015-02-02 16:37:35 -0800647 InternalRemoveEvent<String> event = new InternalRemoveEvent<>(list);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800648
649 return new ClusterMessage(
650 clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
651 SERIALIZER.encode(event));
652 }
653
654 /**
655 * Sets up a mock ClusterCommunicationService to expect a specific cluster
656 * message to be broadcast to the cluster.
657 *
658 * @param m message we expect to be sent
659 * @param clusterCommunicator a mock ClusterCommunicationService to set up
660 */
661 private static void expectSpecificMessage(ClusterMessage m,
662 ClusterCommunicationService clusterCommunicator) {
663 reset(clusterCommunicator);
664 expect(clusterCommunicator.broadcast(m)).andReturn(true);
665 replay(clusterCommunicator);
666 }
667
668 /**
669 * Sets up a mock ClusterCommunicationService to expect any cluster message
670 * that is sent to it. This is useful for unit tests where we aren't
671 * interested in testing the messaging component.
672 *
673 * @param clusterCommunicator a mock ClusterCommunicationService to set up
674 */
675 private void expectAnyMessage(ClusterCommunicationService clusterCommunicator) {
676 reset(clusterCommunicator);
677 expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
678 .andReturn(true)
679 .anyTimes();
680 replay(clusterCommunicator);
681 }
682
683 /**
684 * ClusterCommunicationService implementation that the map's addSubscriber
685 * call will delegate to. This means we can get a reference to the
686 * internal cluster message handler used by the map, so that we can simulate
687 * events coming in from other instances.
688 */
689 private final class TestClusterCommunicationService
690 implements ClusterCommunicationService {
691
692 @Override
693 public boolean broadcast(ClusterMessage message) {
694 return false;
695 }
696
697 @Override
698 public boolean broadcastIncludeSelf(ClusterMessage message) {
699 return false;
700 }
701
702 @Override
703 public boolean unicast(ClusterMessage message, NodeId toNodeId)
704 throws IOException {
705 return false;
706 }
707
708 @Override
709 public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) {
710 return false;
711 }
712
713 @Override
714 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
715 NodeId toNodeId)
716 throws IOException {
717 return null;
718 }
719
720 @Override
721 public void addSubscriber(MessageSubject subject,
722 ClusterMessageHandler subscriber) {
723 if (subject.equals(PUT_MESSAGE_SUBJECT)) {
724 putHandler = subscriber;
725 } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
726 removeHandler = subscriber;
727 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
728 antiEntropyHandler = subscriber;
729 } else {
730 throw new RuntimeException("Unexpected message subject " + subject.toString());
731 }
732 }
733
734 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800735 public void addSubscriber(MessageSubject subject,
736 ClusterMessageHandler subscriber,
737 ExecutorService executor) {
738 if (subject.equals(PUT_MESSAGE_SUBJECT)) {
739 putHandler = subscriber;
740 } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
741 removeHandler = subscriber;
742 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
743 antiEntropyHandler = subscriber;
744 } else {
745 throw new RuntimeException("Unexpected message subject " + subject.toString());
746 }
747 }
748
749 @Override
Jonathan Hart584d2f32015-01-27 19:46:14 -0800750 public void removeSubscriber(MessageSubject subject) {}
751 }
752
753 /**
754 * ClockService implementation that gives out timestamps based on a
755 * sequential counter. This clock service enables more control over the
756 * timestamps that are given out, including being able to "turn back time"
757 * to give out timestamps from the past.
758 *
759 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800760 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800761 */
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800762 private class SequentialClockService<T, U> implements ClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800763
764 private static final long INITIAL_VALUE = 1;
765 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
766
767 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800768 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800769 return new TestTimestamp(counter.getAndIncrement());
770 }
771
772 /**
773 * Returns what the next timestamp will be without consuming the
774 * timestamp. This allows test code to set expectations correctly while
775 * still allowing the CUT to get the same timestamp.
776 *
777 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800778 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800779 */
780 public Timestamp peekAtNextTimestamp() {
781 return peek(1);
782 }
783
784 /**
785 * Returns the ith timestamp to be given out in the future without
786 * consuming the timestamp. For example, i=1 returns the next timestamp,
787 * i=2 returns the timestamp after that, and so on.
788 *
789 * @param i number of the timestamp to peek at
790 * @return the ith timestamp that will be given out
791 */
792 public Timestamp peek(int i) {
793 checkArgument(i > 0, "i must be a positive integer");
794
795 return new TestTimestamp(counter.get() + i - 1);
796 }
797
798 /**
799 * Turns the clock back two ticks, so the next call to getTimestamp will
800 * return an older timestamp than the previous call to getTimestamp.
801 */
802 public void turnBackTime() {
803 // Not atomic, but should be OK for these tests.
804 counter.decrementAndGet();
805 counter.decrementAndGet();
806 }
807
808 }
809
810 /**
811 * Timestamp implementation where the value of the timestamp can be
812 * specified explicitly at creation time.
813 */
814 private class TestTimestamp implements Timestamp {
815
816 private final long timestamp;
817
818 /**
819 * Creates a new timestamp that has the specified value.
820 *
821 * @param timestamp value of the timestamp
822 */
823 public TestTimestamp(long timestamp) {
824 this.timestamp = timestamp;
825 }
826
827 @Override
828 public int compareTo(Timestamp o) {
829 checkArgument(o instanceof TestTimestamp);
830 TestTimestamp otherTimestamp = (TestTimestamp) o;
831 return ComparisonChain.start()
832 .compare(this.timestamp, otherTimestamp.timestamp)
833 .result();
834 }
835 }
836
837 /**
838 * EventuallyConsistentMapListener implementation which triggers a latch
839 * when it receives an event.
840 */
841 private class TestListener implements EventuallyConsistentMapListener<String, String> {
842 private CountDownLatch latch;
843
844 /**
845 * Creates a new listener that will trigger the specified latch when it
846 * receives and event.
847 *
848 * @param latch the latch to trigger on events
849 */
850 public TestListener(CountDownLatch latch) {
851 this.latch = latch;
852 }
853
854 @Override
855 public void event(EventuallyConsistentMapEvent<String, String> event) {
856 latch.countDown();
857 }
858 }
859}