blob: b74aa370b300a429069e2b7439033e7250d61d88 [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hart584d2f32015-01-27 19:46:14 -080017
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070018import java.util.ArrayList;
19import java.util.Collection;
20import java.util.HashMap;
21import java.util.HashSet;
22import java.util.List;
23import java.util.Map;
24import java.util.Objects;
25import java.util.Optional;
26import java.util.Set;
27import java.util.concurrent.CompletableFuture;
28import java.util.concurrent.CountDownLatch;
29import java.util.concurrent.Executor;
30import java.util.concurrent.TimeUnit;
31import java.util.concurrent.atomic.AtomicLong;
32import java.util.function.Consumer;
33import java.util.function.Function;
Madan Jampani3e033bd2015-04-08 13:03:49 -070034
Jonathan Hart584d2f32015-01-27 19:46:14 -080035import org.junit.After;
36import org.junit.Before;
37import org.junit.Test;
38import org.onlab.packet.IpAddress;
39import org.onlab.util.KryoNamespace;
40import org.onosproject.cluster.ClusterService;
41import org.onosproject.cluster.ControllerNode;
42import org.onosproject.cluster.DefaultControllerNode;
43import org.onosproject.cluster.NodeId;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070044import org.onosproject.event.AbstractEvent;
Madan Jampani2b8e1892015-12-04 11:16:47 -080045import org.onosproject.persistence.PersistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080046import org.onosproject.store.Timestamp;
47import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070048import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
Jonathan Hart584d2f32015-01-27 19:46:14 -080049import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani3e033bd2015-04-08 13:03:49 -070050import org.onosproject.store.impl.LogicalTimestamp;
Madan Jampani2b8e1892015-12-04 11:16:47 -080051import org.onosproject.store.persistence.TestPersistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080052import org.onosproject.store.serializers.KryoNamespaces;
53import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070054import org.onosproject.store.service.EventuallyConsistentMap;
55import org.onosproject.store.service.EventuallyConsistentMapEvent;
56import org.onosproject.store.service.EventuallyConsistentMapListener;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070057import org.onosproject.store.service.WallClockTimestamp;
Jonathan Hart584d2f32015-01-27 19:46:14 -080058
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070059import com.google.common.collect.ComparisonChain;
60import com.google.common.collect.ImmutableList;
61import com.google.common.collect.ImmutableSet;
62import com.google.common.util.concurrent.MoreExecutors;
Jonathan Hart584d2f32015-01-27 19:46:14 -080063
64import static com.google.common.base.Preconditions.checkArgument;
65import static junit.framework.TestCase.assertFalse;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070066import static org.easymock.EasyMock.anyObject;
67import static org.easymock.EasyMock.createMock;
68import static org.easymock.EasyMock.eq;
69import static org.easymock.EasyMock.expect;
70import static org.easymock.EasyMock.expectLastCall;
71import static org.easymock.EasyMock.replay;
72import static org.easymock.EasyMock.reset;
73import static org.easymock.EasyMock.verify;
74import static org.junit.Assert.assertEquals;
75import static org.junit.Assert.assertNull;
76import static org.junit.Assert.assertTrue;
77import static org.junit.Assert.fail;
Jonathan Hart584d2f32015-01-27 19:46:14 -080078
79/**
80 * Unit tests for EventuallyConsistentMapImpl.
81 */
82public class EventuallyConsistentMapImplTest {
83
84 private EventuallyConsistentMap<String, String> ecMap;
85
Madan Jampani2b8e1892015-12-04 11:16:47 -080086 private PersistenceService persistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080087 private ClusterService clusterService;
88 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080089 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080090
91 private static final String MAP_NAME = "test";
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080092 private static final MessageSubject UPDATE_MESSAGE_SUBJECT
Jonathan Hart584d2f32015-01-27 19:46:14 -080093 = new MessageSubject("ecm-" + MAP_NAME + "-update");
Jonathan Hart584d2f32015-01-27 19:46:14 -080094 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
95 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
96
97 private static final String KEY1 = "one";
98 private static final String KEY2 = "two";
99 private static final String VALUE1 = "oneValue";
100 private static final String VALUE2 = "twoValue";
101
102 private final ControllerNode self =
103 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
104
Madan Jampani3d76c942015-06-29 23:37:10 -0700105 private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
106 private Consumer<AntiEntropyAdvertisement<String>> antiEntropyHandler;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800107
108 /*
109 * Serialization is a bit tricky here. We need to serialize in the tests
110 * to set the expectations, which will use this serializer here, but the
111 * EventuallyConsistentMap will use its own internal serializer. This means
112 * this serializer must be set up exactly the same as map's internal
113 * serializer.
114 */
115 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
116 @Override
117 protected void setupKryoPool() {
118 serializerPool = KryoNamespace.newBuilder()
119 // Classes we give to the map
120 .register(KryoNamespaces.API)
121 .register(TestTimestamp.class)
122 // Below is the classes that the map internally registers
Madan Jampani3e033bd2015-04-08 13:03:49 -0700123 .register(LogicalTimestamp.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800124 .register(WallClockTimestamp.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800125 .register(ArrayList.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800126 .register(AntiEntropyAdvertisement.class)
127 .register(HashMap.class)
Madan Jampani3d76c942015-06-29 23:37:10 -0700128 .register(Optional.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800129 .build();
130 }
131 };
132
133 @Before
134 public void setUp() throws Exception {
135 clusterService = createMock(ClusterService.class);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800136 expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
137 expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800138 replay(clusterService);
139
140 clusterCommunicator = createMock(ClusterCommunicationService.class);
141
Madan Jampani2b8e1892015-12-04 11:16:47 -0800142 persistenceService = new TestPersistenceService();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800143 // Add expectation for adding cluster message subscribers which
144 // delegate to our ClusterCommunicationService implementation. This
145 // allows us to get a reference to the map's internal cluster message
146 // handlers so we can induce events coming in from a peer.
Madan Jampani3d76c942015-06-29 23:37:10 -0700147 clusterCommunicator.<String>addSubscriber(anyObject(MessageSubject.class),
148 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
149 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800150
151 replay(clusterCommunicator);
152
153 clockService = new SequentialClockService<>();
154
155 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
156 .register(KryoNamespaces.API)
157 .register(TestTimestamp.class);
158
Madan Jampani175e8fd2015-05-20 14:10:45 -0700159 ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700160 clusterService, clusterCommunicator, persistenceService)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700161 .withName(MAP_NAME)
162 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700163 .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700164 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700165 .withPersistence()
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700166 .build();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800167
168 // Reset ready for tests to add their own expectations
169 reset(clusterCommunicator);
170 }
171
172 @After
173 public void tearDown() {
174 reset(clusterCommunicator);
175 ecMap.destroy();
176 }
177
Ray Milkey8dc82082015-02-20 16:22:38 -0800178 @SuppressWarnings("unchecked")
179 private EventuallyConsistentMapListener<String, String> getListener() {
180 return createMock(EventuallyConsistentMapListener.class);
181 }
182
Jonathan Hart584d2f32015-01-27 19:46:14 -0800183 @Test
184 public void testSize() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800185 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800186
187 assertEquals(0, ecMap.size());
188 ecMap.put(KEY1, VALUE1);
189 assertEquals(1, ecMap.size());
190 ecMap.put(KEY1, VALUE2);
191 assertEquals(1, ecMap.size());
192 ecMap.put(KEY2, VALUE2);
193 assertEquals(2, ecMap.size());
194 for (int i = 0; i < 10; i++) {
195 ecMap.put("" + i, "" + i);
196 }
197 assertEquals(12, ecMap.size());
198 ecMap.remove(KEY1);
199 assertEquals(11, ecMap.size());
200 ecMap.remove(KEY1);
201 assertEquals(11, ecMap.size());
202 }
203
204 @Test
205 public void testIsEmpty() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800206 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800207
208 assertTrue(ecMap.isEmpty());
209 ecMap.put(KEY1, VALUE1);
210 assertFalse(ecMap.isEmpty());
211 ecMap.remove(KEY1);
212 assertTrue(ecMap.isEmpty());
213 }
214
215 @Test
216 public void testContainsKey() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800217 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800218
219 assertFalse(ecMap.containsKey(KEY1));
220 ecMap.put(KEY1, VALUE1);
221 assertTrue(ecMap.containsKey(KEY1));
222 assertFalse(ecMap.containsKey(KEY2));
223 ecMap.remove(KEY1);
224 assertFalse(ecMap.containsKey(KEY1));
225 }
226
227 @Test
228 public void testContainsValue() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800229 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800230
231 assertFalse(ecMap.containsValue(VALUE1));
232 ecMap.put(KEY1, VALUE1);
233 assertTrue(ecMap.containsValue(VALUE1));
234 assertFalse(ecMap.containsValue(VALUE2));
235 ecMap.put(KEY1, VALUE2);
236 assertFalse(ecMap.containsValue(VALUE1));
237 assertTrue(ecMap.containsValue(VALUE2));
238 ecMap.remove(KEY1);
239 assertFalse(ecMap.containsValue(VALUE2));
240 }
241
242 @Test
243 public void testGet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800244 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800245
246 CountDownLatch latch;
247
248 // Local put
249 assertNull(ecMap.get(KEY1));
250 ecMap.put(KEY1, VALUE1);
251 assertEquals(VALUE1, ecMap.get(KEY1));
252
253 // Remote put
Madan Jampani3d76c942015-06-29 23:37:10 -0700254 List<UpdateEntry<String, String>> message
255 = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800256
257 // Create a latch so we know when the put operation has finished
258 latch = new CountDownLatch(1);
259 ecMap.addListener(new TestListener(latch));
260
261 assertNull(ecMap.get(KEY2));
Madan Jampani3d76c942015-06-29 23:37:10 -0700262 updateHandler.accept(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800263 assertTrue("External listener never got notified of internal event",
264 latch.await(100, TimeUnit.MILLISECONDS));
265 assertEquals(VALUE2, ecMap.get(KEY2));
266
267 // Local remove
268 ecMap.remove(KEY2);
269 assertNull(ecMap.get(KEY2));
270
271 // Remote remove
Madan Jampani3d76c942015-06-29 23:37:10 -0700272 message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800273
274 // Create a latch so we know when the remove operation has finished
275 latch = new CountDownLatch(1);
276 ecMap.addListener(new TestListener(latch));
277
Madan Jampani3d76c942015-06-29 23:37:10 -0700278 updateHandler.accept(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800279 assertTrue("External listener never got notified of internal event",
280 latch.await(100, TimeUnit.MILLISECONDS));
281 assertNull(ecMap.get(KEY1));
282 }
283
284 @Test
285 public void testPut() throws Exception {
286 // Set up expectations of external events to be sent to listeners during
287 // the test. These don't use timestamps so we can set them all up at once.
288 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800289 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800290 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700291 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800292 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700293 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800294 replay(listener);
295
296 ecMap.addListener(listener);
297
298 // Set up expected internal message to be broadcast to peers on first put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800299 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700300 .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800301
302 // Put first value
303 assertNull(ecMap.get(KEY1));
304 ecMap.put(KEY1, VALUE1);
305 assertEquals(VALUE1, ecMap.get(KEY1));
306
307 verify(clusterCommunicator);
308
309 // Set up expected internal message to be broadcast to peers on second put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800310 expectSpecificMulticastMessage(generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700311 KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800312
313 // Update same key to a new value
314 ecMap.put(KEY1, VALUE2);
315 assertEquals(VALUE2, ecMap.get(KEY1));
316
317 verify(clusterCommunicator);
318
319 // Do a put with a older timestamp than the value already there.
320 // The map data should not be changed and no notifications should be sent.
321 reset(clusterCommunicator);
322 replay(clusterCommunicator);
323
324 clockService.turnBackTime();
325 ecMap.put(KEY1, VALUE1);
326 // Value should not have changed.
327 assertEquals(VALUE2, ecMap.get(KEY1));
328
329 verify(clusterCommunicator);
330
331 // Check that our listener received the correct events during the test
332 verify(listener);
333 }
334
335 @Test
336 public void testRemove() throws Exception {
337 // Set up expectations of external events to be sent to listeners during
338 // the test. These don't use timestamps so we can set them all up at once.
339 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800340 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800341 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700342 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700343 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700344 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800345 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700346 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800347 replay(listener);
348
349 ecMap.addListener(listener);
350
351 // Put in an initial value
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800352 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800353 ecMap.put(KEY1, VALUE1);
354 assertEquals(VALUE1, ecMap.get(KEY1));
355
356 // Remove the value and check the correct internal cluster messages
357 // are sent
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800358 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700359 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800360
361 ecMap.remove(KEY1);
362 assertNull(ecMap.get(KEY1));
363
364 verify(clusterCommunicator);
365
366 // Remove the same value again. Even though the value is no longer in
367 // the map, we expect that the tombstone is updated and another remove
368 // event is sent to the cluster and external listeners.
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800369 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700370 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800371
372 ecMap.remove(KEY1);
373 assertNull(ecMap.get(KEY1));
374
375 verify(clusterCommunicator);
376
377
378 // Put in a new value for us to try and remove
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800379 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800380
381 ecMap.put(KEY2, VALUE2);
382
383 clockService.turnBackTime();
384
385 // Remove should have no effect, since it has an older timestamp than
386 // the put. Expect no notifications to be sent out
387 reset(clusterCommunicator);
388 replay(clusterCommunicator);
389
390 ecMap.remove(KEY2);
391
392 verify(clusterCommunicator);
393
394 // Check that our listener received the correct events during the test
395 verify(listener);
396 }
397
398 @Test
Madan Jampani4727a112015-07-16 12:12:58 -0700399 public void testCompute() throws Exception {
400 // Set up expectations of external events to be sent to listeners during
401 // the test. These don't use timestamps so we can set them all up at once.
402 EventuallyConsistentMapListener<String, String> listener
403 = getListener();
404 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700405 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Madan Jampani4727a112015-07-16 12:12:58 -0700406 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700407 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Madan Jampani4727a112015-07-16 12:12:58 -0700408 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700409 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Madan Jampani4727a112015-07-16 12:12:58 -0700410 replay(listener);
411
412 ecMap.addListener(listener);
413
414 // Put in an initial value
415 expectPeerMessage(clusterCommunicator);
416 ecMap.compute(KEY1, (k, v) -> VALUE1);
417 assertEquals(VALUE1, ecMap.get(KEY1));
418
419 // Remove the value and check the correct internal cluster messages
420 // are sent
421 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
422 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
423
424 ecMap.compute(KEY1, (k, v) -> null);
425 assertNull(ecMap.get(KEY1));
426
427 verify(clusterCommunicator);
428
429 // Remove the same value again. Even though the value is no longer in
430 // the map, we expect that the tombstone is updated and another remove
431 // event is sent to the cluster and external listeners.
432 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
433 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
434
435 ecMap.compute(KEY1, (k, v) -> null);
436 assertNull(ecMap.get(KEY1));
437
438 verify(clusterCommunicator);
439
440 // Put in a new value for us to try and remove
441 expectPeerMessage(clusterCommunicator);
442
443 ecMap.compute(KEY2, (k, v) -> VALUE2);
444
445 clockService.turnBackTime();
446
447 // Remove should have no effect, since it has an older timestamp than
448 // the put. Expect no notifications to be sent out
449 reset(clusterCommunicator);
450 replay(clusterCommunicator);
451
452 ecMap.compute(KEY2, (k, v) -> null);
453
454 verify(clusterCommunicator);
455
456 // Check that our listener received the correct events during the test
457 verify(listener);
458 }
459
460 @Test
Jonathan Hart584d2f32015-01-27 19:46:14 -0800461 public void testPutAll() throws Exception {
462 // putAll() with an empty map is a no-op - no messages will be sent
463 reset(clusterCommunicator);
464 replay(clusterCommunicator);
465
466 ecMap.putAll(new HashMap<>());
467
468 verify(clusterCommunicator);
469
470 // Set up the listener with our expected events
471 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800472 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800473 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700474 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800475 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700476 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800477 replay(listener);
478
479 ecMap.addListener(listener);
480
481 // Expect a multi-update inter-instance message
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700482 expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800483 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800484
485 Map<String, String> putAllValues = new HashMap<>();
486 putAllValues.put(KEY1, VALUE1);
487 putAllValues.put(KEY2, VALUE2);
488
489 // Put the values in the map
490 ecMap.putAll(putAllValues);
491
492 // Check the correct messages and events were sent
493 verify(clusterCommunicator);
494 verify(listener);
495 }
496
497 @Test
498 public void testClear() throws Exception {
499 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800500 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800501 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700502 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800503 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700504 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800505 replay(listener);
506
507 // clear() on an empty map is a no-op - no messages will be sent
508 reset(clusterCommunicator);
509 replay(clusterCommunicator);
510
511 assertTrue(ecMap.isEmpty());
512 ecMap.clear();
513 verify(clusterCommunicator);
514
515 // Put some items in the map
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800516 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800517 ecMap.put(KEY1, VALUE1);
518 ecMap.put(KEY2, VALUE2);
519
520 ecMap.addListener(listener);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700521 expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800522
523 ecMap.clear();
524
525 verify(clusterCommunicator);
526 verify(listener);
527 }
528
529 @Test
530 public void testKeySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800531 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800532
533 assertTrue(ecMap.keySet().isEmpty());
534
535 // Generate some keys
536 Set<String> keys = new HashSet<>();
537 for (int i = 1; i <= 10; i++) {
538 keys.add("" + i);
539 }
540
541 // Put each key in the map
542 keys.forEach(k -> ecMap.put(k, "value" + k));
543
544 // Check keySet() returns the correct value
545 assertEquals(keys, ecMap.keySet());
546
547 // Update the value for one of the keys
548 ecMap.put(keys.iterator().next(), "new-value");
549
550 // Check the key set is still the same
551 assertEquals(keys, ecMap.keySet());
552
553 // Remove a key
554 String removeKey = keys.iterator().next();
555 keys.remove(removeKey);
556 ecMap.remove(removeKey);
557
558 // Check the key set is still correct
559 assertEquals(keys, ecMap.keySet());
560 }
561
562 @Test
563 public void testValues() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800564 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800565
566 assertTrue(ecMap.values().isEmpty());
567
568 // Generate some values
569 Map<String, String> expectedValues = new HashMap<>();
570 for (int i = 1; i <= 10; i++) {
571 expectedValues.put("" + i, "value" + i);
572 }
573
574 // Add them into the map
575 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
576
577 // Check the values collection is correct
578 assertEquals(expectedValues.values().size(), ecMap.values().size());
579 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
580
581 // Update the value for one of the keys
582 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
583 expectedValues.put(first.getKey(), "new-value");
584 ecMap.put(first.getKey(), "new-value");
585
586 // Check the values collection is still correct
587 assertEquals(expectedValues.values().size(), ecMap.values().size());
588 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
589
590 // Remove a key
591 String removeKey = expectedValues.keySet().iterator().next();
592 expectedValues.remove(removeKey);
593 ecMap.remove(removeKey);
594
595 // Check the values collection is still correct
596 assertEquals(expectedValues.values().size(), ecMap.values().size());
597 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
598 }
599
600 @Test
601 public void testEntrySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800602 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800603
604 assertTrue(ecMap.entrySet().isEmpty());
605
606 // Generate some values
607 Map<String, String> expectedValues = new HashMap<>();
608 for (int i = 1; i <= 10; i++) {
609 expectedValues.put("" + i, "value" + i);
610 }
611
612 // Add them into the map
613 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
614
615 // Check the entry set is correct
616 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
617
618 // Update the value for one of the keys
619 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
620 expectedValues.put(first.getKey(), "new-value");
621 ecMap.put(first.getKey(), "new-value");
622
623 // Check the entry set is still correct
624 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
625
626 // Remove a key
627 String removeKey = expectedValues.keySet().iterator().next();
628 expectedValues.remove(removeKey);
629 ecMap.remove(removeKey);
630
631 // Check the entry set is still correct
632 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
633 }
634
635 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
636 if (expectedMap.entrySet().size() != actual.size()) {
637 return false;
638 }
639
640 for (Map.Entry<String, String> e : actual) {
641 if (!expectedMap.containsKey(e.getKey())) {
642 return false;
643 }
644 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
645 return false;
646 }
647 }
648 return true;
649 }
650
651 @Test
652 public void testDestroy() throws Exception {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800653 clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800654 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
655
656 replay(clusterCommunicator);
657
658 ecMap.destroy();
659
660 verify(clusterCommunicator);
661
662 try {
663 ecMap.get(KEY1);
664 fail("get after destroy should throw exception");
665 } catch (IllegalStateException e) {
666 assertTrue(true);
667 }
668
669 try {
670 ecMap.put(KEY1, VALUE1);
671 fail("put after destroy should throw exception");
672 } catch (IllegalStateException e) {
673 assertTrue(true);
674 }
675 }
676
Madan Jampani3d76c942015-06-29 23:37:10 -0700677 private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
678 return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800679 }
680
Madan Jampani3d76c942015-06-29 23:37:10 -0700681 private List<UpdateEntry<String, String>> generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700682 String key1, String value1, String key2, String value2) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700683 List<UpdateEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800684
685 Timestamp timestamp1 = clockService.peek(1);
686 Timestamp timestamp2 = clockService.peek(2);
687
Madan Jampani3d76c942015-06-29 23:37:10 -0700688 list.add(generatePutMessage(key1, value1, timestamp1));
689 list.add(generatePutMessage(key2, value2, timestamp2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800690
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700691 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800692 }
693
Madan Jampani3d76c942015-06-29 23:37:10 -0700694 private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
695 return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800696 }
697
Madan Jampani3d76c942015-06-29 23:37:10 -0700698 private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
699 List<UpdateEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800700
701 Timestamp timestamp1 = clockService.peek(1);
702 Timestamp timestamp2 = clockService.peek(2);
703
Madan Jampani3d76c942015-06-29 23:37:10 -0700704 list.add(generateRemoveMessage(key1, timestamp1));
705 list.add(generateRemoveMessage(key2, timestamp2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800706
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700707 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800708 }
709
710 /**
711 * Sets up a mock ClusterCommunicationService to expect a specific cluster
712 * message to be broadcast to the cluster.
713 *
Ray Milkeyb3c5ce22015-08-10 09:07:36 -0700714 * @param message message we expect to be sent
Jonathan Hart584d2f32015-01-27 19:46:14 -0800715 * @param clusterCommunicator a mock ClusterCommunicationService to set up
716 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800717 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700718 private static <T> void expectSpecificBroadcastMessage(
719 T message,
720 MessageSubject subject,
721 ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800722 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700723 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
724 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800725 replay(clusterCommunicator);
726 }
727
728 /**
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800729 * Sets up a mock ClusterCommunicationService to expect a specific cluster
730 * message to be multicast to the cluster.
731 *
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700732 * @param message message we expect to be sent
733 * @param subject subject we expect to be sent to
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800734 * @param clusterCommunicator a mock ClusterCommunicationService to set up
735 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800736 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700737 private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800738 ClusterCommunicationService clusterCommunicator) {
739 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700740 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
741 expectLastCall().anyTimes();
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800742 replay(clusterCommunicator);
743 }
744
745
746 /**
747 * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
Jonathan Hart584d2f32015-01-27 19:46:14 -0800748 * that is sent to it. This is useful for unit tests where we aren't
749 * interested in testing the messaging component.
750 *
751 * @param clusterCommunicator a mock ClusterCommunicationService to set up
752 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800753 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700754 private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800755 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800756// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
757// anyObject(Iterable.class)))
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700758 expect(clusterCommunicator.<T>unicast(
759 anyObject(),
760 anyObject(MessageSubject.class),
761 anyObject(Function.class),
762 anyObject(NodeId.class)))
Madan Jampani175e8fd2015-05-20 14:10:45 -0700763 .andReturn(CompletableFuture.completedFuture(null))
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800764 .anyTimes();
765 replay(clusterCommunicator);
766 }
767
768 /**
769 * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
770 * that is sent to it. This is useful for unit tests where we aren't
771 * interested in testing the messaging component.
772 *
773 * @param clusterCommunicator a mock ClusterCommunicationService to set up
774 */
775 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800776 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700777 clusterCommunicator.<AbstractEvent>multicast(
778 anyObject(AbstractEvent.class),
779 anyObject(MessageSubject.class),
780 anyObject(Function.class),
781 anyObject(Set.class));
782 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800783 replay(clusterCommunicator);
784 }
785
786 /**
787 * ClusterCommunicationService implementation that the map's addSubscriber
788 * call will delegate to. This means we can get a reference to the
789 * internal cluster message handler used by the map, so that we can simulate
790 * events coming in from other instances.
791 */
792 private final class TestClusterCommunicationService
Ray Milkeyb3c5ce22015-08-10 09:07:36 -0700793 extends ClusterCommunicationServiceAdapter {
Madan Jampani27b69c62015-05-15 15:49:02 -0700794
795 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700796 public <M> void addSubscriber(MessageSubject subject,
797 Function<byte[], M> decoder, Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700798 Executor executor) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700799 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
800 updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
801 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
802 antiEntropyHandler = (Consumer<AntiEntropyAdvertisement<String>>) handler;
803 } else {
804 throw new RuntimeException("Unexpected message subject " + subject.toString());
805 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700806 }
Jonathan Hart584d2f32015-01-27 19:46:14 -0800807 }
808
809 /**
810 * ClockService implementation that gives out timestamps based on a
811 * sequential counter. This clock service enables more control over the
812 * timestamps that are given out, including being able to "turn back time"
813 * to give out timestamps from the past.
814 *
815 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800816 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800817 */
Madan Jampanibcf1a482015-06-24 19:05:56 -0700818 private class SequentialClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800819
820 private static final long INITIAL_VALUE = 1;
821 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
822
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800823 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800824 return new TestTimestamp(counter.getAndIncrement());
825 }
826
827 /**
828 * Returns what the next timestamp will be without consuming the
829 * timestamp. This allows test code to set expectations correctly while
830 * still allowing the CUT to get the same timestamp.
831 *
832 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800833 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800834 */
835 public Timestamp peekAtNextTimestamp() {
836 return peek(1);
837 }
838
839 /**
840 * Returns the ith timestamp to be given out in the future without
841 * consuming the timestamp. For example, i=1 returns the next timestamp,
842 * i=2 returns the timestamp after that, and so on.
843 *
844 * @param i number of the timestamp to peek at
845 * @return the ith timestamp that will be given out
846 */
847 public Timestamp peek(int i) {
848 checkArgument(i > 0, "i must be a positive integer");
849
850 return new TestTimestamp(counter.get() + i - 1);
851 }
852
853 /**
854 * Turns the clock back two ticks, so the next call to getTimestamp will
855 * return an older timestamp than the previous call to getTimestamp.
856 */
857 public void turnBackTime() {
858 // Not atomic, but should be OK for these tests.
859 counter.decrementAndGet();
860 counter.decrementAndGet();
861 }
862
863 }
864
865 /**
866 * Timestamp implementation where the value of the timestamp can be
867 * specified explicitly at creation time.
868 */
869 private class TestTimestamp implements Timestamp {
870
871 private final long timestamp;
872
873 /**
874 * Creates a new timestamp that has the specified value.
875 *
876 * @param timestamp value of the timestamp
877 */
878 public TestTimestamp(long timestamp) {
879 this.timestamp = timestamp;
880 }
881
882 @Override
883 public int compareTo(Timestamp o) {
884 checkArgument(o instanceof TestTimestamp);
885 TestTimestamp otherTimestamp = (TestTimestamp) o;
886 return ComparisonChain.start()
887 .compare(this.timestamp, otherTimestamp.timestamp)
888 .result();
889 }
890 }
891
892 /**
893 * EventuallyConsistentMapListener implementation which triggers a latch
894 * when it receives an event.
895 */
896 private class TestListener implements EventuallyConsistentMapListener<String, String> {
897 private CountDownLatch latch;
898
899 /**
900 * Creates a new listener that will trigger the specified latch when it
901 * receives and event.
902 *
903 * @param latch the latch to trigger on events
904 */
905 public TestListener(CountDownLatch latch) {
906 this.latch = latch;
907 }
908
909 @Override
910 public void event(EventuallyConsistentMapEvent<String, String> event) {
911 latch.countDown();
912 }
913 }
914}