blob: f7c9323f0945f48396abee3b7468bc63661055de [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hart584d2f32015-01-27 19:46:14 -080017
18import com.google.common.collect.ComparisonChain;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080019import com.google.common.collect.ImmutableSet;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080020import com.google.common.collect.Lists;
Jonathan Hart584d2f32015-01-27 19:46:14 -080021import com.google.common.util.concurrent.ListenableFuture;
Madan Jampanib28e4ad2015-02-19 12:31:37 -080022import com.google.common.util.concurrent.MoreExecutors;
Jonathan Hart584d2f32015-01-27 19:46:14 -080023import org.junit.After;
24import org.junit.Before;
25import org.junit.Test;
26import org.onlab.packet.IpAddress;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.ControllerNode;
30import org.onosproject.cluster.DefaultControllerNode;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.store.Timestamp;
33import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
34import org.onosproject.store.cluster.messaging.ClusterMessage;
35import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
36import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart77bdd262015-02-03 09:07:48 -080037import org.onosproject.store.impl.ClockService;
38import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hart584d2f32015-01-27 19:46:14 -080039import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.serializers.KryoSerializer;
41
42import java.io.IOException;
43import java.util.ArrayList;
44import java.util.HashMap;
45import java.util.HashSet;
46import java.util.Map;
47import java.util.Objects;
48import java.util.Set;
49import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080050import java.util.concurrent.ExecutorService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080051import java.util.concurrent.TimeUnit;
52import java.util.concurrent.atomic.AtomicLong;
53
54import static com.google.common.base.Preconditions.checkArgument;
55import static junit.framework.TestCase.assertFalse;
56import static org.easymock.EasyMock.*;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080057import static org.junit.Assert.*;
Jonathan Hart584d2f32015-01-27 19:46:14 -080058
59/**
60 * Unit tests for EventuallyConsistentMapImpl.
61 */
62public class EventuallyConsistentMapImplTest {
63
// Map under test; assigned in setUp().
64 private EventuallyConsistentMap<String, String> ecMap;
65
// Mocked collaborators handed to the map; created in setUp().
66 private ClusterService clusterService;
67 private ClusterCommunicationService clusterCommunicator;
// Clock handed to the map and also queried by the tests to build expected messages.
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080068 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080069
// Map name passed to the map constructor and embedded in both message subjects below.
70 private static final String MAP_NAME = "test";
// Subject used by this test for update (put/remove) messages.
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080071 private static final MessageSubject UPDATE_MESSAGE_SUBJECT
Jonathan Hart584d2f32015-01-27 19:46:14 -080072 = new MessageSubject("ecm-" + MAP_NAME + "-update");
// Subject used by this test to recognise the anti-entropy subscriber.
Jonathan Hart584d2f32015-01-27 19:46:14 -080073 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
74 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
75
// Fixed keys and values shared by the tests.
76 private static final String KEY1 = "one";
77 private static final String KEY2 = "two";
78 private static final String VALUE1 = "oneValue";
79 private static final String VALUE2 = "twoValue";
80
// The local node; returned by the mocked ClusterService in setUp().
81 private final ControllerNode self =
82 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
83
// Handlers captured by TestClusterCommunicationService.addSubscriber() so
// that tests can feed messages to the map as if they came from a peer.
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080084 private ClusterMessageHandler updateHandler;
Jonathan Hart584d2f32015-01-27 19:46:14 -080085 private ClusterMessageHandler antiEntropyHandler;
86
87 /*
 * Serialization is a bit tricky here. The tests need to serialize messages
 * in order to set up their expectations, and they do so with this
 * serializer, whereas the EventuallyConsistentMap uses its own internal
 * serializer. This serializer must therefore be set up exactly the same
 * way as the map's internal serializer.
 *
 * NOTE(review): presumably the registration order below matters as well as
 * the set of registered classes - confirm against the map implementation.
 */
94 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
95 @Override
96 protected void setupKryoPool() {
97 serializerPool = KryoNamespace.newBuilder()
98 // Classes we give to the map
99 .register(KryoNamespaces.API)
100 .register(TestTimestamp.class)
101 // Below are the classes that the map registers internally
102 .register(WallClockTimestamp.class)
Jonathan Hartf9108232015-02-02 16:37:35 -0800103 .register(PutEntry.class)
104 .register(RemoveEntry.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800105 .register(ArrayList.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800106 .register(AntiEntropyAdvertisement.class)
107 .register(HashMap.class)
108 .build();
109 }
110 };
111
112 @Before
113 public void setUp() throws Exception {
114 clusterService = createMock(ClusterService.class);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800115 expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
116 expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800117 replay(clusterService);
118
119 clusterCommunicator = createMock(ClusterCommunicationService.class);
120
121 // Add expectation for adding cluster message subscribers which
122 // delegate to our ClusterCommunicationService implementation. This
123 // allows us to get a reference to the map's internal cluster message
124 // handlers so we can induce events coming in from a peer.
125 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
Madan Jampani2af244a2015-02-22 13:12:01 -0800126 anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800127 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
128
129 replay(clusterCommunicator);
130
131 clockService = new SequentialClockService<>();
132
133 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
134 .register(KryoNamespaces.API)
135 .register(TestTimestamp.class);
136
137 ecMap = new EventuallyConsistentMapImpl<>(MAP_NAME, clusterService,
138 clusterCommunicator,
Madan Jampanib28e4ad2015-02-19 12:31:37 -0800139 serializer, clockService)
140 .withBroadcastMessageExecutor(MoreExecutors.newDirectExecutorService());
Jonathan Hart584d2f32015-01-27 19:46:14 -0800141
142 // Reset ready for tests to add their own expectations
143 reset(clusterCommunicator);
144 }
145
146 @After
147 public void tearDown() {
148 reset(clusterCommunicator);
149 ecMap.destroy();
150 }
151
Ray Milkey8dc82082015-02-20 16:22:38 -0800152 @SuppressWarnings("unchecked")
153 private EventuallyConsistentMapListener<String, String> getListener() {
154 return createMock(EventuallyConsistentMapListener.class);
155 }
156
Jonathan Hart584d2f32015-01-27 19:46:14 -0800157 @Test
158 public void testSize() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800159 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800160
161 assertEquals(0, ecMap.size());
162 ecMap.put(KEY1, VALUE1);
163 assertEquals(1, ecMap.size());
164 ecMap.put(KEY1, VALUE2);
165 assertEquals(1, ecMap.size());
166 ecMap.put(KEY2, VALUE2);
167 assertEquals(2, ecMap.size());
168 for (int i = 0; i < 10; i++) {
169 ecMap.put("" + i, "" + i);
170 }
171 assertEquals(12, ecMap.size());
172 ecMap.remove(KEY1);
173 assertEquals(11, ecMap.size());
174 ecMap.remove(KEY1);
175 assertEquals(11, ecMap.size());
176 }
177
178 @Test
179 public void testIsEmpty() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800180 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800181
182 assertTrue(ecMap.isEmpty());
183 ecMap.put(KEY1, VALUE1);
184 assertFalse(ecMap.isEmpty());
185 ecMap.remove(KEY1);
186 assertTrue(ecMap.isEmpty());
187 }
188
189 @Test
190 public void testContainsKey() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800191 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800192
193 assertFalse(ecMap.containsKey(KEY1));
194 ecMap.put(KEY1, VALUE1);
195 assertTrue(ecMap.containsKey(KEY1));
196 assertFalse(ecMap.containsKey(KEY2));
197 ecMap.remove(KEY1);
198 assertFalse(ecMap.containsKey(KEY1));
199 }
200
201 @Test
202 public void testContainsValue() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800203 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800204
205 assertFalse(ecMap.containsValue(VALUE1));
206 ecMap.put(KEY1, VALUE1);
207 assertTrue(ecMap.containsValue(VALUE1));
208 assertFalse(ecMap.containsValue(VALUE2));
209 ecMap.put(KEY1, VALUE2);
210 assertFalse(ecMap.containsValue(VALUE1));
211 assertTrue(ecMap.containsValue(VALUE2));
212 ecMap.remove(KEY1);
213 assertFalse(ecMap.containsValue(VALUE2));
214 }
215
216 @Test
217 public void testGet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800218 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800219
220 CountDownLatch latch;
221
222 // Local put
223 assertNull(ecMap.get(KEY1));
224 ecMap.put(KEY1, VALUE1);
225 assertEquals(VALUE1, ecMap.get(KEY1));
226
227 // Remote put
228 ClusterMessage message
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800229 = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800230
231 // Create a latch so we know when the put operation has finished
232 latch = new CountDownLatch(1);
233 ecMap.addListener(new TestListener(latch));
234
235 assertNull(ecMap.get(KEY2));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800236 updateHandler.handle(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800237 assertTrue("External listener never got notified of internal event",
238 latch.await(100, TimeUnit.MILLISECONDS));
239 assertEquals(VALUE2, ecMap.get(KEY2));
240
241 // Local remove
242 ecMap.remove(KEY2);
243 assertNull(ecMap.get(KEY2));
244
245 // Remote remove
246 ClusterMessage removeMessage
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800247 = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800248
249 // Create a latch so we know when the remove operation has finished
250 latch = new CountDownLatch(1);
251 ecMap.addListener(new TestListener(latch));
252
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800253 updateHandler.handle(removeMessage);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800254 assertTrue("External listener never got notified of internal event",
255 latch.await(100, TimeUnit.MILLISECONDS));
256 assertNull(ecMap.get(KEY1));
257 }
258
259 @Test
260 public void testPut() throws Exception {
261 // Set up expectations of external events to be sent to listeners during
262 // the test. These don't use timestamps so we can set them all up at once.
263 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800264 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800265 listener.event(new EventuallyConsistentMapEvent<>(
266 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
267 listener.event(new EventuallyConsistentMapEvent<>(
268 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
269 replay(listener);
270
271 ecMap.addListener(listener);
272
273 // Set up expected internal message to be broadcast to peers on first put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800274 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
Jonathan Hart584d2f32015-01-27 19:46:14 -0800275 .peekAtNextTimestamp()), clusterCommunicator);
276
277 // Put first value
278 assertNull(ecMap.get(KEY1));
279 ecMap.put(KEY1, VALUE1);
280 assertEquals(VALUE1, ecMap.get(KEY1));
281
282 verify(clusterCommunicator);
283
284 // Set up expected internal message to be broadcast to peers on second put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800285 expectSpecificMulticastMessage(generatePutMessage(
Jonathan Hart584d2f32015-01-27 19:46:14 -0800286 KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
287
288 // Update same key to a new value
289 ecMap.put(KEY1, VALUE2);
290 assertEquals(VALUE2, ecMap.get(KEY1));
291
292 verify(clusterCommunicator);
293
294 // Do a put with a older timestamp than the value already there.
295 // The map data should not be changed and no notifications should be sent.
296 reset(clusterCommunicator);
297 replay(clusterCommunicator);
298
299 clockService.turnBackTime();
300 ecMap.put(KEY1, VALUE1);
301 // Value should not have changed.
302 assertEquals(VALUE2, ecMap.get(KEY1));
303
304 verify(clusterCommunicator);
305
306 // Check that our listener received the correct events during the test
307 verify(listener);
308 }
309
310 @Test
311 public void testRemove() throws Exception {
312 // Set up expectations of external events to be sent to listeners during
313 // the test. These don't use timestamps so we can set them all up at once.
314 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800315 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800316 listener.event(new EventuallyConsistentMapEvent<>(
317 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
318 expectLastCall().times(2);
319 listener.event(new EventuallyConsistentMapEvent<>(
320 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
321 listener.event(new EventuallyConsistentMapEvent<>(
322 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
323 replay(listener);
324
325 ecMap.addListener(listener);
326
327 // Put in an initial value
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800328 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800329 ecMap.put(KEY1, VALUE1);
330 assertEquals(VALUE1, ecMap.get(KEY1));
331
332 // Remove the value and check the correct internal cluster messages
333 // are sent
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800334 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
335 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800336
337 ecMap.remove(KEY1);
338 assertNull(ecMap.get(KEY1));
339
340 verify(clusterCommunicator);
341
342 // Remove the same value again. Even though the value is no longer in
343 // the map, we expect that the tombstone is updated and another remove
344 // event is sent to the cluster and external listeners.
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800345 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
346 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800347
348 ecMap.remove(KEY1);
349 assertNull(ecMap.get(KEY1));
350
351 verify(clusterCommunicator);
352
353
354 // Put in a new value for us to try and remove
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800355 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800356
357 ecMap.put(KEY2, VALUE2);
358
359 clockService.turnBackTime();
360
361 // Remove should have no effect, since it has an older timestamp than
362 // the put. Expect no notifications to be sent out
363 reset(clusterCommunicator);
364 replay(clusterCommunicator);
365
366 ecMap.remove(KEY2);
367
368 verify(clusterCommunicator);
369
370 // Check that our listener received the correct events during the test
371 verify(listener);
372 }
373
374 @Test
375 public void testPutAll() throws Exception {
376 // putAll() with an empty map is a no-op - no messages will be sent
377 reset(clusterCommunicator);
378 replay(clusterCommunicator);
379
380 ecMap.putAll(new HashMap<>());
381
382 verify(clusterCommunicator);
383
384 // Set up the listener with our expected events
385 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800386 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800387 listener.event(new EventuallyConsistentMapEvent<>(
388 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
389 listener.event(new EventuallyConsistentMapEvent<>(
390 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
391 replay(listener);
392
393 ecMap.addListener(listener);
394
395 // Expect a multi-update inter-instance message
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800396 expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
397 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800398
399 Map<String, String> putAllValues = new HashMap<>();
400 putAllValues.put(KEY1, VALUE1);
401 putAllValues.put(KEY2, VALUE2);
402
403 // Put the values in the map
404 ecMap.putAll(putAllValues);
405
406 // Check the correct messages and events were sent
407 verify(clusterCommunicator);
408 verify(listener);
409 }
410
411 @Test
412 public void testClear() throws Exception {
413 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800414 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800415 listener.event(new EventuallyConsistentMapEvent<>(
416 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
417 listener.event(new EventuallyConsistentMapEvent<>(
418 EventuallyConsistentMapEvent.Type.REMOVE, KEY2, null));
419 replay(listener);
420
421 // clear() on an empty map is a no-op - no messages will be sent
422 reset(clusterCommunicator);
423 replay(clusterCommunicator);
424
425 assertTrue(ecMap.isEmpty());
426 ecMap.clear();
427 verify(clusterCommunicator);
428
429 // Put some items in the map
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800430 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800431 ecMap.put(KEY1, VALUE1);
432 ecMap.put(KEY2, VALUE2);
433
434 ecMap.addListener(listener);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800435 expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800436
437 ecMap.clear();
438
439 verify(clusterCommunicator);
440 verify(listener);
441 }
442
443 @Test
444 public void testKeySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800445 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800446
447 assertTrue(ecMap.keySet().isEmpty());
448
449 // Generate some keys
450 Set<String> keys = new HashSet<>();
451 for (int i = 1; i <= 10; i++) {
452 keys.add("" + i);
453 }
454
455 // Put each key in the map
456 keys.forEach(k -> ecMap.put(k, "value" + k));
457
458 // Check keySet() returns the correct value
459 assertEquals(keys, ecMap.keySet());
460
461 // Update the value for one of the keys
462 ecMap.put(keys.iterator().next(), "new-value");
463
464 // Check the key set is still the same
465 assertEquals(keys, ecMap.keySet());
466
467 // Remove a key
468 String removeKey = keys.iterator().next();
469 keys.remove(removeKey);
470 ecMap.remove(removeKey);
471
472 // Check the key set is still correct
473 assertEquals(keys, ecMap.keySet());
474 }
475
476 @Test
477 public void testValues() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800478 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800479
480 assertTrue(ecMap.values().isEmpty());
481
482 // Generate some values
483 Map<String, String> expectedValues = new HashMap<>();
484 for (int i = 1; i <= 10; i++) {
485 expectedValues.put("" + i, "value" + i);
486 }
487
488 // Add them into the map
489 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
490
491 // Check the values collection is correct
492 assertEquals(expectedValues.values().size(), ecMap.values().size());
493 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
494
495 // Update the value for one of the keys
496 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
497 expectedValues.put(first.getKey(), "new-value");
498 ecMap.put(first.getKey(), "new-value");
499
500 // Check the values collection is still correct
501 assertEquals(expectedValues.values().size(), ecMap.values().size());
502 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
503
504 // Remove a key
505 String removeKey = expectedValues.keySet().iterator().next();
506 expectedValues.remove(removeKey);
507 ecMap.remove(removeKey);
508
509 // Check the values collection is still correct
510 assertEquals(expectedValues.values().size(), ecMap.values().size());
511 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
512 }
513
514 @Test
515 public void testEntrySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800516 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800517
518 assertTrue(ecMap.entrySet().isEmpty());
519
520 // Generate some values
521 Map<String, String> expectedValues = new HashMap<>();
522 for (int i = 1; i <= 10; i++) {
523 expectedValues.put("" + i, "value" + i);
524 }
525
526 // Add them into the map
527 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
528
529 // Check the entry set is correct
530 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
531
532 // Update the value for one of the keys
533 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
534 expectedValues.put(first.getKey(), "new-value");
535 ecMap.put(first.getKey(), "new-value");
536
537 // Check the entry set is still correct
538 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
539
540 // Remove a key
541 String removeKey = expectedValues.keySet().iterator().next();
542 expectedValues.remove(removeKey);
543 ecMap.remove(removeKey);
544
545 // Check the entry set is still correct
546 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
547 }
548
549 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
550 if (expectedMap.entrySet().size() != actual.size()) {
551 return false;
552 }
553
554 for (Map.Entry<String, String> e : actual) {
555 if (!expectedMap.containsKey(e.getKey())) {
556 return false;
557 }
558 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
559 return false;
560 }
561 }
562 return true;
563 }
564
565 @Test
566 public void testDestroy() throws Exception {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800567 clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800568 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
569
570 replay(clusterCommunicator);
571
572 ecMap.destroy();
573
574 verify(clusterCommunicator);
575
576 try {
577 ecMap.get(KEY1);
578 fail("get after destroy should throw exception");
579 } catch (IllegalStateException e) {
580 assertTrue(true);
581 }
582
583 try {
584 ecMap.put(KEY1, VALUE1);
585 fail("put after destroy should throw exception");
586 } catch (IllegalStateException e) {
587 assertTrue(true);
588 }
589 }
590
591 private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800592 PutEntry<String, String> event = new PutEntry<>(key, value, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800593
594 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800595 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
596 SERIALIZER.encode(Lists.newArrayList(event)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800597 }
598
599 private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800600 ArrayList<PutEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800601
602 Timestamp timestamp1 = clockService.peek(1);
603 Timestamp timestamp2 = clockService.peek(2);
604
Jonathan Hartf9108232015-02-02 16:37:35 -0800605 PutEntry<String, String> pe1 = new PutEntry<>(key1, value1, timestamp1);
606 PutEntry<String, String> pe2 = new PutEntry<>(key2, value2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800607
608 list.add(pe1);
609 list.add(pe2);
610
Jonathan Hart584d2f32015-01-27 19:46:14 -0800611
612 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800613 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
614 SERIALIZER.encode(list));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800615 }
616
617 private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800618 RemoveEntry<String, String> event = new RemoveEntry<>(key, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800619
620 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800621 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
622 SERIALIZER.encode(Lists.newArrayList(event)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800623 }
624
625 private ClusterMessage generateRemoveMessage(String key1, String key2) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800626 ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800627
628 Timestamp timestamp1 = clockService.peek(1);
629 Timestamp timestamp2 = clockService.peek(2);
630
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800631 RemoveEntry<String, String> re1 = new RemoveEntry<>(key1, timestamp1);
632 RemoveEntry<String, String> re2 = new RemoveEntry<>(key2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800633
634 list.add(re1);
635 list.add(re2);
636
Jonathan Hart584d2f32015-01-27 19:46:14 -0800637 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800638 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
639 SERIALIZER.encode(list));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800640 }
641
642 /**
643 * Sets up a mock ClusterCommunicationService to expect a specific cluster
644 * message to be broadcast to the cluster.
645 *
646 * @param m message we expect to be sent
647 * @param clusterCommunicator a mock ClusterCommunicationService to set up
648 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800649 //FIXME rename
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800650 private static void expectSpecificBroadcastMessage(ClusterMessage m,
651 ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800652 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800653// expect(clusterCommunicator.broadcast(m)).andReturn(true);
654 expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
655 .andReturn(true)
656 .anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800657 replay(clusterCommunicator);
658 }
659
660 /**
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800661 * Sets up a mock ClusterCommunicationService to expect a specific cluster
662 * message to be multicast to the cluster.
663 *
664 * @param m message we expect to be sent
665 * @param clusterCommunicator a mock ClusterCommunicationService to set up
666 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800667 //FIXME rename
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800668 private static void expectSpecificMulticastMessage(ClusterMessage m,
669 ClusterCommunicationService clusterCommunicator) {
670 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800671// expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
672 expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
673 .andReturn(true)
674 .anyTimes();
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800675 replay(clusterCommunicator);
676 }
677
678
679 /**
680 * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
Jonathan Hart584d2f32015-01-27 19:46:14 -0800681 * that is sent to it. This is useful for unit tests where we aren't
682 * interested in testing the messaging component.
683 *
684 * @param clusterCommunicator a mock ClusterCommunicationService to set up
685 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800686 //FIXME rename
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800687 private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
688 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800689// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
690// anyObject(Iterable.class)))
691 expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class),
692 anyObject(NodeId.class)))
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800693 .andReturn(true)
694 .anyTimes();
695 replay(clusterCommunicator);
696 }
697
698 /**
699 * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
700 * that is sent to it. This is useful for unit tests where we aren't
701 * interested in testing the messaging component.
702 *
703 * @param clusterCommunicator a mock ClusterCommunicationService to set up
704 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800705 //FIXME rename
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800706 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800707 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800708// expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
709// .andReturn(true)
710// .anyTimes();
711 expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class), anyObject(NodeId.class)))
Jonathan Hart584d2f32015-01-27 19:46:14 -0800712 .andReturn(true)
713 .anyTimes();
714 replay(clusterCommunicator);
715 }
716
717 /**
718 * ClusterCommunicationService implementation that the map's addSubscriber
719 * call will delegate to. This means we can get a reference to the
720 * internal cluster message handler used by the map, so that we can simulate
721 * events coming in from other instances.
722 */
723 private final class TestClusterCommunicationService
724 implements ClusterCommunicationService {
725
726 @Override
727 public boolean broadcast(ClusterMessage message) {
728 return false;
729 }
730
731 @Override
732 public boolean broadcastIncludeSelf(ClusterMessage message) {
733 return false;
734 }
735
736 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800737 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800738 return false;
739 }
740
741 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800742 public boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800743 return false;
744 }
745
746 @Override
747 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
748 NodeId toNodeId)
749 throws IOException {
750 return null;
751 }
752
753 @Override
754 public void addSubscriber(MessageSubject subject,
755 ClusterMessageHandler subscriber) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800756 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
757 updateHandler = subscriber;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800758 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
759 antiEntropyHandler = subscriber;
760 } else {
761 throw new RuntimeException("Unexpected message subject " + subject.toString());
762 }
763 }
764
765 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800766 public void addSubscriber(MessageSubject subject,
767 ClusterMessageHandler subscriber,
768 ExecutorService executor) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800769 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
770 updateHandler = subscriber;
Madan Jampani2af244a2015-02-22 13:12:01 -0800771 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
772 antiEntropyHandler = subscriber;
773 } else {
774 throw new RuntimeException("Unexpected message subject " + subject.toString());
775 }
776 }
777
778 @Override
Jonathan Hart584d2f32015-01-27 19:46:14 -0800779 public void removeSubscriber(MessageSubject subject) {}
780 }
781
782 /**
783 * ClockService implementation that gives out timestamps based on a
784 * sequential counter. This clock service enables more control over the
785 * timestamps that are given out, including being able to "turn back time"
786 * to give out timestamps from the past.
787 *
788 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800789 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800790 */
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800791 private class SequentialClockService<T, U> implements ClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800792
793 private static final long INITIAL_VALUE = 1;
794 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
795
796 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800797 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800798 return new TestTimestamp(counter.getAndIncrement());
799 }
800
801 /**
802 * Returns what the next timestamp will be without consuming the
803 * timestamp. This allows test code to set expectations correctly while
804 * still allowing the CUT to get the same timestamp.
805 *
806 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800807 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800808 */
809 public Timestamp peekAtNextTimestamp() {
810 return peek(1);
811 }
812
813 /**
814 * Returns the ith timestamp to be given out in the future without
815 * consuming the timestamp. For example, i=1 returns the next timestamp,
816 * i=2 returns the timestamp after that, and so on.
817 *
818 * @param i number of the timestamp to peek at
819 * @return the ith timestamp that will be given out
820 */
821 public Timestamp peek(int i) {
822 checkArgument(i > 0, "i must be a positive integer");
823
824 return new TestTimestamp(counter.get() + i - 1);
825 }
826
827 /**
828 * Turns the clock back two ticks, so the next call to getTimestamp will
829 * return an older timestamp than the previous call to getTimestamp.
830 */
831 public void turnBackTime() {
832 // Not atomic, but should be OK for these tests.
833 counter.decrementAndGet();
834 counter.decrementAndGet();
835 }
836
837 }
838
839 /**
840 * Timestamp implementation where the value of the timestamp can be
841 * specified explicitly at creation time.
842 */
843 private class TestTimestamp implements Timestamp {
844
845 private final long timestamp;
846
847 /**
848 * Creates a new timestamp that has the specified value.
849 *
850 * @param timestamp value of the timestamp
851 */
852 public TestTimestamp(long timestamp) {
853 this.timestamp = timestamp;
854 }
855
856 @Override
857 public int compareTo(Timestamp o) {
858 checkArgument(o instanceof TestTimestamp);
859 TestTimestamp otherTimestamp = (TestTimestamp) o;
860 return ComparisonChain.start()
861 .compare(this.timestamp, otherTimestamp.timestamp)
862 .result();
863 }
864 }
865
866 /**
867 * EventuallyConsistentMapListener implementation which triggers a latch
868 * when it receives an event.
869 */
870 private class TestListener implements EventuallyConsistentMapListener<String, String> {
871 private CountDownLatch latch;
872
873 /**
874 * Creates a new listener that will trigger the specified latch when it
875 * receives and event.
876 *
877 * @param latch the latch to trigger on events
878 */
879 public TestListener(CountDownLatch latch) {
880 this.latch = latch;
881 }
882
883 @Override
884 public void event(EventuallyConsistentMapEvent<String, String> event) {
885 latch.countDown();
886 }
887 }
888}