blob: 869976c082e1caccc874cccf118f26ba4e10c954 [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hart584d2f32015-01-27 19:46:14 -080017
18import com.google.common.collect.ComparisonChain;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080019import com.google.common.collect.ImmutableSet;
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080020import com.google.common.collect.Lists;
Madan Jampanib28e4ad2015-02-19 12:31:37 -080021import com.google.common.util.concurrent.MoreExecutors;
Madan Jampani3e033bd2015-04-08 13:03:49 -070022
Jonathan Hart584d2f32015-01-27 19:46:14 -080023import org.junit.After;
24import org.junit.Before;
25import org.junit.Test;
26import org.onlab.packet.IpAddress;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.ControllerNode;
30import org.onosproject.cluster.DefaultControllerNode;
31import org.onosproject.cluster.NodeId;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070032import org.onosproject.event.AbstractEvent;
Jonathan Hart584d2f32015-01-27 19:46:14 -080033import org.onosproject.store.Timestamp;
34import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
35import org.onosproject.store.cluster.messaging.ClusterMessage;
36import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
37import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani3e033bd2015-04-08 13:03:49 -070038import org.onosproject.store.impl.LogicalTimestamp;
Jonathan Hart63939a32015-05-08 11:57:03 -070039import org.onosproject.store.service.WallClockTimestamp;
Jonathan Hart584d2f32015-01-27 19:46:14 -080040import org.onosproject.store.serializers.KryoNamespaces;
41import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070042import org.onosproject.store.service.EventuallyConsistentMap;
43import org.onosproject.store.service.EventuallyConsistentMapEvent;
44import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hart584d2f32015-01-27 19:46:14 -080045
Jonathan Hart584d2f32015-01-27 19:46:14 -080046import java.util.ArrayList;
47import java.util.HashMap;
48import java.util.HashSet;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070049import java.util.List;
Jonathan Hart584d2f32015-01-27 19:46:14 -080050import java.util.Map;
51import java.util.Objects;
52import java.util.Set;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070053import java.util.concurrent.CompletableFuture;
Jonathan Hart584d2f32015-01-27 19:46:14 -080054import java.util.concurrent.CountDownLatch;
Madan Jampaniec5ae342015-04-13 15:43:10 -070055import java.util.concurrent.Executor;
Madan Jampani2af244a2015-02-22 13:12:01 -080056import java.util.concurrent.ExecutorService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080057import java.util.concurrent.TimeUnit;
58import java.util.concurrent.atomic.AtomicLong;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070059import java.util.function.Consumer;
60import java.util.function.Function;
Jonathan Hart584d2f32015-01-27 19:46:14 -080061
62import static com.google.common.base.Preconditions.checkArgument;
63import static junit.framework.TestCase.assertFalse;
64import static org.easymock.EasyMock.*;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080065import static org.junit.Assert.*;
Jonathan Hart584d2f32015-01-27 19:46:14 -080066
67/**
68 * Unit tests for EventuallyConsistentMapImpl.
69 */
70public class EventuallyConsistentMapImplTest {
71
72 private EventuallyConsistentMap<String, String> ecMap;
73
74 private ClusterService clusterService;
75 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080076 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080077
78 private static final String MAP_NAME = "test";
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080079 private static final MessageSubject UPDATE_MESSAGE_SUBJECT
Jonathan Hart584d2f32015-01-27 19:46:14 -080080 = new MessageSubject("ecm-" + MAP_NAME + "-update");
Jonathan Hart584d2f32015-01-27 19:46:14 -080081 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
82 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
83
84 private static final String KEY1 = "one";
85 private static final String KEY2 = "two";
86 private static final String VALUE1 = "oneValue";
87 private static final String VALUE2 = "twoValue";
88
89 private final ControllerNode self =
90 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
91
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080092 private ClusterMessageHandler updateHandler;
Jonathan Hart584d2f32015-01-27 19:46:14 -080093 private ClusterMessageHandler antiEntropyHandler;
94
95 /*
96 * Serialization is a bit tricky here. We need to serialize in the tests
97 * to set the expectations, which will use this serializer here, but the
98 * EventuallyConsistentMap will use its own internal serializer. This means
99 * this serializer must be set up exactly the same as map's internal
100 * serializer.
101 */
102 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
103 @Override
104 protected void setupKryoPool() {
105 serializerPool = KryoNamespace.newBuilder()
106 // Classes we give to the map
107 .register(KryoNamespaces.API)
108 .register(TestTimestamp.class)
109 // Below is the classes that the map internally registers
Madan Jampani3e033bd2015-04-08 13:03:49 -0700110 .register(LogicalTimestamp.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800111 .register(WallClockTimestamp.class)
Jonathan Hartf9108232015-02-02 16:37:35 -0800112 .register(PutEntry.class)
113 .register(RemoveEntry.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800114 .register(ArrayList.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800115 .register(AntiEntropyAdvertisement.class)
116 .register(HashMap.class)
117 .build();
118 }
119 };
120
121 @Before
122 public void setUp() throws Exception {
123 clusterService = createMock(ClusterService.class);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800124 expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
125 expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800126 replay(clusterService);
127
128 clusterCommunicator = createMock(ClusterCommunicationService.class);
129
130 // Add expectation for adding cluster message subscribers which
131 // delegate to our ClusterCommunicationService implementation. This
132 // allows us to get a reference to the map's internal cluster message
133 // handlers so we can induce events coming in from a peer.
134 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
Madan Jampani2af244a2015-02-22 13:12:01 -0800135 anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800136 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
137
138 replay(clusterCommunicator);
139
140 clockService = new SequentialClockService<>();
141
142 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
143 .register(KryoNamespaces.API)
144 .register(TestTimestamp.class);
145
Madan Jampani175e8fd2015-05-20 14:10:45 -0700146 ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700147 clusterService, clusterCommunicator)
148 .withName(MAP_NAME)
149 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700150 .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700151 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
152 .build();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800153
154 // Reset ready for tests to add their own expectations
155 reset(clusterCommunicator);
156 }
157
158 @After
159 public void tearDown() {
160 reset(clusterCommunicator);
161 ecMap.destroy();
162 }
163
Ray Milkey8dc82082015-02-20 16:22:38 -0800164 @SuppressWarnings("unchecked")
165 private EventuallyConsistentMapListener<String, String> getListener() {
166 return createMock(EventuallyConsistentMapListener.class);
167 }
168
Jonathan Hart584d2f32015-01-27 19:46:14 -0800169 @Test
170 public void testSize() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800171 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800172
173 assertEquals(0, ecMap.size());
174 ecMap.put(KEY1, VALUE1);
175 assertEquals(1, ecMap.size());
176 ecMap.put(KEY1, VALUE2);
177 assertEquals(1, ecMap.size());
178 ecMap.put(KEY2, VALUE2);
179 assertEquals(2, ecMap.size());
180 for (int i = 0; i < 10; i++) {
181 ecMap.put("" + i, "" + i);
182 }
183 assertEquals(12, ecMap.size());
184 ecMap.remove(KEY1);
185 assertEquals(11, ecMap.size());
186 ecMap.remove(KEY1);
187 assertEquals(11, ecMap.size());
188 }
189
190 @Test
191 public void testIsEmpty() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800192 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800193
194 assertTrue(ecMap.isEmpty());
195 ecMap.put(KEY1, VALUE1);
196 assertFalse(ecMap.isEmpty());
197 ecMap.remove(KEY1);
198 assertTrue(ecMap.isEmpty());
199 }
200
201 @Test
202 public void testContainsKey() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800203 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800204
205 assertFalse(ecMap.containsKey(KEY1));
206 ecMap.put(KEY1, VALUE1);
207 assertTrue(ecMap.containsKey(KEY1));
208 assertFalse(ecMap.containsKey(KEY2));
209 ecMap.remove(KEY1);
210 assertFalse(ecMap.containsKey(KEY1));
211 }
212
213 @Test
214 public void testContainsValue() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800215 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800216
217 assertFalse(ecMap.containsValue(VALUE1));
218 ecMap.put(KEY1, VALUE1);
219 assertTrue(ecMap.containsValue(VALUE1));
220 assertFalse(ecMap.containsValue(VALUE2));
221 ecMap.put(KEY1, VALUE2);
222 assertFalse(ecMap.containsValue(VALUE1));
223 assertTrue(ecMap.containsValue(VALUE2));
224 ecMap.remove(KEY1);
225 assertFalse(ecMap.containsValue(VALUE2));
226 }
227
228 @Test
229 public void testGet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800230 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800231
232 CountDownLatch latch;
233
234 // Local put
235 assertNull(ecMap.get(KEY1));
236 ecMap.put(KEY1, VALUE1);
237 assertEquals(VALUE1, ecMap.get(KEY1));
238
239 // Remote put
240 ClusterMessage message
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800241 = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800242
243 // Create a latch so we know when the put operation has finished
244 latch = new CountDownLatch(1);
245 ecMap.addListener(new TestListener(latch));
246
247 assertNull(ecMap.get(KEY2));
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800248 updateHandler.handle(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800249 assertTrue("External listener never got notified of internal event",
250 latch.await(100, TimeUnit.MILLISECONDS));
251 assertEquals(VALUE2, ecMap.get(KEY2));
252
253 // Local remove
254 ecMap.remove(KEY2);
255 assertNull(ecMap.get(KEY2));
256
257 // Remote remove
258 ClusterMessage removeMessage
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800259 = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800260
261 // Create a latch so we know when the remove operation has finished
262 latch = new CountDownLatch(1);
263 ecMap.addListener(new TestListener(latch));
264
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800265 updateHandler.handle(removeMessage);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800266 assertTrue("External listener never got notified of internal event",
267 latch.await(100, TimeUnit.MILLISECONDS));
268 assertNull(ecMap.get(KEY1));
269 }
270
271 @Test
272 public void testPut() throws Exception {
273 // Set up expectations of external events to be sent to listeners during
274 // the test. These don't use timestamps so we can set them all up at once.
275 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800276 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800277 listener.event(new EventuallyConsistentMapEvent<>(
278 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
279 listener.event(new EventuallyConsistentMapEvent<>(
280 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
281 replay(listener);
282
283 ecMap.addListener(listener);
284
285 // Set up expected internal message to be broadcast to peers on first put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800286 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700287 .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800288
289 // Put first value
290 assertNull(ecMap.get(KEY1));
291 ecMap.put(KEY1, VALUE1);
292 assertEquals(VALUE1, ecMap.get(KEY1));
293
294 verify(clusterCommunicator);
295
296 // Set up expected internal message to be broadcast to peers on second put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800297 expectSpecificMulticastMessage(generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700298 KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800299
300 // Update same key to a new value
301 ecMap.put(KEY1, VALUE2);
302 assertEquals(VALUE2, ecMap.get(KEY1));
303
304 verify(clusterCommunicator);
305
306 // Do a put with a older timestamp than the value already there.
307 // The map data should not be changed and no notifications should be sent.
308 reset(clusterCommunicator);
309 replay(clusterCommunicator);
310
311 clockService.turnBackTime();
312 ecMap.put(KEY1, VALUE1);
313 // Value should not have changed.
314 assertEquals(VALUE2, ecMap.get(KEY1));
315
316 verify(clusterCommunicator);
317
318 // Check that our listener received the correct events during the test
319 verify(listener);
320 }
321
322 @Test
323 public void testRemove() throws Exception {
324 // Set up expectations of external events to be sent to listeners during
325 // the test. These don't use timestamps so we can set them all up at once.
326 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800327 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800328 listener.event(new EventuallyConsistentMapEvent<>(
329 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
330 expectLastCall().times(2);
331 listener.event(new EventuallyConsistentMapEvent<>(
332 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
333 listener.event(new EventuallyConsistentMapEvent<>(
334 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
335 replay(listener);
336
337 ecMap.addListener(listener);
338
339 // Put in an initial value
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800340 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800341 ecMap.put(KEY1, VALUE1);
342 assertEquals(VALUE1, ecMap.get(KEY1));
343
344 // Remove the value and check the correct internal cluster messages
345 // are sent
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800346 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700347 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800348
349 ecMap.remove(KEY1);
350 assertNull(ecMap.get(KEY1));
351
352 verify(clusterCommunicator);
353
354 // Remove the same value again. Even though the value is no longer in
355 // the map, we expect that the tombstone is updated and another remove
356 // event is sent to the cluster and external listeners.
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800357 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700358 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800359
360 ecMap.remove(KEY1);
361 assertNull(ecMap.get(KEY1));
362
363 verify(clusterCommunicator);
364
365
366 // Put in a new value for us to try and remove
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800367 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800368
369 ecMap.put(KEY2, VALUE2);
370
371 clockService.turnBackTime();
372
373 // Remove should have no effect, since it has an older timestamp than
374 // the put. Expect no notifications to be sent out
375 reset(clusterCommunicator);
376 replay(clusterCommunicator);
377
378 ecMap.remove(KEY2);
379
380 verify(clusterCommunicator);
381
382 // Check that our listener received the correct events during the test
383 verify(listener);
384 }
385
386 @Test
387 public void testPutAll() throws Exception {
388 // putAll() with an empty map is a no-op - no messages will be sent
389 reset(clusterCommunicator);
390 replay(clusterCommunicator);
391
392 ecMap.putAll(new HashMap<>());
393
394 verify(clusterCommunicator);
395
396 // Set up the listener with our expected events
397 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800398 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800399 listener.event(new EventuallyConsistentMapEvent<>(
400 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
401 listener.event(new EventuallyConsistentMapEvent<>(
402 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
403 replay(listener);
404
405 ecMap.addListener(listener);
406
407 // Expect a multi-update inter-instance message
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700408 expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800409 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800410
411 Map<String, String> putAllValues = new HashMap<>();
412 putAllValues.put(KEY1, VALUE1);
413 putAllValues.put(KEY2, VALUE2);
414
415 // Put the values in the map
416 ecMap.putAll(putAllValues);
417
418 // Check the correct messages and events were sent
419 verify(clusterCommunicator);
420 verify(listener);
421 }
422
423 @Test
424 public void testClear() throws Exception {
425 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800426 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800427 listener.event(new EventuallyConsistentMapEvent<>(
428 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
429 listener.event(new EventuallyConsistentMapEvent<>(
430 EventuallyConsistentMapEvent.Type.REMOVE, KEY2, null));
431 replay(listener);
432
433 // clear() on an empty map is a no-op - no messages will be sent
434 reset(clusterCommunicator);
435 replay(clusterCommunicator);
436
437 assertTrue(ecMap.isEmpty());
438 ecMap.clear();
439 verify(clusterCommunicator);
440
441 // Put some items in the map
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800442 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800443 ecMap.put(KEY1, VALUE1);
444 ecMap.put(KEY2, VALUE2);
445
446 ecMap.addListener(listener);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700447 expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800448
449 ecMap.clear();
450
451 verify(clusterCommunicator);
452 verify(listener);
453 }
454
455 @Test
456 public void testKeySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800457 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800458
459 assertTrue(ecMap.keySet().isEmpty());
460
461 // Generate some keys
462 Set<String> keys = new HashSet<>();
463 for (int i = 1; i <= 10; i++) {
464 keys.add("" + i);
465 }
466
467 // Put each key in the map
468 keys.forEach(k -> ecMap.put(k, "value" + k));
469
470 // Check keySet() returns the correct value
471 assertEquals(keys, ecMap.keySet());
472
473 // Update the value for one of the keys
474 ecMap.put(keys.iterator().next(), "new-value");
475
476 // Check the key set is still the same
477 assertEquals(keys, ecMap.keySet());
478
479 // Remove a key
480 String removeKey = keys.iterator().next();
481 keys.remove(removeKey);
482 ecMap.remove(removeKey);
483
484 // Check the key set is still correct
485 assertEquals(keys, ecMap.keySet());
486 }
487
488 @Test
489 public void testValues() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800490 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800491
492 assertTrue(ecMap.values().isEmpty());
493
494 // Generate some values
495 Map<String, String> expectedValues = new HashMap<>();
496 for (int i = 1; i <= 10; i++) {
497 expectedValues.put("" + i, "value" + i);
498 }
499
500 // Add them into the map
501 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
502
503 // Check the values collection is correct
504 assertEquals(expectedValues.values().size(), ecMap.values().size());
505 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
506
507 // Update the value for one of the keys
508 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
509 expectedValues.put(first.getKey(), "new-value");
510 ecMap.put(first.getKey(), "new-value");
511
512 // Check the values collection is still correct
513 assertEquals(expectedValues.values().size(), ecMap.values().size());
514 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
515
516 // Remove a key
517 String removeKey = expectedValues.keySet().iterator().next();
518 expectedValues.remove(removeKey);
519 ecMap.remove(removeKey);
520
521 // Check the values collection is still correct
522 assertEquals(expectedValues.values().size(), ecMap.values().size());
523 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
524 }
525
526 @Test
527 public void testEntrySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800528 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800529
530 assertTrue(ecMap.entrySet().isEmpty());
531
532 // Generate some values
533 Map<String, String> expectedValues = new HashMap<>();
534 for (int i = 1; i <= 10; i++) {
535 expectedValues.put("" + i, "value" + i);
536 }
537
538 // Add them into the map
539 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
540
541 // Check the entry set is correct
542 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
543
544 // Update the value for one of the keys
545 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
546 expectedValues.put(first.getKey(), "new-value");
547 ecMap.put(first.getKey(), "new-value");
548
549 // Check the entry set is still correct
550 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
551
552 // Remove a key
553 String removeKey = expectedValues.keySet().iterator().next();
554 expectedValues.remove(removeKey);
555 ecMap.remove(removeKey);
556
557 // Check the entry set is still correct
558 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
559 }
560
561 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
562 if (expectedMap.entrySet().size() != actual.size()) {
563 return false;
564 }
565
566 for (Map.Entry<String, String> e : actual) {
567 if (!expectedMap.containsKey(e.getKey())) {
568 return false;
569 }
570 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
571 return false;
572 }
573 }
574 return true;
575 }
576
577 @Test
578 public void testDestroy() throws Exception {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800579 clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800580 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
581
582 replay(clusterCommunicator);
583
584 ecMap.destroy();
585
586 verify(clusterCommunicator);
587
588 try {
589 ecMap.get(KEY1);
590 fail("get after destroy should throw exception");
591 } catch (IllegalStateException e) {
592 assertTrue(true);
593 }
594
595 try {
596 ecMap.put(KEY1, VALUE1);
597 fail("put after destroy should throw exception");
598 } catch (IllegalStateException e) {
599 assertTrue(true);
600 }
601 }
602
603 private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800604 PutEntry<String, String> event = new PutEntry<>(key, value, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800605
606 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800607 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
608 SERIALIZER.encode(Lists.newArrayList(event)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800609 }
610
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700611 private List<PutEntry<String, String>> generatePutMessage(
612 String key1, String value1, String key2, String value2) {
Jonathan Hartf9108232015-02-02 16:37:35 -0800613 ArrayList<PutEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800614
615 Timestamp timestamp1 = clockService.peek(1);
616 Timestamp timestamp2 = clockService.peek(2);
617
Jonathan Hartf9108232015-02-02 16:37:35 -0800618 PutEntry<String, String> pe1 = new PutEntry<>(key1, value1, timestamp1);
619 PutEntry<String, String> pe2 = new PutEntry<>(key2, value2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800620
621 list.add(pe1);
622 list.add(pe2);
623
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700624 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800625 }
626
627 private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800628 RemoveEntry<String, String> event = new RemoveEntry<>(key, timestamp);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800629
630 return new ClusterMessage(
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800631 clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
632 SERIALIZER.encode(Lists.newArrayList(event)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800633 }
634
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700635 private List<RemoveEntry<String, String>> generateRemoveMessage(String key1, String key2) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800636 ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800637
638 Timestamp timestamp1 = clockService.peek(1);
639 Timestamp timestamp2 = clockService.peek(2);
640
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800641 RemoveEntry<String, String> re1 = new RemoveEntry<>(key1, timestamp1);
642 RemoveEntry<String, String> re2 = new RemoveEntry<>(key2, timestamp2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800643
644 list.add(re1);
645 list.add(re2);
646
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700647 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800648 }
649
650 /**
651 * Sets up a mock ClusterCommunicationService to expect a specific cluster
652 * message to be broadcast to the cluster.
653 *
654 * @param m message we expect to be sent
655 * @param clusterCommunicator a mock ClusterCommunicationService to set up
656 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800657 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700658 private static <T> void expectSpecificBroadcastMessage(
659 T message,
660 MessageSubject subject,
661 ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800662 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700663 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
664 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800665 replay(clusterCommunicator);
666 }
667
668 /**
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800669 * Sets up a mock ClusterCommunicationService to expect a specific cluster
670 * message to be multicast to the cluster.
671 *
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700672 * @param message message we expect to be sent
673 * @param subject subject we expect to be sent to
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800674 * @param clusterCommunicator a mock ClusterCommunicationService to set up
675 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800676 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700677 private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800678 ClusterCommunicationService clusterCommunicator) {
679 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700680 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
681 expectLastCall().anyTimes();
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800682 replay(clusterCommunicator);
683 }
684
685
686 /**
687 * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
Jonathan Hart584d2f32015-01-27 19:46:14 -0800688 * that is sent to it. This is useful for unit tests where we aren't
689 * interested in testing the messaging component.
690 *
691 * @param clusterCommunicator a mock ClusterCommunicationService to set up
692 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800693 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700694 private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800695 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800696// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
697// anyObject(Iterable.class)))
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700698 expect(clusterCommunicator.<T>unicast(
699 anyObject(),
700 anyObject(MessageSubject.class),
701 anyObject(Function.class),
702 anyObject(NodeId.class)))
Madan Jampani175e8fd2015-05-20 14:10:45 -0700703 .andReturn(CompletableFuture.completedFuture(null))
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800704 .anyTimes();
705 replay(clusterCommunicator);
706 }
707
708 /**
709 * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
710 * that is sent to it. This is useful for unit tests where we aren't
711 * interested in testing the messaging component.
712 *
713 * @param clusterCommunicator a mock ClusterCommunicationService to set up
714 */
715 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800716 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700717 clusterCommunicator.<AbstractEvent>multicast(
718 anyObject(AbstractEvent.class),
719 anyObject(MessageSubject.class),
720 anyObject(Function.class),
721 anyObject(Set.class));
722 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800723 replay(clusterCommunicator);
724 }
725
726 /**
727 * ClusterCommunicationService implementation that the map's addSubscriber
728 * call will delegate to. This means we can get a reference to the
729 * internal cluster message handler used by the map, so that we can simulate
730 * events coming in from other instances.
731 */
732 private final class TestClusterCommunicationService
733 implements ClusterCommunicationService {
734
735 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800736 public void addSubscriber(MessageSubject subject,
737 ClusterMessageHandler subscriber,
738 ExecutorService executor) {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800739 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
740 updateHandler = subscriber;
Madan Jampani2af244a2015-02-22 13:12:01 -0800741 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
742 antiEntropyHandler = subscriber;
743 } else {
744 throw new RuntimeException("Unexpected message subject " + subject.toString());
745 }
746 }
747
748 @Override
Jonathan Hart584d2f32015-01-27 19:46:14 -0800749 public void removeSubscriber(MessageSubject subject) {}
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700750
751 @Override
752 public <M> void broadcast(M message, MessageSubject subject,
753 Function<M, byte[]> encoder) {
754 }
755
756 @Override
757 public <M> void broadcastIncludeSelf(M message,
758 MessageSubject subject, Function<M, byte[]> encoder) {
759 }
760
761 @Override
Madan Jampani175e8fd2015-05-20 14:10:45 -0700762 public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700763 Function<M, byte[]> encoder, NodeId toNodeId) {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700764 return null;
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700765 }
766
767 @Override
768 public <M> void multicast(M message, MessageSubject subject,
769 Function<M, byte[]> encoder, Set<NodeId> nodes) {
770 }
771
772 @Override
773 public <M, R> CompletableFuture<R> sendAndReceive(M message,
774 MessageSubject subject, Function<M, byte[]> encoder,
775 Function<byte[], R> decoder, NodeId toNodeId) {
776 return null;
777 }
778
779 @Override
780 public <M, R> void addSubscriber(MessageSubject subject,
781 Function<byte[], M> decoder, Function<M, R> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700782 Function<R, byte[]> encoder, Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700783 }
784
785 @Override
Madan Jampani27b69c62015-05-15 15:49:02 -0700786 public <M, R> void addSubscriber(MessageSubject subject,
787 Function<byte[], M> decoder, Function<M, CompletableFuture<R>> handler,
788 Function<R, byte[]> encoder) {
789 }
790
791 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700792 public <M> void addSubscriber(MessageSubject subject,
793 Function<byte[], M> decoder, Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700794 Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700795 }
Jonathan Hart584d2f32015-01-27 19:46:14 -0800796 }
797
798 /**
799 * ClockService implementation that gives out timestamps based on a
800 * sequential counter. This clock service enables more control over the
801 * timestamps that are given out, including being able to "turn back time"
802 * to give out timestamps from the past.
803 *
804 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800805 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800806 */
Madan Jampanibcf1a482015-06-24 19:05:56 -0700807 private class SequentialClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800808
809 private static final long INITIAL_VALUE = 1;
810 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
811
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800812 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800813 return new TestTimestamp(counter.getAndIncrement());
814 }
815
816 /**
817 * Returns what the next timestamp will be without consuming the
818 * timestamp. This allows test code to set expectations correctly while
819 * still allowing the CUT to get the same timestamp.
820 *
821 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800822 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800823 */
824 public Timestamp peekAtNextTimestamp() {
825 return peek(1);
826 }
827
828 /**
829 * Returns the ith timestamp to be given out in the future without
830 * consuming the timestamp. For example, i=1 returns the next timestamp,
831 * i=2 returns the timestamp after that, and so on.
832 *
833 * @param i number of the timestamp to peek at
834 * @return the ith timestamp that will be given out
835 */
836 public Timestamp peek(int i) {
837 checkArgument(i > 0, "i must be a positive integer");
838
839 return new TestTimestamp(counter.get() + i - 1);
840 }
841
842 /**
843 * Turns the clock back two ticks, so the next call to getTimestamp will
844 * return an older timestamp than the previous call to getTimestamp.
845 */
846 public void turnBackTime() {
847 // Not atomic, but should be OK for these tests.
848 counter.decrementAndGet();
849 counter.decrementAndGet();
850 }
851
852 }
853
854 /**
855 * Timestamp implementation where the value of the timestamp can be
856 * specified explicitly at creation time.
857 */
858 private class TestTimestamp implements Timestamp {
859
860 private final long timestamp;
861
862 /**
863 * Creates a new timestamp that has the specified value.
864 *
865 * @param timestamp value of the timestamp
866 */
867 public TestTimestamp(long timestamp) {
868 this.timestamp = timestamp;
869 }
870
871 @Override
872 public int compareTo(Timestamp o) {
873 checkArgument(o instanceof TestTimestamp);
874 TestTimestamp otherTimestamp = (TestTimestamp) o;
875 return ComparisonChain.start()
876 .compare(this.timestamp, otherTimestamp.timestamp)
877 .result();
878 }
879 }
880
881 /**
882 * EventuallyConsistentMapListener implementation which triggers a latch
883 * when it receives an event.
884 */
885 private class TestListener implements EventuallyConsistentMapListener<String, String> {
886 private CountDownLatch latch;
887
888 /**
889 * Creates a new listener that will trigger the specified latch when it
890 * receives and event.
891 *
892 * @param latch the latch to trigger on events
893 */
894 public TestListener(CountDownLatch latch) {
895 this.latch = latch;
896 }
897
898 @Override
899 public void event(EventuallyConsistentMapEvent<String, String> event) {
900 latch.countDown();
901 }
902 }
903}