blob: 8d495eb140d92bd278d5f352d32b7a341cfbfdbe [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hart584d2f32015-01-27 19:46:14 -080017
18import com.google.common.collect.ComparisonChain;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080019import com.google.common.collect.ImmutableSet;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080020import com.google.common.collect.Lists;
Jonathan Hart584d2f32015-01-27 19:46:14 -080021import com.google.common.util.concurrent.ListenableFuture;
Madan Jampanib28e4ad2015-02-19 12:31:37 -080022import com.google.common.util.concurrent.MoreExecutors;
Madan Jampani3e033bd2015-04-08 13:03:49 -070023
Jonathan Hart584d2f32015-01-27 19:46:14 -080024import org.junit.After;
25import org.junit.Before;
26import org.junit.Test;
27import org.onlab.packet.IpAddress;
28import org.onlab.util.KryoNamespace;
29import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.ControllerNode;
31import org.onosproject.cluster.DefaultControllerNode;
32import org.onosproject.cluster.NodeId;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070033import org.onosproject.event.AbstractEvent;
Jonathan Hart584d2f32015-01-27 19:46:14 -080034import org.onosproject.store.Timestamp;
35import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
36import org.onosproject.store.cluster.messaging.ClusterMessage;
37import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
38import org.onosproject.store.cluster.messaging.MessageSubject;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070039import org.onosproject.store.service.ClockService;
Madan Jampani3e033bd2015-04-08 13:03:49 -070040import org.onosproject.store.impl.LogicalTimestamp;
Jonathan Hart77bdd262015-02-03 09:07:48 -080041import org.onosproject.store.impl.WallClockTimestamp;
Jonathan Hart584d2f32015-01-27 19:46:14 -080042import org.onosproject.store.serializers.KryoNamespaces;
43import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070044import org.onosproject.store.service.EventuallyConsistentMap;
45import org.onosproject.store.service.EventuallyConsistentMapEvent;
46import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hart584d2f32015-01-27 19:46:14 -080047
Jonathan Hart584d2f32015-01-27 19:46:14 -080048import java.util.ArrayList;
49import java.util.HashMap;
50import java.util.HashSet;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070051import java.util.List;
Jonathan Hart584d2f32015-01-27 19:46:14 -080052import java.util.Map;
53import java.util.Objects;
54import java.util.Set;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070055import java.util.concurrent.CompletableFuture;
Jonathan Hart584d2f32015-01-27 19:46:14 -080056import java.util.concurrent.CountDownLatch;
Madan Jampani2af244a2015-02-22 13:12:01 -080057import java.util.concurrent.ExecutorService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080058import java.util.concurrent.TimeUnit;
59import java.util.concurrent.atomic.AtomicLong;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070060import java.util.function.Consumer;
61import java.util.function.Function;
Jonathan Hart584d2f32015-01-27 19:46:14 -080062
63import static com.google.common.base.Preconditions.checkArgument;
64import static junit.framework.TestCase.assertFalse;
65import static org.easymock.EasyMock.*;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080066import static org.junit.Assert.*;
Jonathan Hart584d2f32015-01-27 19:46:14 -080067
68/**
69 * Unit tests for EventuallyConsistentMapImpl.
70 */
71public class EventuallyConsistentMapImplTest {
72
73 private EventuallyConsistentMap<String, String> ecMap;
74
75 private ClusterService clusterService;
76 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080077 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080078
79 private static final String MAP_NAME = "test";
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080080 private static final MessageSubject UPDATE_MESSAGE_SUBJECT
Jonathan Hart584d2f32015-01-27 19:46:14 -080081 = new MessageSubject("ecm-" + MAP_NAME + "-update");
Jonathan Hart584d2f32015-01-27 19:46:14 -080082 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
83 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
84
85 private static final String KEY1 = "one";
86 private static final String KEY2 = "two";
87 private static final String VALUE1 = "oneValue";
88 private static final String VALUE2 = "twoValue";
89
90 private final ControllerNode self =
91 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
92
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080093 private ClusterMessageHandler updateHandler;
Jonathan Hart584d2f32015-01-27 19:46:14 -080094 private ClusterMessageHandler antiEntropyHandler;
95
96 /*
97 * Serialization is a bit tricky here. We need to serialize in the tests
98 * to set the expectations, which will use this serializer here, but the
99 * EventuallyConsistentMap will use its own internal serializer. This means
100 * this serializer must be set up exactly the same as map's internal
101 * serializer.
102 */
103 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
104 @Override
105 protected void setupKryoPool() {
106 serializerPool = KryoNamespace.newBuilder()
107 // Classes we give to the map
108 .register(KryoNamespaces.API)
109 .register(TestTimestamp.class)
110 // Below is the classes that the map internally registers
Madan Jampani3e033bd2015-04-08 13:03:49 -0700111 .register(LogicalTimestamp.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800112 .register(WallClockTimestamp.class)
Jonathan Hartf9108232015-02-02 16:37:35 -0800113 .register(PutEntry.class)
114 .register(RemoveEntry.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800115 .register(ArrayList.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800116 .register(AntiEntropyAdvertisement.class)
117 .register(HashMap.class)
118 .build();
119 }
120 };
121
122 @Before
123 public void setUp() throws Exception {
124 clusterService = createMock(ClusterService.class);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800125 expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
126 expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800127 replay(clusterService);
128
129 clusterCommunicator = createMock(ClusterCommunicationService.class);
130
131 // Add expectation for adding cluster message subscribers which
132 // delegate to our ClusterCommunicationService implementation. This
133 // allows us to get a reference to the map's internal cluster message
134 // handlers so we can induce events coming in from a peer.
135 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
Madan Jampani2af244a2015-02-22 13:12:01 -0800136 anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800137 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
138
139 replay(clusterCommunicator);
140
141 clockService = new SequentialClockService<>();
142
143 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
144 .register(KryoNamespaces.API)
145 .register(TestTimestamp.class);
146
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700147 ecMap = new EventuallyConsistentMapBuilderImpl<>(
148 clusterService, clusterCommunicator)
149 .withName(MAP_NAME)
150 .withSerializer(serializer)
151 .withClockService(clockService)
152 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
153 .build();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800154
155 // Reset ready for tests to add their own expectations
156 reset(clusterCommunicator);
157 }
158
159 @After
160 public void tearDown() {
161 reset(clusterCommunicator);
162 ecMap.destroy();
163 }
164
Ray Milkey8dc82082015-02-20 16:22:38 -0800165 @SuppressWarnings("unchecked")
166 private EventuallyConsistentMapListener<String, String> getListener() {
167 return createMock(EventuallyConsistentMapListener.class);
168 }
169
Jonathan Hart584d2f32015-01-27 19:46:14 -0800170 @Test
171 public void testSize() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800172 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800173
174 assertEquals(0, ecMap.size());
175 ecMap.put(KEY1, VALUE1);
176 assertEquals(1, ecMap.size());
177 ecMap.put(KEY1, VALUE2);
178 assertEquals(1, ecMap.size());
179 ecMap.put(KEY2, VALUE2);
180 assertEquals(2, ecMap.size());
181 for (int i = 0; i < 10; i++) {
182 ecMap.put("" + i, "" + i);
183 }
184 assertEquals(12, ecMap.size());
185 ecMap.remove(KEY1);
186 assertEquals(11, ecMap.size());
187 ecMap.remove(KEY1);
188 assertEquals(11, ecMap.size());
189 }
190
191 @Test
192 public void testIsEmpty() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800193 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800194
195 assertTrue(ecMap.isEmpty());
196 ecMap.put(KEY1, VALUE1);
197 assertFalse(ecMap.isEmpty());
198 ecMap.remove(KEY1);
199 assertTrue(ecMap.isEmpty());
200 }
201
202 @Test
203 public void testContainsKey() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800204 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800205
206 assertFalse(ecMap.containsKey(KEY1));
207 ecMap.put(KEY1, VALUE1);
208 assertTrue(ecMap.containsKey(KEY1));
209 assertFalse(ecMap.containsKey(KEY2));
210 ecMap.remove(KEY1);
211 assertFalse(ecMap.containsKey(KEY1));
212 }
213
214 @Test
215 public void testContainsValue() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800216 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800217
218 assertFalse(ecMap.containsValue(VALUE1));
219 ecMap.put(KEY1, VALUE1);
220 assertTrue(ecMap.containsValue(VALUE1));
221 assertFalse(ecMap.containsValue(VALUE2));
222 ecMap.put(KEY1, VALUE2);
223 assertFalse(ecMap.containsValue(VALUE1));
224 assertTrue(ecMap.containsValue(VALUE2));
225 ecMap.remove(KEY1);
226 assertFalse(ecMap.containsValue(VALUE2));
227 }
228
229 @Test
230 public void testGet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800231 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800232
233 CountDownLatch latch;
234
235 // Local put
236 assertNull(ecMap.get(KEY1));
237 ecMap.put(KEY1, VALUE1);
238 assertEquals(VALUE1, ecMap.get(KEY1));
239
240 // Remote put
241 ClusterMessage message
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800242 = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800243
244 // Create a latch so we know when the put operation has finished
245 latch = new CountDownLatch(1);
246 ecMap.addListener(new TestListener(latch));
247
248 assertNull(ecMap.get(KEY2));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800249 updateHandler.handle(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800250 assertTrue("External listener never got notified of internal event",
251 latch.await(100, TimeUnit.MILLISECONDS));
252 assertEquals(VALUE2, ecMap.get(KEY2));
253
254 // Local remove
255 ecMap.remove(KEY2);
256 assertNull(ecMap.get(KEY2));
257
258 // Remote remove
259 ClusterMessage removeMessage
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800260 = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800261
262 // Create a latch so we know when the remove operation has finished
263 latch = new CountDownLatch(1);
264 ecMap.addListener(new TestListener(latch));
265
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800266 updateHandler.handle(removeMessage);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800267 assertTrue("External listener never got notified of internal event",
268 latch.await(100, TimeUnit.MILLISECONDS));
269 assertNull(ecMap.get(KEY1));
270 }
271
272 @Test
273 public void testPut() throws Exception {
274 // Set up expectations of external events to be sent to listeners during
275 // the test. These don't use timestamps so we can set them all up at once.
276 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800277 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800278 listener.event(new EventuallyConsistentMapEvent<>(
279 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
280 listener.event(new EventuallyConsistentMapEvent<>(
281 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
282 replay(listener);
283
284 ecMap.addListener(listener);
285
286 // Set up expected internal message to be broadcast to peers on first put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800287 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700288 .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800289
290 // Put first value
291 assertNull(ecMap.get(KEY1));
292 ecMap.put(KEY1, VALUE1);
293 assertEquals(VALUE1, ecMap.get(KEY1));
294
295 verify(clusterCommunicator);
296
297 // Set up expected internal message to be broadcast to peers on second put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800298 expectSpecificMulticastMessage(generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700299 KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800300
301 // Update same key to a new value
302 ecMap.put(KEY1, VALUE2);
303 assertEquals(VALUE2, ecMap.get(KEY1));
304
305 verify(clusterCommunicator);
306
307 // Do a put with a older timestamp than the value already there.
308 // The map data should not be changed and no notifications should be sent.
309 reset(clusterCommunicator);
310 replay(clusterCommunicator);
311
312 clockService.turnBackTime();
313 ecMap.put(KEY1, VALUE1);
314 // Value should not have changed.
315 assertEquals(VALUE2, ecMap.get(KEY1));
316
317 verify(clusterCommunicator);
318
319 // Check that our listener received the correct events during the test
320 verify(listener);
321 }
322
323 @Test
324 public void testRemove() throws Exception {
325 // Set up expectations of external events to be sent to listeners during
326 // the test. These don't use timestamps so we can set them all up at once.
327 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800328 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800329 listener.event(new EventuallyConsistentMapEvent<>(
330 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
331 expectLastCall().times(2);
332 listener.event(new EventuallyConsistentMapEvent<>(
333 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
334 listener.event(new EventuallyConsistentMapEvent<>(
335 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
336 replay(listener);
337
338 ecMap.addListener(listener);
339
340 // Put in an initial value
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800341 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800342 ecMap.put(KEY1, VALUE1);
343 assertEquals(VALUE1, ecMap.get(KEY1));
344
345 // Remove the value and check the correct internal cluster messages
346 // are sent
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800347 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700348 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800349
350 ecMap.remove(KEY1);
351 assertNull(ecMap.get(KEY1));
352
353 verify(clusterCommunicator);
354
355 // Remove the same value again. Even though the value is no longer in
356 // the map, we expect that the tombstone is updated and another remove
357 // event is sent to the cluster and external listeners.
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800358 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700359 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800360
361 ecMap.remove(KEY1);
362 assertNull(ecMap.get(KEY1));
363
364 verify(clusterCommunicator);
365
366
367 // Put in a new value for us to try and remove
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800368 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800369
370 ecMap.put(KEY2, VALUE2);
371
372 clockService.turnBackTime();
373
374 // Remove should have no effect, since it has an older timestamp than
375 // the put. Expect no notifications to be sent out
376 reset(clusterCommunicator);
377 replay(clusterCommunicator);
378
379 ecMap.remove(KEY2);
380
381 verify(clusterCommunicator);
382
383 // Check that our listener received the correct events during the test
384 verify(listener);
385 }
386
387 @Test
388 public void testPutAll() throws Exception {
389 // putAll() with an empty map is a no-op - no messages will be sent
390 reset(clusterCommunicator);
391 replay(clusterCommunicator);
392
393 ecMap.putAll(new HashMap<>());
394
395 verify(clusterCommunicator);
396
397 // Set up the listener with our expected events
398 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800399 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800400 listener.event(new EventuallyConsistentMapEvent<>(
401 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
402 listener.event(new EventuallyConsistentMapEvent<>(
403 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
404 replay(listener);
405
406 ecMap.addListener(listener);
407
408 // Expect a multi-update inter-instance message
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700409 expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800410 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800411
412 Map<String, String> putAllValues = new HashMap<>();
413 putAllValues.put(KEY1, VALUE1);
414 putAllValues.put(KEY2, VALUE2);
415
416 // Put the values in the map
417 ecMap.putAll(putAllValues);
418
419 // Check the correct messages and events were sent
420 verify(clusterCommunicator);
421 verify(listener);
422 }
423
424 @Test
425 public void testClear() throws Exception {
426 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800427 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800428 listener.event(new EventuallyConsistentMapEvent<>(
429 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
430 listener.event(new EventuallyConsistentMapEvent<>(
431 EventuallyConsistentMapEvent.Type.REMOVE, KEY2, null));
432 replay(listener);
433
434 // clear() on an empty map is a no-op - no messages will be sent
435 reset(clusterCommunicator);
436 replay(clusterCommunicator);
437
438 assertTrue(ecMap.isEmpty());
439 ecMap.clear();
440 verify(clusterCommunicator);
441
442 // Put some items in the map
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800443 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800444 ecMap.put(KEY1, VALUE1);
445 ecMap.put(KEY2, VALUE2);
446
447 ecMap.addListener(listener);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700448 expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800449
450 ecMap.clear();
451
452 verify(clusterCommunicator);
453 verify(listener);
454 }
455
456 @Test
457 public void testKeySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800458 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800459
460 assertTrue(ecMap.keySet().isEmpty());
461
462 // Generate some keys
463 Set<String> keys = new HashSet<>();
464 for (int i = 1; i <= 10; i++) {
465 keys.add("" + i);
466 }
467
468 // Put each key in the map
469 keys.forEach(k -> ecMap.put(k, "value" + k));
470
471 // Check keySet() returns the correct value
472 assertEquals(keys, ecMap.keySet());
473
474 // Update the value for one of the keys
475 ecMap.put(keys.iterator().next(), "new-value");
476
477 // Check the key set is still the same
478 assertEquals(keys, ecMap.keySet());
479
480 // Remove a key
481 String removeKey = keys.iterator().next();
482 keys.remove(removeKey);
483 ecMap.remove(removeKey);
484
485 // Check the key set is still correct
486 assertEquals(keys, ecMap.keySet());
487 }
488
489 @Test
490 public void testValues() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800491 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800492
493 assertTrue(ecMap.values().isEmpty());
494
495 // Generate some values
496 Map<String, String> expectedValues = new HashMap<>();
497 for (int i = 1; i <= 10; i++) {
498 expectedValues.put("" + i, "value" + i);
499 }
500
501 // Add them into the map
502 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
503
504 // Check the values collection is correct
505 assertEquals(expectedValues.values().size(), ecMap.values().size());
506 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
507
508 // Update the value for one of the keys
509 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
510 expectedValues.put(first.getKey(), "new-value");
511 ecMap.put(first.getKey(), "new-value");
512
513 // Check the values collection is still correct
514 assertEquals(expectedValues.values().size(), ecMap.values().size());
515 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
516
517 // Remove a key
518 String removeKey = expectedValues.keySet().iterator().next();
519 expectedValues.remove(removeKey);
520 ecMap.remove(removeKey);
521
522 // Check the values collection is still correct
523 assertEquals(expectedValues.values().size(), ecMap.values().size());
524 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
525 }
526
527 @Test
528 public void testEntrySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800529 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800530
531 assertTrue(ecMap.entrySet().isEmpty());
532
533 // Generate some values
534 Map<String, String> expectedValues = new HashMap<>();
535 for (int i = 1; i <= 10; i++) {
536 expectedValues.put("" + i, "value" + i);
537 }
538
539 // Add them into the map
540 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
541
542 // Check the entry set is correct
543 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
544
545 // Update the value for one of the keys
546 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
547 expectedValues.put(first.getKey(), "new-value");
548 ecMap.put(first.getKey(), "new-value");
549
550 // Check the entry set is still correct
551 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
552
553 // Remove a key
554 String removeKey = expectedValues.keySet().iterator().next();
555 expectedValues.remove(removeKey);
556 ecMap.remove(removeKey);
557
558 // Check the entry set is still correct
559 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
560 }
561
562 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
563 if (expectedMap.entrySet().size() != actual.size()) {
564 return false;
565 }
566
567 for (Map.Entry<String, String> e : actual) {
568 if (!expectedMap.containsKey(e.getKey())) {
569 return false;
570 }
571 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
572 return false;
573 }
574 }
575 return true;
576 }
577
578 @Test
579 public void testDestroy() throws Exception {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800580 clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800581 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
582
583 replay(clusterCommunicator);
584
585 ecMap.destroy();
586
587 verify(clusterCommunicator);
588
589 try {
590 ecMap.get(KEY1);
591 fail("get after destroy should throw exception");
592 } catch (IllegalStateException e) {
593 assertTrue(true);
594 }
595
596 try {
597 ecMap.put(KEY1, VALUE1);
598 fail("put after destroy should throw exception");
599 } catch (IllegalStateException e) {
600 assertTrue(true);
601 }
602 }
603
604 private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800605 PutEntry<String, String> event = new PutEntry<>(key, value, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800606
607 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800608 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
609 SERIALIZER.encode(Lists.newArrayList(event)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800610 }
611
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700612 private List<PutEntry<String, String>> generatePutMessage(
613 String key1, String value1, String key2, String value2) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800614 ArrayList<PutEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800615
616 Timestamp timestamp1 = clockService.peek(1);
617 Timestamp timestamp2 = clockService.peek(2);
618
Jonathan Hartf9108232015-02-02 16:37:35 -0800619 PutEntry<String, String> pe1 = new PutEntry<>(key1, value1, timestamp1);
620 PutEntry<String, String> pe2 = new PutEntry<>(key2, value2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800621
622 list.add(pe1);
623 list.add(pe2);
624
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700625 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800626 }
627
628 private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800629 RemoveEntry<String, String> event = new RemoveEntry<>(key, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800630
631 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800632 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
633 SERIALIZER.encode(Lists.newArrayList(event)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800634 }
635
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700636 private List<RemoveEntry<String, String>> generateRemoveMessage(String key1, String key2) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800637 ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800638
639 Timestamp timestamp1 = clockService.peek(1);
640 Timestamp timestamp2 = clockService.peek(2);
641
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800642 RemoveEntry<String, String> re1 = new RemoveEntry<>(key1, timestamp1);
643 RemoveEntry<String, String> re2 = new RemoveEntry<>(key2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800644
645 list.add(re1);
646 list.add(re2);
647
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700648 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800649 }
650
651 /**
652 * Sets up a mock ClusterCommunicationService to expect a specific cluster
653 * message to be broadcast to the cluster.
654 *
655 * @param m message we expect to be sent
656 * @param clusterCommunicator a mock ClusterCommunicationService to set up
657 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800658 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700659 private static <T> void expectSpecificBroadcastMessage(
660 T message,
661 MessageSubject subject,
662 ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800663 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700664 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
665 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800666 replay(clusterCommunicator);
667 }
668
669 /**
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800670 * Sets up a mock ClusterCommunicationService to expect a specific cluster
671 * message to be multicast to the cluster.
672 *
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700673 * @param message message we expect to be sent
674 * @param subject subject we expect to be sent to
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800675 * @param clusterCommunicator a mock ClusterCommunicationService to set up
676 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800677 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700678 private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800679 ClusterCommunicationService clusterCommunicator) {
680 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700681 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
682 expectLastCall().anyTimes();
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800683 replay(clusterCommunicator);
684 }
685
686
687 /**
688 * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
Jonathan Hart584d2f32015-01-27 19:46:14 -0800689 * that is sent to it. This is useful for unit tests where we aren't
690 * interested in testing the messaging component.
691 *
692 * @param clusterCommunicator a mock ClusterCommunicationService to set up
693 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800694 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700695 private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800696 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800697// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
698// anyObject(Iterable.class)))
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700699 expect(clusterCommunicator.<T>unicast(
700 anyObject(),
701 anyObject(MessageSubject.class),
702 anyObject(Function.class),
703 anyObject(NodeId.class)))
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800704 .andReturn(true)
705 .anyTimes();
706 replay(clusterCommunicator);
707 }
708
709 /**
710 * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
711 * that is sent to it. This is useful for unit tests where we aren't
712 * interested in testing the messaging component.
713 *
714 * @param clusterCommunicator a mock ClusterCommunicationService to set up
715 */
716 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800717 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700718 clusterCommunicator.<AbstractEvent>multicast(
719 anyObject(AbstractEvent.class),
720 anyObject(MessageSubject.class),
721 anyObject(Function.class),
722 anyObject(Set.class));
723 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800724 replay(clusterCommunicator);
725 }
726
727 /**
728 * ClusterCommunicationService implementation that the map's addSubscriber
729 * call will delegate to. This means we can get a reference to the
730 * internal cluster message handler used by the map, so that we can simulate
731 * events coming in from other instances.
732 */
733 private final class TestClusterCommunicationService
734 implements ClusterCommunicationService {
735
736 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800737 public void addSubscriber(MessageSubject subject,
738 ClusterMessageHandler subscriber,
739 ExecutorService executor) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800740 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
741 updateHandler = subscriber;
Madan Jampani2af244a2015-02-22 13:12:01 -0800742 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
743 antiEntropyHandler = subscriber;
744 } else {
745 throw new RuntimeException("Unexpected message subject " + subject.toString());
746 }
747 }
748
749 @Override
Jonathan Hart584d2f32015-01-27 19:46:14 -0800750 public void removeSubscriber(MessageSubject subject) {}
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700751
752 @Override
753 public <M> void broadcast(M message, MessageSubject subject,
754 Function<M, byte[]> encoder) {
755 }
756
757 @Override
758 public <M> void broadcastIncludeSelf(M message,
759 MessageSubject subject, Function<M, byte[]> encoder) {
760 }
761
762 @Override
763 public <M> boolean unicast(M message, MessageSubject subject,
764 Function<M, byte[]> encoder, NodeId toNodeId) {
765 return false;
766 }
767
768 @Override
769 public <M> void multicast(M message, MessageSubject subject,
770 Function<M, byte[]> encoder, Set<NodeId> nodes) {
771 }
772
773 @Override
774 public <M, R> CompletableFuture<R> sendAndReceive(M message,
775 MessageSubject subject, Function<M, byte[]> encoder,
776 Function<byte[], R> decoder, NodeId toNodeId) {
777 return null;
778 }
779
780 @Override
781 public <M, R> void addSubscriber(MessageSubject subject,
782 Function<byte[], M> decoder, Function<M, R> handler,
783 Function<R, byte[]> encoder, ExecutorService executor) {
784 }
785
786 @Override
787 public <M> void addSubscriber(MessageSubject subject,
788 Function<byte[], M> decoder, Consumer<M> handler,
789 ExecutorService executor) {
790 }
791
792 @Override
793 public boolean broadcast(ClusterMessage message) {
794 return false;
795 }
796
797 @Override
798 public boolean broadcastIncludeSelf(ClusterMessage message) {
799 return false;
800 }
801
802 @Override
803 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
804 return false;
805 }
806
807 @Override
808 public boolean multicast(ClusterMessage message,
809 Iterable<NodeId> nodeIds) {
810 return false;
811 }
812
813 @Override
814 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
815 NodeId toNodeId) {
816 return null;
817 }
Jonathan Hart584d2f32015-01-27 19:46:14 -0800818 }
819
820 /**
821 * ClockService implementation that gives out timestamps based on a
822 * sequential counter. This clock service enables more control over the
823 * timestamps that are given out, including being able to "turn back time"
824 * to give out timestamps from the past.
825 *
826 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800827 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800828 */
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800829 private class SequentialClockService<T, U> implements ClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800830
831 private static final long INITIAL_VALUE = 1;
832 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
833
834 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800835 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800836 return new TestTimestamp(counter.getAndIncrement());
837 }
838
839 /**
840 * Returns what the next timestamp will be without consuming the
841 * timestamp. This allows test code to set expectations correctly while
842 * still allowing the CUT to get the same timestamp.
843 *
844 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800845 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800846 */
847 public Timestamp peekAtNextTimestamp() {
848 return peek(1);
849 }
850
851 /**
852 * Returns the ith timestamp to be given out in the future without
853 * consuming the timestamp. For example, i=1 returns the next timestamp,
854 * i=2 returns the timestamp after that, and so on.
855 *
856 * @param i number of the timestamp to peek at
857 * @return the ith timestamp that will be given out
858 */
859 public Timestamp peek(int i) {
860 checkArgument(i > 0, "i must be a positive integer");
861
862 return new TestTimestamp(counter.get() + i - 1);
863 }
864
865 /**
866 * Turns the clock back two ticks, so the next call to getTimestamp will
867 * return an older timestamp than the previous call to getTimestamp.
868 */
869 public void turnBackTime() {
870 // Not atomic, but should be OK for these tests.
871 counter.decrementAndGet();
872 counter.decrementAndGet();
873 }
874
875 }
876
877 /**
878 * Timestamp implementation where the value of the timestamp can be
879 * specified explicitly at creation time.
880 */
881 private class TestTimestamp implements Timestamp {
882
883 private final long timestamp;
884
885 /**
886 * Creates a new timestamp that has the specified value.
887 *
888 * @param timestamp value of the timestamp
889 */
890 public TestTimestamp(long timestamp) {
891 this.timestamp = timestamp;
892 }
893
894 @Override
895 public int compareTo(Timestamp o) {
896 checkArgument(o instanceof TestTimestamp);
897 TestTimestamp otherTimestamp = (TestTimestamp) o;
898 return ComparisonChain.start()
899 .compare(this.timestamp, otherTimestamp.timestamp)
900 .result();
901 }
902 }
903
904 /**
905 * EventuallyConsistentMapListener implementation which triggers a latch
906 * when it receives an event.
907 */
908 private class TestListener implements EventuallyConsistentMapListener<String, String> {
909 private CountDownLatch latch;
910
911 /**
912 * Creates a new listener that will trigger the specified latch when it
913 * receives and event.
914 *
915 * @param latch the latch to trigger on events
916 */
917 public TestListener(CountDownLatch latch) {
918 this.latch = latch;
919 }
920
921 @Override
922 public void event(EventuallyConsistentMapEvent<String, String> event) {
923 latch.countDown();
924 }
925 }
926}