blob: 1b41d0c75364aeeec5f3a5f5698d5ca4f3cdd3f4 [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hart584d2f32015-01-27 19:46:14 -080017
18import com.google.common.collect.ComparisonChain;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080019import com.google.common.collect.ImmutableSet;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080020import com.google.common.collect.Lists;
Jonathan Hart584d2f32015-01-27 19:46:14 -080021import com.google.common.util.concurrent.ListenableFuture;
Madan Jampanib28e4ad2015-02-19 12:31:37 -080022import com.google.common.util.concurrent.MoreExecutors;
Madan Jampani3e033bd2015-04-08 13:03:49 -070023
Jonathan Hart584d2f32015-01-27 19:46:14 -080024import org.junit.After;
25import org.junit.Before;
26import org.junit.Test;
27import org.onlab.packet.IpAddress;
28import org.onlab.util.KryoNamespace;
29import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.ControllerNode;
31import org.onosproject.cluster.DefaultControllerNode;
32import org.onosproject.cluster.NodeId;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070033import org.onosproject.event.AbstractEvent;
Jonathan Hart584d2f32015-01-27 19:46:14 -080034import org.onosproject.store.Timestamp;
35import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
36import org.onosproject.store.cluster.messaging.ClusterMessage;
37import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
38import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070039import org.onosproject.store.service.ClockService;
Madan Jampani3e033bd2015-04-08 13:03:49 -070040import org.onosproject.store.impl.LogicalTimestamp;
Jonathan Hart63939a32015-05-08 11:57:03 -070041import org.onosproject.store.service.WallClockTimestamp;
Jonathan Hart584d2f32015-01-27 19:46:14 -080042import org.onosproject.store.serializers.KryoNamespaces;
43import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070044import org.onosproject.store.service.EventuallyConsistentMap;
45import org.onosproject.store.service.EventuallyConsistentMapEvent;
46import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hart584d2f32015-01-27 19:46:14 -080047
Jonathan Hart584d2f32015-01-27 19:46:14 -080048import java.util.ArrayList;
49import java.util.HashMap;
50import java.util.HashSet;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070051import java.util.List;
Jonathan Hart584d2f32015-01-27 19:46:14 -080052import java.util.Map;
53import java.util.Objects;
54import java.util.Set;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070055import java.util.concurrent.CompletableFuture;
Jonathan Hart584d2f32015-01-27 19:46:14 -080056import java.util.concurrent.CountDownLatch;
Madan Jampaniec5ae342015-04-13 15:43:10 -070057import java.util.concurrent.Executor;
Madan Jampani2af244a2015-02-22 13:12:01 -080058import java.util.concurrent.ExecutorService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080059import java.util.concurrent.TimeUnit;
60import java.util.concurrent.atomic.AtomicLong;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070061import java.util.function.Consumer;
62import java.util.function.Function;
Jonathan Hart584d2f32015-01-27 19:46:14 -080063
64import static com.google.common.base.Preconditions.checkArgument;
65import static junit.framework.TestCase.assertFalse;
66import static org.easymock.EasyMock.*;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080067import static org.junit.Assert.*;
Jonathan Hart584d2f32015-01-27 19:46:14 -080068
69/**
70 * Unit tests for EventuallyConsistentMapImpl.
71 */
72public class EventuallyConsistentMapImplTest {
73
74 private EventuallyConsistentMap<String, String> ecMap;
75
76 private ClusterService clusterService;
77 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080078 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080079
80 private static final String MAP_NAME = "test";
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080081 private static final MessageSubject UPDATE_MESSAGE_SUBJECT
Jonathan Hart584d2f32015-01-27 19:46:14 -080082 = new MessageSubject("ecm-" + MAP_NAME + "-update");
Jonathan Hart584d2f32015-01-27 19:46:14 -080083 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
84 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
85
86 private static final String KEY1 = "one";
87 private static final String KEY2 = "two";
88 private static final String VALUE1 = "oneValue";
89 private static final String VALUE2 = "twoValue";
90
91 private final ControllerNode self =
92 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
93
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080094 private ClusterMessageHandler updateHandler;
Jonathan Hart584d2f32015-01-27 19:46:14 -080095 private ClusterMessageHandler antiEntropyHandler;
96
97 /*
98 * Serialization is a bit tricky here. We need to serialize in the tests
99 * to set the expectations, which will use this serializer here, but the
100 * EventuallyConsistentMap will use its own internal serializer. This means
101 * this serializer must be set up exactly the same as map's internal
102 * serializer.
103 */
104 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
105 @Override
106 protected void setupKryoPool() {
107 serializerPool = KryoNamespace.newBuilder()
108 // Classes we give to the map
109 .register(KryoNamespaces.API)
110 .register(TestTimestamp.class)
111 // Below is the classes that the map internally registers
Madan Jampani3e033bd2015-04-08 13:03:49 -0700112 .register(LogicalTimestamp.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800113 .register(WallClockTimestamp.class)
Jonathan Hartf9108232015-02-02 16:37:35 -0800114 .register(PutEntry.class)
115 .register(RemoveEntry.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800116 .register(ArrayList.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800117 .register(AntiEntropyAdvertisement.class)
118 .register(HashMap.class)
119 .build();
120 }
121 };
122
123 @Before
124 public void setUp() throws Exception {
125 clusterService = createMock(ClusterService.class);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800126 expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
127 expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800128 replay(clusterService);
129
130 clusterCommunicator = createMock(ClusterCommunicationService.class);
131
132 // Add expectation for adding cluster message subscribers which
133 // delegate to our ClusterCommunicationService implementation. This
134 // allows us to get a reference to the map's internal cluster message
135 // handlers so we can induce events coming in from a peer.
136 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
Madan Jampani2af244a2015-02-22 13:12:01 -0800137 anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800138 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
139
140 replay(clusterCommunicator);
141
142 clockService = new SequentialClockService<>();
143
144 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
145 .register(KryoNamespaces.API)
146 .register(TestTimestamp.class);
147
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700148 ecMap = new EventuallyConsistentMapBuilderImpl<>(
149 clusterService, clusterCommunicator)
150 .withName(MAP_NAME)
151 .withSerializer(serializer)
152 .withClockService(clockService)
153 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
154 .build();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800155
156 // Reset ready for tests to add their own expectations
157 reset(clusterCommunicator);
158 }
159
160 @After
161 public void tearDown() {
162 reset(clusterCommunicator);
163 ecMap.destroy();
164 }
165
Ray Milkey8dc82082015-02-20 16:22:38 -0800166 @SuppressWarnings("unchecked")
167 private EventuallyConsistentMapListener<String, String> getListener() {
168 return createMock(EventuallyConsistentMapListener.class);
169 }
170
Jonathan Hart584d2f32015-01-27 19:46:14 -0800171 @Test
172 public void testSize() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800173 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800174
175 assertEquals(0, ecMap.size());
176 ecMap.put(KEY1, VALUE1);
177 assertEquals(1, ecMap.size());
178 ecMap.put(KEY1, VALUE2);
179 assertEquals(1, ecMap.size());
180 ecMap.put(KEY2, VALUE2);
181 assertEquals(2, ecMap.size());
182 for (int i = 0; i < 10; i++) {
183 ecMap.put("" + i, "" + i);
184 }
185 assertEquals(12, ecMap.size());
186 ecMap.remove(KEY1);
187 assertEquals(11, ecMap.size());
188 ecMap.remove(KEY1);
189 assertEquals(11, ecMap.size());
190 }
191
192 @Test
193 public void testIsEmpty() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800194 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800195
196 assertTrue(ecMap.isEmpty());
197 ecMap.put(KEY1, VALUE1);
198 assertFalse(ecMap.isEmpty());
199 ecMap.remove(KEY1);
200 assertTrue(ecMap.isEmpty());
201 }
202
203 @Test
204 public void testContainsKey() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800205 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800206
207 assertFalse(ecMap.containsKey(KEY1));
208 ecMap.put(KEY1, VALUE1);
209 assertTrue(ecMap.containsKey(KEY1));
210 assertFalse(ecMap.containsKey(KEY2));
211 ecMap.remove(KEY1);
212 assertFalse(ecMap.containsKey(KEY1));
213 }
214
215 @Test
216 public void testContainsValue() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800217 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800218
219 assertFalse(ecMap.containsValue(VALUE1));
220 ecMap.put(KEY1, VALUE1);
221 assertTrue(ecMap.containsValue(VALUE1));
222 assertFalse(ecMap.containsValue(VALUE2));
223 ecMap.put(KEY1, VALUE2);
224 assertFalse(ecMap.containsValue(VALUE1));
225 assertTrue(ecMap.containsValue(VALUE2));
226 ecMap.remove(KEY1);
227 assertFalse(ecMap.containsValue(VALUE2));
228 }
229
230 @Test
231 public void testGet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800232 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800233
234 CountDownLatch latch;
235
236 // Local put
237 assertNull(ecMap.get(KEY1));
238 ecMap.put(KEY1, VALUE1);
239 assertEquals(VALUE1, ecMap.get(KEY1));
240
241 // Remote put
242 ClusterMessage message
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800243 = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800244
245 // Create a latch so we know when the put operation has finished
246 latch = new CountDownLatch(1);
247 ecMap.addListener(new TestListener(latch));
248
249 assertNull(ecMap.get(KEY2));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800250 updateHandler.handle(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800251 assertTrue("External listener never got notified of internal event",
252 latch.await(100, TimeUnit.MILLISECONDS));
253 assertEquals(VALUE2, ecMap.get(KEY2));
254
255 // Local remove
256 ecMap.remove(KEY2);
257 assertNull(ecMap.get(KEY2));
258
259 // Remote remove
260 ClusterMessage removeMessage
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800261 = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800262
263 // Create a latch so we know when the remove operation has finished
264 latch = new CountDownLatch(1);
265 ecMap.addListener(new TestListener(latch));
266
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800267 updateHandler.handle(removeMessage);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800268 assertTrue("External listener never got notified of internal event",
269 latch.await(100, TimeUnit.MILLISECONDS));
270 assertNull(ecMap.get(KEY1));
271 }
272
273 @Test
274 public void testPut() throws Exception {
275 // Set up expectations of external events to be sent to listeners during
276 // the test. These don't use timestamps so we can set them all up at once.
277 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800278 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800279 listener.event(new EventuallyConsistentMapEvent<>(
280 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
281 listener.event(new EventuallyConsistentMapEvent<>(
282 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
283 replay(listener);
284
285 ecMap.addListener(listener);
286
287 // Set up expected internal message to be broadcast to peers on first put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800288 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700289 .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800290
291 // Put first value
292 assertNull(ecMap.get(KEY1));
293 ecMap.put(KEY1, VALUE1);
294 assertEquals(VALUE1, ecMap.get(KEY1));
295
296 verify(clusterCommunicator);
297
298 // Set up expected internal message to be broadcast to peers on second put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800299 expectSpecificMulticastMessage(generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700300 KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800301
302 // Update same key to a new value
303 ecMap.put(KEY1, VALUE2);
304 assertEquals(VALUE2, ecMap.get(KEY1));
305
306 verify(clusterCommunicator);
307
308 // Do a put with a older timestamp than the value already there.
309 // The map data should not be changed and no notifications should be sent.
310 reset(clusterCommunicator);
311 replay(clusterCommunicator);
312
313 clockService.turnBackTime();
314 ecMap.put(KEY1, VALUE1);
315 // Value should not have changed.
316 assertEquals(VALUE2, ecMap.get(KEY1));
317
318 verify(clusterCommunicator);
319
320 // Check that our listener received the correct events during the test
321 verify(listener);
322 }
323
324 @Test
325 public void testRemove() throws Exception {
326 // Set up expectations of external events to be sent to listeners during
327 // the test. These don't use timestamps so we can set them all up at once.
328 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800329 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800330 listener.event(new EventuallyConsistentMapEvent<>(
331 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
332 expectLastCall().times(2);
333 listener.event(new EventuallyConsistentMapEvent<>(
334 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
335 listener.event(new EventuallyConsistentMapEvent<>(
336 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
337 replay(listener);
338
339 ecMap.addListener(listener);
340
341 // Put in an initial value
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800342 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800343 ecMap.put(KEY1, VALUE1);
344 assertEquals(VALUE1, ecMap.get(KEY1));
345
346 // Remove the value and check the correct internal cluster messages
347 // are sent
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800348 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700349 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800350
351 ecMap.remove(KEY1);
352 assertNull(ecMap.get(KEY1));
353
354 verify(clusterCommunicator);
355
356 // Remove the same value again. Even though the value is no longer in
357 // the map, we expect that the tombstone is updated and another remove
358 // event is sent to the cluster and external listeners.
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800359 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700360 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800361
362 ecMap.remove(KEY1);
363 assertNull(ecMap.get(KEY1));
364
365 verify(clusterCommunicator);
366
367
368 // Put in a new value for us to try and remove
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800369 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800370
371 ecMap.put(KEY2, VALUE2);
372
373 clockService.turnBackTime();
374
375 // Remove should have no effect, since it has an older timestamp than
376 // the put. Expect no notifications to be sent out
377 reset(clusterCommunicator);
378 replay(clusterCommunicator);
379
380 ecMap.remove(KEY2);
381
382 verify(clusterCommunicator);
383
384 // Check that our listener received the correct events during the test
385 verify(listener);
386 }
387
388 @Test
389 public void testPutAll() throws Exception {
390 // putAll() with an empty map is a no-op - no messages will be sent
391 reset(clusterCommunicator);
392 replay(clusterCommunicator);
393
394 ecMap.putAll(new HashMap<>());
395
396 verify(clusterCommunicator);
397
398 // Set up the listener with our expected events
399 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800400 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800401 listener.event(new EventuallyConsistentMapEvent<>(
402 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
403 listener.event(new EventuallyConsistentMapEvent<>(
404 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
405 replay(listener);
406
407 ecMap.addListener(listener);
408
409 // Expect a multi-update inter-instance message
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700410 expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800411 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800412
413 Map<String, String> putAllValues = new HashMap<>();
414 putAllValues.put(KEY1, VALUE1);
415 putAllValues.put(KEY2, VALUE2);
416
417 // Put the values in the map
418 ecMap.putAll(putAllValues);
419
420 // Check the correct messages and events were sent
421 verify(clusterCommunicator);
422 verify(listener);
423 }
424
425 @Test
426 public void testClear() throws Exception {
427 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800428 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800429 listener.event(new EventuallyConsistentMapEvent<>(
430 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
431 listener.event(new EventuallyConsistentMapEvent<>(
432 EventuallyConsistentMapEvent.Type.REMOVE, KEY2, null));
433 replay(listener);
434
435 // clear() on an empty map is a no-op - no messages will be sent
436 reset(clusterCommunicator);
437 replay(clusterCommunicator);
438
439 assertTrue(ecMap.isEmpty());
440 ecMap.clear();
441 verify(clusterCommunicator);
442
443 // Put some items in the map
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800444 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800445 ecMap.put(KEY1, VALUE1);
446 ecMap.put(KEY2, VALUE2);
447
448 ecMap.addListener(listener);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700449 expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800450
451 ecMap.clear();
452
453 verify(clusterCommunicator);
454 verify(listener);
455 }
456
457 @Test
458 public void testKeySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800459 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800460
461 assertTrue(ecMap.keySet().isEmpty());
462
463 // Generate some keys
464 Set<String> keys = new HashSet<>();
465 for (int i = 1; i <= 10; i++) {
466 keys.add("" + i);
467 }
468
469 // Put each key in the map
470 keys.forEach(k -> ecMap.put(k, "value" + k));
471
472 // Check keySet() returns the correct value
473 assertEquals(keys, ecMap.keySet());
474
475 // Update the value for one of the keys
476 ecMap.put(keys.iterator().next(), "new-value");
477
478 // Check the key set is still the same
479 assertEquals(keys, ecMap.keySet());
480
481 // Remove a key
482 String removeKey = keys.iterator().next();
483 keys.remove(removeKey);
484 ecMap.remove(removeKey);
485
486 // Check the key set is still correct
487 assertEquals(keys, ecMap.keySet());
488 }
489
490 @Test
491 public void testValues() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800492 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800493
494 assertTrue(ecMap.values().isEmpty());
495
496 // Generate some values
497 Map<String, String> expectedValues = new HashMap<>();
498 for (int i = 1; i <= 10; i++) {
499 expectedValues.put("" + i, "value" + i);
500 }
501
502 // Add them into the map
503 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
504
505 // Check the values collection is correct
506 assertEquals(expectedValues.values().size(), ecMap.values().size());
507 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
508
509 // Update the value for one of the keys
510 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
511 expectedValues.put(first.getKey(), "new-value");
512 ecMap.put(first.getKey(), "new-value");
513
514 // Check the values collection is still correct
515 assertEquals(expectedValues.values().size(), ecMap.values().size());
516 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
517
518 // Remove a key
519 String removeKey = expectedValues.keySet().iterator().next();
520 expectedValues.remove(removeKey);
521 ecMap.remove(removeKey);
522
523 // Check the values collection is still correct
524 assertEquals(expectedValues.values().size(), ecMap.values().size());
525 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
526 }
527
528 @Test
529 public void testEntrySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800530 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800531
532 assertTrue(ecMap.entrySet().isEmpty());
533
534 // Generate some values
535 Map<String, String> expectedValues = new HashMap<>();
536 for (int i = 1; i <= 10; i++) {
537 expectedValues.put("" + i, "value" + i);
538 }
539
540 // Add them into the map
541 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
542
543 // Check the entry set is correct
544 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
545
546 // Update the value for one of the keys
547 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
548 expectedValues.put(first.getKey(), "new-value");
549 ecMap.put(first.getKey(), "new-value");
550
551 // Check the entry set is still correct
552 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
553
554 // Remove a key
555 String removeKey = expectedValues.keySet().iterator().next();
556 expectedValues.remove(removeKey);
557 ecMap.remove(removeKey);
558
559 // Check the entry set is still correct
560 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
561 }
562
563 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
564 if (expectedMap.entrySet().size() != actual.size()) {
565 return false;
566 }
567
568 for (Map.Entry<String, String> e : actual) {
569 if (!expectedMap.containsKey(e.getKey())) {
570 return false;
571 }
572 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
573 return false;
574 }
575 }
576 return true;
577 }
578
579 @Test
580 public void testDestroy() throws Exception {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800581 clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800582 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
583
584 replay(clusterCommunicator);
585
586 ecMap.destroy();
587
588 verify(clusterCommunicator);
589
590 try {
591 ecMap.get(KEY1);
592 fail("get after destroy should throw exception");
593 } catch (IllegalStateException e) {
594 assertTrue(true);
595 }
596
597 try {
598 ecMap.put(KEY1, VALUE1);
599 fail("put after destroy should throw exception");
600 } catch (IllegalStateException e) {
601 assertTrue(true);
602 }
603 }
604
605 private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800606 PutEntry<String, String> event = new PutEntry<>(key, value, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800607
608 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800609 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
610 SERIALIZER.encode(Lists.newArrayList(event)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800611 }
612
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700613 private List<PutEntry<String, String>> generatePutMessage(
614 String key1, String value1, String key2, String value2) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800615 ArrayList<PutEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800616
617 Timestamp timestamp1 = clockService.peek(1);
618 Timestamp timestamp2 = clockService.peek(2);
619
Jonathan Hartf9108232015-02-02 16:37:35 -0800620 PutEntry<String, String> pe1 = new PutEntry<>(key1, value1, timestamp1);
621 PutEntry<String, String> pe2 = new PutEntry<>(key2, value2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800622
623 list.add(pe1);
624 list.add(pe2);
625
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700626 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800627 }
628
629 private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800630 RemoveEntry<String, String> event = new RemoveEntry<>(key, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800631
632 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800633 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
634 SERIALIZER.encode(Lists.newArrayList(event)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800635 }
636
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700637 private List<RemoveEntry<String, String>> generateRemoveMessage(String key1, String key2) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800638 ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800639
640 Timestamp timestamp1 = clockService.peek(1);
641 Timestamp timestamp2 = clockService.peek(2);
642
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800643 RemoveEntry<String, String> re1 = new RemoveEntry<>(key1, timestamp1);
644 RemoveEntry<String, String> re2 = new RemoveEntry<>(key2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800645
646 list.add(re1);
647 list.add(re2);
648
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700649 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800650 }
651
652 /**
653 * Sets up a mock ClusterCommunicationService to expect a specific cluster
654 * message to be broadcast to the cluster.
655 *
656 * @param m message we expect to be sent
657 * @param clusterCommunicator a mock ClusterCommunicationService to set up
658 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800659 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700660 private static <T> void expectSpecificBroadcastMessage(
661 T message,
662 MessageSubject subject,
663 ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800664 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700665 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
666 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800667 replay(clusterCommunicator);
668 }
669
670 /**
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800671 * Sets up a mock ClusterCommunicationService to expect a specific cluster
672 * message to be multicast to the cluster.
673 *
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700674 * @param message message we expect to be sent
675 * @param subject subject we expect to be sent to
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800676 * @param clusterCommunicator a mock ClusterCommunicationService to set up
677 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800678 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700679 private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800680 ClusterCommunicationService clusterCommunicator) {
681 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700682 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
683 expectLastCall().anyTimes();
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800684 replay(clusterCommunicator);
685 }
686
687
688 /**
689 * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
Jonathan Hart584d2f32015-01-27 19:46:14 -0800690 * that is sent to it. This is useful for unit tests where we aren't
691 * interested in testing the messaging component.
692 *
693 * @param clusterCommunicator a mock ClusterCommunicationService to set up
694 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800695 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700696 private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800697 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800698// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
699// anyObject(Iterable.class)))
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700700 expect(clusterCommunicator.<T>unicast(
701 anyObject(),
702 anyObject(MessageSubject.class),
703 anyObject(Function.class),
704 anyObject(NodeId.class)))
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800705 .andReturn(true)
706 .anyTimes();
707 replay(clusterCommunicator);
708 }
709
710 /**
711 * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
712 * that is sent to it. This is useful for unit tests where we aren't
713 * interested in testing the messaging component.
714 *
715 * @param clusterCommunicator a mock ClusterCommunicationService to set up
716 */
717 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800718 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700719 clusterCommunicator.<AbstractEvent>multicast(
720 anyObject(AbstractEvent.class),
721 anyObject(MessageSubject.class),
722 anyObject(Function.class),
723 anyObject(Set.class));
724 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800725 replay(clusterCommunicator);
726 }
727
728 /**
729 * ClusterCommunicationService implementation that the map's addSubscriber
730 * call will delegate to. This means we can get a reference to the
731 * internal cluster message handler used by the map, so that we can simulate
732 * events coming in from other instances.
733 */
734 private final class TestClusterCommunicationService
735 implements ClusterCommunicationService {
736
737 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800738 public void addSubscriber(MessageSubject subject,
739 ClusterMessageHandler subscriber,
740 ExecutorService executor) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800741 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
742 updateHandler = subscriber;
Madan Jampani2af244a2015-02-22 13:12:01 -0800743 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
744 antiEntropyHandler = subscriber;
745 } else {
746 throw new RuntimeException("Unexpected message subject " + subject.toString());
747 }
748 }
749
750 @Override
Jonathan Hart584d2f32015-01-27 19:46:14 -0800751 public void removeSubscriber(MessageSubject subject) {}
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700752
753 @Override
754 public <M> void broadcast(M message, MessageSubject subject,
755 Function<M, byte[]> encoder) {
756 }
757
758 @Override
759 public <M> void broadcastIncludeSelf(M message,
760 MessageSubject subject, Function<M, byte[]> encoder) {
761 }
762
763 @Override
764 public <M> boolean unicast(M message, MessageSubject subject,
765 Function<M, byte[]> encoder, NodeId toNodeId) {
766 return false;
767 }
768
769 @Override
770 public <M> void multicast(M message, MessageSubject subject,
771 Function<M, byte[]> encoder, Set<NodeId> nodes) {
772 }
773
774 @Override
775 public <M, R> CompletableFuture<R> sendAndReceive(M message,
776 MessageSubject subject, Function<M, byte[]> encoder,
777 Function<byte[], R> decoder, NodeId toNodeId) {
778 return null;
779 }
780
781 @Override
782 public <M, R> void addSubscriber(MessageSubject subject,
783 Function<byte[], M> decoder, Function<M, R> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700784 Function<R, byte[]> encoder, Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700785 }
786
787 @Override
Madan Jampani27b69c62015-05-15 15:49:02 -0700788 public <M, R> void addSubscriber(MessageSubject subject,
789 Function<byte[], M> decoder, Function<M, CompletableFuture<R>> handler,
790 Function<R, byte[]> encoder) {
791 }
792
793 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700794 public <M> void addSubscriber(MessageSubject subject,
795 Function<byte[], M> decoder, Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700796 Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700797 }
798
799 @Override
800 public boolean broadcast(ClusterMessage message) {
801 return false;
802 }
803
804 @Override
805 public boolean broadcastIncludeSelf(ClusterMessage message) {
806 return false;
807 }
808
809 @Override
810 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
811 return false;
812 }
813
814 @Override
815 public boolean multicast(ClusterMessage message,
816 Iterable<NodeId> nodeIds) {
817 return false;
818 }
819
820 @Override
821 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
822 NodeId toNodeId) {
823 return null;
824 }
Jonathan Hart584d2f32015-01-27 19:46:14 -0800825 }
826
827 /**
828 * ClockService implementation that gives out timestamps based on a
829 * sequential counter. This clock service enables more control over the
830 * timestamps that are given out, including being able to "turn back time"
831 * to give out timestamps from the past.
832 *
833 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800834 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800835 */
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800836 private class SequentialClockService<T, U> implements ClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800837
838 private static final long INITIAL_VALUE = 1;
839 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
840
841 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800842 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800843 return new TestTimestamp(counter.getAndIncrement());
844 }
845
846 /**
847 * Returns what the next timestamp will be without consuming the
848 * timestamp. This allows test code to set expectations correctly while
849 * still allowing the CUT to get the same timestamp.
850 *
851 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800852 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800853 */
854 public Timestamp peekAtNextTimestamp() {
855 return peek(1);
856 }
857
858 /**
859 * Returns the ith timestamp to be given out in the future without
860 * consuming the timestamp. For example, i=1 returns the next timestamp,
861 * i=2 returns the timestamp after that, and so on.
862 *
863 * @param i number of the timestamp to peek at
864 * @return the ith timestamp that will be given out
865 */
866 public Timestamp peek(int i) {
867 checkArgument(i > 0, "i must be a positive integer");
868
869 return new TestTimestamp(counter.get() + i - 1);
870 }
871
872 /**
873 * Turns the clock back two ticks, so the next call to getTimestamp will
874 * return an older timestamp than the previous call to getTimestamp.
875 */
876 public void turnBackTime() {
877 // Not atomic, but should be OK for these tests.
878 counter.decrementAndGet();
879 counter.decrementAndGet();
880 }
881
882 }
883
884 /**
885 * Timestamp implementation where the value of the timestamp can be
886 * specified explicitly at creation time.
887 */
888 private class TestTimestamp implements Timestamp {
889
890 private final long timestamp;
891
892 /**
893 * Creates a new timestamp that has the specified value.
894 *
895 * @param timestamp value of the timestamp
896 */
897 public TestTimestamp(long timestamp) {
898 this.timestamp = timestamp;
899 }
900
901 @Override
902 public int compareTo(Timestamp o) {
903 checkArgument(o instanceof TestTimestamp);
904 TestTimestamp otherTimestamp = (TestTimestamp) o;
905 return ComparisonChain.start()
906 .compare(this.timestamp, otherTimestamp.timestamp)
907 .result();
908 }
909 }
910
911 /**
912 * EventuallyConsistentMapListener implementation which triggers a latch
913 * when it receives an event.
914 */
915 private class TestListener implements EventuallyConsistentMapListener<String, String> {
916 private CountDownLatch latch;
917
918 /**
919 * Creates a new listener that will trigger the specified latch when it
920 * receives and event.
921 *
922 * @param latch the latch to trigger on events
923 */
924 public TestListener(CountDownLatch latch) {
925 this.latch = latch;
926 }
927
928 @Override
929 public void event(EventuallyConsistentMapEvent<String, String> event) {
930 latch.countDown();
931 }
932 }
933}