blob: 8629fcd7721c7b6de06739090d2c71296753180b [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hart584d2f32015-01-27 19:46:14 -080017
18import com.google.common.collect.ComparisonChain;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080019import com.google.common.collect.ImmutableSet;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080020import com.google.common.collect.Lists;
Jonathan Hart584d2f32015-01-27 19:46:14 -080021import com.google.common.util.concurrent.ListenableFuture;
Madan Jampanib28e4ad2015-02-19 12:31:37 -080022import com.google.common.util.concurrent.MoreExecutors;
Jonathan Hart584d2f32015-01-27 19:46:14 -080023import org.junit.After;
24import org.junit.Before;
25import org.junit.Test;
26import org.onlab.packet.IpAddress;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.ControllerNode;
30import org.onosproject.cluster.DefaultControllerNode;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.store.Timestamp;
33import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
34import org.onosproject.store.cluster.messaging.ClusterMessage;
35import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
36import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070037import org.onosproject.store.service.ClockService;
Jonathan Hart77bdd262015-02-03 09:07:48 -080038import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hart584d2f32015-01-27 19:46:14 -080039import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070041import org.onosproject.store.service.EventuallyConsistentMap;
42import org.onosproject.store.service.EventuallyConsistentMapEvent;
43import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hart584d2f32015-01-27 19:46:14 -080044
45import java.io.IOException;
46import java.util.ArrayList;
47import java.util.HashMap;
48import java.util.HashSet;
49import java.util.Map;
50import java.util.Objects;
51import java.util.Set;
52import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080053import java.util.concurrent.ExecutorService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080054import java.util.concurrent.TimeUnit;
55import java.util.concurrent.atomic.AtomicLong;
56
57import static com.google.common.base.Preconditions.checkArgument;
58import static junit.framework.TestCase.assertFalse;
59import static org.easymock.EasyMock.*;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080060import static org.junit.Assert.*;
Jonathan Hart584d2f32015-01-27 19:46:14 -080061
62/**
63 * Unit tests for EventuallyConsistentMapImpl.
64 */
65public class EventuallyConsistentMapImplTest {
66
67 private EventuallyConsistentMap<String, String> ecMap;
68
69 private ClusterService clusterService;
70 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080071 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080072
73 private static final String MAP_NAME = "test";
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080074 private static final MessageSubject UPDATE_MESSAGE_SUBJECT
Jonathan Hart584d2f32015-01-27 19:46:14 -080075 = new MessageSubject("ecm-" + MAP_NAME + "-update");
Jonathan Hart584d2f32015-01-27 19:46:14 -080076 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
77 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
78
79 private static final String KEY1 = "one";
80 private static final String KEY2 = "two";
81 private static final String VALUE1 = "oneValue";
82 private static final String VALUE2 = "twoValue";
83
84 private final ControllerNode self =
85 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
86
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080087 private ClusterMessageHandler updateHandler;
Jonathan Hart584d2f32015-01-27 19:46:14 -080088 private ClusterMessageHandler antiEntropyHandler;
89
90 /*
91 * Serialization is a bit tricky here. We need to serialize in the tests
92 * to set the expectations, which will use this serializer here, but the
93 * EventuallyConsistentMap will use its own internal serializer. This means
94 * this serializer must be set up exactly the same as map's internal
95 * serializer.
96 */
97 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
98 @Override
99 protected void setupKryoPool() {
100 serializerPool = KryoNamespace.newBuilder()
101 // Classes we give to the map
102 .register(KryoNamespaces.API)
103 .register(TestTimestamp.class)
104 // Below is the classes that the map internally registers
105 .register(WallClockTimestamp.class)
Jonathan Hartf9108232015-02-02 16:37:35 -0800106 .register(PutEntry.class)
107 .register(RemoveEntry.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800108 .register(ArrayList.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800109 .register(AntiEntropyAdvertisement.class)
110 .register(HashMap.class)
111 .build();
112 }
113 };
114
115 @Before
116 public void setUp() throws Exception {
117 clusterService = createMock(ClusterService.class);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800118 expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
119 expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800120 replay(clusterService);
121
122 clusterCommunicator = createMock(ClusterCommunicationService.class);
123
124 // Add expectation for adding cluster message subscribers which
125 // delegate to our ClusterCommunicationService implementation. This
126 // allows us to get a reference to the map's internal cluster message
127 // handlers so we can induce events coming in from a peer.
128 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
Madan Jampani2af244a2015-02-22 13:12:01 -0800129 anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800130 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
131
132 replay(clusterCommunicator);
133
134 clockService = new SequentialClockService<>();
135
136 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
137 .register(KryoNamespaces.API)
138 .register(TestTimestamp.class);
139
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700140 ecMap = new EventuallyConsistentMapBuilderImpl<>(
141 clusterService, clusterCommunicator)
142 .withName(MAP_NAME)
143 .withSerializer(serializer)
144 .withClockService(clockService)
145 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
146 .build();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800147
148 // Reset ready for tests to add their own expectations
149 reset(clusterCommunicator);
150 }
151
152 @After
153 public void tearDown() {
154 reset(clusterCommunicator);
155 ecMap.destroy();
156 }
157
Ray Milkey8dc82082015-02-20 16:22:38 -0800158 @SuppressWarnings("unchecked")
159 private EventuallyConsistentMapListener<String, String> getListener() {
160 return createMock(EventuallyConsistentMapListener.class);
161 }
162
Jonathan Hart584d2f32015-01-27 19:46:14 -0800163 @Test
164 public void testSize() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800165 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800166
167 assertEquals(0, ecMap.size());
168 ecMap.put(KEY1, VALUE1);
169 assertEquals(1, ecMap.size());
170 ecMap.put(KEY1, VALUE2);
171 assertEquals(1, ecMap.size());
172 ecMap.put(KEY2, VALUE2);
173 assertEquals(2, ecMap.size());
174 for (int i = 0; i < 10; i++) {
175 ecMap.put("" + i, "" + i);
176 }
177 assertEquals(12, ecMap.size());
178 ecMap.remove(KEY1);
179 assertEquals(11, ecMap.size());
180 ecMap.remove(KEY1);
181 assertEquals(11, ecMap.size());
182 }
183
184 @Test
185 public void testIsEmpty() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800186 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800187
188 assertTrue(ecMap.isEmpty());
189 ecMap.put(KEY1, VALUE1);
190 assertFalse(ecMap.isEmpty());
191 ecMap.remove(KEY1);
192 assertTrue(ecMap.isEmpty());
193 }
194
195 @Test
196 public void testContainsKey() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800197 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800198
199 assertFalse(ecMap.containsKey(KEY1));
200 ecMap.put(KEY1, VALUE1);
201 assertTrue(ecMap.containsKey(KEY1));
202 assertFalse(ecMap.containsKey(KEY2));
203 ecMap.remove(KEY1);
204 assertFalse(ecMap.containsKey(KEY1));
205 }
206
207 @Test
208 public void testContainsValue() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800209 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800210
211 assertFalse(ecMap.containsValue(VALUE1));
212 ecMap.put(KEY1, VALUE1);
213 assertTrue(ecMap.containsValue(VALUE1));
214 assertFalse(ecMap.containsValue(VALUE2));
215 ecMap.put(KEY1, VALUE2);
216 assertFalse(ecMap.containsValue(VALUE1));
217 assertTrue(ecMap.containsValue(VALUE2));
218 ecMap.remove(KEY1);
219 assertFalse(ecMap.containsValue(VALUE2));
220 }
221
222 @Test
223 public void testGet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800224 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800225
226 CountDownLatch latch;
227
228 // Local put
229 assertNull(ecMap.get(KEY1));
230 ecMap.put(KEY1, VALUE1);
231 assertEquals(VALUE1, ecMap.get(KEY1));
232
233 // Remote put
234 ClusterMessage message
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800235 = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800236
237 // Create a latch so we know when the put operation has finished
238 latch = new CountDownLatch(1);
239 ecMap.addListener(new TestListener(latch));
240
241 assertNull(ecMap.get(KEY2));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800242 updateHandler.handle(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800243 assertTrue("External listener never got notified of internal event",
244 latch.await(100, TimeUnit.MILLISECONDS));
245 assertEquals(VALUE2, ecMap.get(KEY2));
246
247 // Local remove
248 ecMap.remove(KEY2);
249 assertNull(ecMap.get(KEY2));
250
251 // Remote remove
252 ClusterMessage removeMessage
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800253 = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800254
255 // Create a latch so we know when the remove operation has finished
256 latch = new CountDownLatch(1);
257 ecMap.addListener(new TestListener(latch));
258
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800259 updateHandler.handle(removeMessage);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800260 assertTrue("External listener never got notified of internal event",
261 latch.await(100, TimeUnit.MILLISECONDS));
262 assertNull(ecMap.get(KEY1));
263 }
264
265 @Test
266 public void testPut() throws Exception {
267 // Set up expectations of external events to be sent to listeners during
268 // the test. These don't use timestamps so we can set them all up at once.
269 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800270 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800271 listener.event(new EventuallyConsistentMapEvent<>(
272 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
273 listener.event(new EventuallyConsistentMapEvent<>(
274 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
275 replay(listener);
276
277 ecMap.addListener(listener);
278
279 // Set up expected internal message to be broadcast to peers on first put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800280 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
Jonathan Hart584d2f32015-01-27 19:46:14 -0800281 .peekAtNextTimestamp()), clusterCommunicator);
282
283 // Put first value
284 assertNull(ecMap.get(KEY1));
285 ecMap.put(KEY1, VALUE1);
286 assertEquals(VALUE1, ecMap.get(KEY1));
287
288 verify(clusterCommunicator);
289
290 // Set up expected internal message to be broadcast to peers on second put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800291 expectSpecificMulticastMessage(generatePutMessage(
Jonathan Hart584d2f32015-01-27 19:46:14 -0800292 KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
293
294 // Update same key to a new value
295 ecMap.put(KEY1, VALUE2);
296 assertEquals(VALUE2, ecMap.get(KEY1));
297
298 verify(clusterCommunicator);
299
300 // Do a put with a older timestamp than the value already there.
301 // The map data should not be changed and no notifications should be sent.
302 reset(clusterCommunicator);
303 replay(clusterCommunicator);
304
305 clockService.turnBackTime();
306 ecMap.put(KEY1, VALUE1);
307 // Value should not have changed.
308 assertEquals(VALUE2, ecMap.get(KEY1));
309
310 verify(clusterCommunicator);
311
312 // Check that our listener received the correct events during the test
313 verify(listener);
314 }
315
316 @Test
317 public void testRemove() throws Exception {
318 // Set up expectations of external events to be sent to listeners during
319 // the test. These don't use timestamps so we can set them all up at once.
320 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800321 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800322 listener.event(new EventuallyConsistentMapEvent<>(
323 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
324 expectLastCall().times(2);
325 listener.event(new EventuallyConsistentMapEvent<>(
326 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
327 listener.event(new EventuallyConsistentMapEvent<>(
328 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
329 replay(listener);
330
331 ecMap.addListener(listener);
332
333 // Put in an initial value
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800334 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800335 ecMap.put(KEY1, VALUE1);
336 assertEquals(VALUE1, ecMap.get(KEY1));
337
338 // Remove the value and check the correct internal cluster messages
339 // are sent
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800340 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
341 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800342
343 ecMap.remove(KEY1);
344 assertNull(ecMap.get(KEY1));
345
346 verify(clusterCommunicator);
347
348 // Remove the same value again. Even though the value is no longer in
349 // the map, we expect that the tombstone is updated and another remove
350 // event is sent to the cluster and external listeners.
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800351 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
352 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800353
354 ecMap.remove(KEY1);
355 assertNull(ecMap.get(KEY1));
356
357 verify(clusterCommunicator);
358
359
360 // Put in a new value for us to try and remove
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800361 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800362
363 ecMap.put(KEY2, VALUE2);
364
365 clockService.turnBackTime();
366
367 // Remove should have no effect, since it has an older timestamp than
368 // the put. Expect no notifications to be sent out
369 reset(clusterCommunicator);
370 replay(clusterCommunicator);
371
372 ecMap.remove(KEY2);
373
374 verify(clusterCommunicator);
375
376 // Check that our listener received the correct events during the test
377 verify(listener);
378 }
379
380 @Test
381 public void testPutAll() throws Exception {
382 // putAll() with an empty map is a no-op - no messages will be sent
383 reset(clusterCommunicator);
384 replay(clusterCommunicator);
385
386 ecMap.putAll(new HashMap<>());
387
388 verify(clusterCommunicator);
389
390 // Set up the listener with our expected events
391 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800392 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800393 listener.event(new EventuallyConsistentMapEvent<>(
394 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
395 listener.event(new EventuallyConsistentMapEvent<>(
396 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
397 replay(listener);
398
399 ecMap.addListener(listener);
400
401 // Expect a multi-update inter-instance message
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800402 expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
403 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800404
405 Map<String, String> putAllValues = new HashMap<>();
406 putAllValues.put(KEY1, VALUE1);
407 putAllValues.put(KEY2, VALUE2);
408
409 // Put the values in the map
410 ecMap.putAll(putAllValues);
411
412 // Check the correct messages and events were sent
413 verify(clusterCommunicator);
414 verify(listener);
415 }
416
417 @Test
418 public void testClear() throws Exception {
419 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800420 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800421 listener.event(new EventuallyConsistentMapEvent<>(
422 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
423 listener.event(new EventuallyConsistentMapEvent<>(
424 EventuallyConsistentMapEvent.Type.REMOVE, KEY2, null));
425 replay(listener);
426
427 // clear() on an empty map is a no-op - no messages will be sent
428 reset(clusterCommunicator);
429 replay(clusterCommunicator);
430
431 assertTrue(ecMap.isEmpty());
432 ecMap.clear();
433 verify(clusterCommunicator);
434
435 // Put some items in the map
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800436 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800437 ecMap.put(KEY1, VALUE1);
438 ecMap.put(KEY2, VALUE2);
439
440 ecMap.addListener(listener);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800441 expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800442
443 ecMap.clear();
444
445 verify(clusterCommunicator);
446 verify(listener);
447 }
448
449 @Test
450 public void testKeySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800451 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800452
453 assertTrue(ecMap.keySet().isEmpty());
454
455 // Generate some keys
456 Set<String> keys = new HashSet<>();
457 for (int i = 1; i <= 10; i++) {
458 keys.add("" + i);
459 }
460
461 // Put each key in the map
462 keys.forEach(k -> ecMap.put(k, "value" + k));
463
464 // Check keySet() returns the correct value
465 assertEquals(keys, ecMap.keySet());
466
467 // Update the value for one of the keys
468 ecMap.put(keys.iterator().next(), "new-value");
469
470 // Check the key set is still the same
471 assertEquals(keys, ecMap.keySet());
472
473 // Remove a key
474 String removeKey = keys.iterator().next();
475 keys.remove(removeKey);
476 ecMap.remove(removeKey);
477
478 // Check the key set is still correct
479 assertEquals(keys, ecMap.keySet());
480 }
481
482 @Test
483 public void testValues() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800484 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800485
486 assertTrue(ecMap.values().isEmpty());
487
488 // Generate some values
489 Map<String, String> expectedValues = new HashMap<>();
490 for (int i = 1; i <= 10; i++) {
491 expectedValues.put("" + i, "value" + i);
492 }
493
494 // Add them into the map
495 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
496
497 // Check the values collection is correct
498 assertEquals(expectedValues.values().size(), ecMap.values().size());
499 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
500
501 // Update the value for one of the keys
502 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
503 expectedValues.put(first.getKey(), "new-value");
504 ecMap.put(first.getKey(), "new-value");
505
506 // Check the values collection is still correct
507 assertEquals(expectedValues.values().size(), ecMap.values().size());
508 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
509
510 // Remove a key
511 String removeKey = expectedValues.keySet().iterator().next();
512 expectedValues.remove(removeKey);
513 ecMap.remove(removeKey);
514
515 // Check the values collection is still correct
516 assertEquals(expectedValues.values().size(), ecMap.values().size());
517 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
518 }
519
520 @Test
521 public void testEntrySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800522 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800523
524 assertTrue(ecMap.entrySet().isEmpty());
525
526 // Generate some values
527 Map<String, String> expectedValues = new HashMap<>();
528 for (int i = 1; i <= 10; i++) {
529 expectedValues.put("" + i, "value" + i);
530 }
531
532 // Add them into the map
533 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
534
535 // Check the entry set is correct
536 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
537
538 // Update the value for one of the keys
539 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
540 expectedValues.put(first.getKey(), "new-value");
541 ecMap.put(first.getKey(), "new-value");
542
543 // Check the entry set is still correct
544 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
545
546 // Remove a key
547 String removeKey = expectedValues.keySet().iterator().next();
548 expectedValues.remove(removeKey);
549 ecMap.remove(removeKey);
550
551 // Check the entry set is still correct
552 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
553 }
554
555 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
556 if (expectedMap.entrySet().size() != actual.size()) {
557 return false;
558 }
559
560 for (Map.Entry<String, String> e : actual) {
561 if (!expectedMap.containsKey(e.getKey())) {
562 return false;
563 }
564 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
565 return false;
566 }
567 }
568 return true;
569 }
570
571 @Test
572 public void testDestroy() throws Exception {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800573 clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800574 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
575
576 replay(clusterCommunicator);
577
578 ecMap.destroy();
579
580 verify(clusterCommunicator);
581
582 try {
583 ecMap.get(KEY1);
584 fail("get after destroy should throw exception");
585 } catch (IllegalStateException e) {
586 assertTrue(true);
587 }
588
589 try {
590 ecMap.put(KEY1, VALUE1);
591 fail("put after destroy should throw exception");
592 } catch (IllegalStateException e) {
593 assertTrue(true);
594 }
595 }
596
597 private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800598 PutEntry<String, String> event = new PutEntry<>(key, value, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800599
600 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800601 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
602 SERIALIZER.encode(Lists.newArrayList(event)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800603 }
604
605 private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800606 ArrayList<PutEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800607
608 Timestamp timestamp1 = clockService.peek(1);
609 Timestamp timestamp2 = clockService.peek(2);
610
Jonathan Hartf9108232015-02-02 16:37:35 -0800611 PutEntry<String, String> pe1 = new PutEntry<>(key1, value1, timestamp1);
612 PutEntry<String, String> pe2 = new PutEntry<>(key2, value2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800613
614 list.add(pe1);
615 list.add(pe2);
616
Jonathan Hart584d2f32015-01-27 19:46:14 -0800617
618 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800619 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
620 SERIALIZER.encode(list));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800621 }
622
623 private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800624 RemoveEntry<String, String> event = new RemoveEntry<>(key, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800625
626 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800627 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
628 SERIALIZER.encode(Lists.newArrayList(event)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800629 }
630
631 private ClusterMessage generateRemoveMessage(String key1, String key2) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800632 ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800633
634 Timestamp timestamp1 = clockService.peek(1);
635 Timestamp timestamp2 = clockService.peek(2);
636
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800637 RemoveEntry<String, String> re1 = new RemoveEntry<>(key1, timestamp1);
638 RemoveEntry<String, String> re2 = new RemoveEntry<>(key2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800639
640 list.add(re1);
641 list.add(re2);
642
Jonathan Hart584d2f32015-01-27 19:46:14 -0800643 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800644 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
645 SERIALIZER.encode(list));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800646 }
647
648 /**
649 * Sets up a mock ClusterCommunicationService to expect a specific cluster
650 * message to be broadcast to the cluster.
651 *
652 * @param m message we expect to be sent
653 * @param clusterCommunicator a mock ClusterCommunicationService to set up
654 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800655 //FIXME rename
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800656 private static void expectSpecificBroadcastMessage(ClusterMessage m,
657 ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800658 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800659// expect(clusterCommunicator.broadcast(m)).andReturn(true);
660 expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
661 .andReturn(true)
662 .anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800663 replay(clusterCommunicator);
664 }
665
666 /**
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800667 * Sets up a mock ClusterCommunicationService to expect a specific cluster
668 * message to be multicast to the cluster.
669 *
670 * @param m message we expect to be sent
671 * @param clusterCommunicator a mock ClusterCommunicationService to set up
672 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800673 //FIXME rename
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800674 private static void expectSpecificMulticastMessage(ClusterMessage m,
675 ClusterCommunicationService clusterCommunicator) {
676 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800677// expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
678 expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
679 .andReturn(true)
680 .anyTimes();
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800681 replay(clusterCommunicator);
682 }
683
684
685 /**
686 * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
Jonathan Hart584d2f32015-01-27 19:46:14 -0800687 * that is sent to it. This is useful for unit tests where we aren't
688 * interested in testing the messaging component.
689 *
690 * @param clusterCommunicator a mock ClusterCommunicationService to set up
691 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800692 //FIXME rename
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800693 private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
694 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800695// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
696// anyObject(Iterable.class)))
697 expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class),
698 anyObject(NodeId.class)))
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800699 .andReturn(true)
700 .anyTimes();
701 replay(clusterCommunicator);
702 }
703
704 /**
705 * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
706 * that is sent to it. This is useful for unit tests where we aren't
707 * interested in testing the messaging component.
708 *
709 * @param clusterCommunicator a mock ClusterCommunicationService to set up
710 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800711 //FIXME rename
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800712 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800713 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800714// expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
715// .andReturn(true)
716// .anyTimes();
717 expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class), anyObject(NodeId.class)))
Jonathan Hart584d2f32015-01-27 19:46:14 -0800718 .andReturn(true)
719 .anyTimes();
720 replay(clusterCommunicator);
721 }
722
723 /**
724 * ClusterCommunicationService implementation that the map's addSubscriber
725 * call will delegate to. This means we can get a reference to the
726 * internal cluster message handler used by the map, so that we can simulate
727 * events coming in from other instances.
728 */
729 private final class TestClusterCommunicationService
730 implements ClusterCommunicationService {
731
732 @Override
733 public boolean broadcast(ClusterMessage message) {
734 return false;
735 }
736
737 @Override
738 public boolean broadcastIncludeSelf(ClusterMessage message) {
739 return false;
740 }
741
742 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800743 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800744 return false;
745 }
746
747 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800748 public boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800749 return false;
750 }
751
752 @Override
753 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
754 NodeId toNodeId)
755 throws IOException {
756 return null;
757 }
758
759 @Override
760 public void addSubscriber(MessageSubject subject,
761 ClusterMessageHandler subscriber) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800762 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
763 updateHandler = subscriber;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800764 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
765 antiEntropyHandler = subscriber;
766 } else {
767 throw new RuntimeException("Unexpected message subject " + subject.toString());
768 }
769 }
770
771 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800772 public void addSubscriber(MessageSubject subject,
773 ClusterMessageHandler subscriber,
774 ExecutorService executor) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800775 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
776 updateHandler = subscriber;
Madan Jampani2af244a2015-02-22 13:12:01 -0800777 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
778 antiEntropyHandler = subscriber;
779 } else {
780 throw new RuntimeException("Unexpected message subject " + subject.toString());
781 }
782 }
783
784 @Override
Jonathan Hart584d2f32015-01-27 19:46:14 -0800785 public void removeSubscriber(MessageSubject subject) {}
786 }
787
788 /**
789 * ClockService implementation that gives out timestamps based on a
790 * sequential counter. This clock service enables more control over the
791 * timestamps that are given out, including being able to "turn back time"
792 * to give out timestamps from the past.
793 *
794 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800795 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800796 */
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800797 private class SequentialClockService<T, U> implements ClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800798
799 private static final long INITIAL_VALUE = 1;
800 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
801
802 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800803 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800804 return new TestTimestamp(counter.getAndIncrement());
805 }
806
807 /**
808 * Returns what the next timestamp will be without consuming the
809 * timestamp. This allows test code to set expectations correctly while
810 * still allowing the CUT to get the same timestamp.
811 *
812 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800813 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800814 */
815 public Timestamp peekAtNextTimestamp() {
816 return peek(1);
817 }
818
819 /**
820 * Returns the ith timestamp to be given out in the future without
821 * consuming the timestamp. For example, i=1 returns the next timestamp,
822 * i=2 returns the timestamp after that, and so on.
823 *
824 * @param i number of the timestamp to peek at
825 * @return the ith timestamp that will be given out
826 */
827 public Timestamp peek(int i) {
828 checkArgument(i > 0, "i must be a positive integer");
829
830 return new TestTimestamp(counter.get() + i - 1);
831 }
832
833 /**
834 * Turns the clock back two ticks, so the next call to getTimestamp will
835 * return an older timestamp than the previous call to getTimestamp.
836 */
837 public void turnBackTime() {
838 // Not atomic, but should be OK for these tests.
839 counter.decrementAndGet();
840 counter.decrementAndGet();
841 }
842
843 }
844
845 /**
846 * Timestamp implementation where the value of the timestamp can be
847 * specified explicitly at creation time.
848 */
849 private class TestTimestamp implements Timestamp {
850
851 private final long timestamp;
852
853 /**
854 * Creates a new timestamp that has the specified value.
855 *
856 * @param timestamp value of the timestamp
857 */
858 public TestTimestamp(long timestamp) {
859 this.timestamp = timestamp;
860 }
861
862 @Override
863 public int compareTo(Timestamp o) {
864 checkArgument(o instanceof TestTimestamp);
865 TestTimestamp otherTimestamp = (TestTimestamp) o;
866 return ComparisonChain.start()
867 .compare(this.timestamp, otherTimestamp.timestamp)
868 .result();
869 }
870 }
871
872 /**
873 * EventuallyConsistentMapListener implementation which triggers a latch
874 * when it receives an event.
875 */
876 private class TestListener implements EventuallyConsistentMapListener<String, String> {
877 private CountDownLatch latch;
878
879 /**
880 * Creates a new listener that will trigger the specified latch when it
881 * receives and event.
882 *
883 * @param latch the latch to trigger on events
884 */
885 public TestListener(CountDownLatch latch) {
886 this.latch = latch;
887 }
888
889 @Override
890 public void event(EventuallyConsistentMapEvent<String, String> event) {
891 latch.countDown();
892 }
893 }
894}