blob: 0012d68244abe6b2e46734da4a8d211aed884e29 [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Jonathan Hart584d2f32015-01-27 19:46:14 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
17
18import static com.google.common.base.Preconditions.checkArgument;
19import static junit.framework.TestCase.assertFalse;
20import static org.easymock.EasyMock.anyObject;
21import static org.easymock.EasyMock.createMock;
22import static org.easymock.EasyMock.eq;
23import static org.easymock.EasyMock.expect;
24import static org.easymock.EasyMock.expectLastCall;
25import static org.easymock.EasyMock.replay;
26import static org.easymock.EasyMock.reset;
27import static org.easymock.EasyMock.verify;
28import static org.junit.Assert.assertEquals;
29import static org.junit.Assert.assertNull;
30import static org.junit.Assert.assertTrue;
31import static org.junit.Assert.fail;
Jonathan Hart584d2f32015-01-27 19:46:14 -080032
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070033import java.util.ArrayList;
34import java.util.Collection;
35import java.util.HashMap;
36import java.util.HashSet;
37import java.util.List;
38import java.util.Map;
39import java.util.Objects;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070040import java.util.Set;
41import java.util.concurrent.CompletableFuture;
42import java.util.concurrent.CountDownLatch;
43import java.util.concurrent.Executor;
44import java.util.concurrent.TimeUnit;
45import java.util.concurrent.atomic.AtomicLong;
46import java.util.function.Consumer;
47import java.util.function.Function;
Madan Jampani3e033bd2015-04-08 13:03:49 -070048
Jonathan Hart584d2f32015-01-27 19:46:14 -080049import org.junit.After;
50import org.junit.Before;
51import org.junit.Test;
52import org.onlab.packet.IpAddress;
53import org.onlab.util.KryoNamespace;
54import org.onosproject.cluster.ClusterService;
55import org.onosproject.cluster.ControllerNode;
56import org.onosproject.cluster.DefaultControllerNode;
57import org.onosproject.cluster.NodeId;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070058import org.onosproject.event.AbstractEvent;
Madan Jampani2b8e1892015-12-04 11:16:47 -080059import org.onosproject.persistence.PersistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080060import org.onosproject.store.Timestamp;
61import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070062import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
Jonathan Hart584d2f32015-01-27 19:46:14 -080063import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani2b8e1892015-12-04 11:16:47 -080064import org.onosproject.store.persistence.TestPersistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080065import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070066import org.onosproject.store.service.EventuallyConsistentMap;
67import org.onosproject.store.service.EventuallyConsistentMapEvent;
68import org.onosproject.store.service.EventuallyConsistentMapListener;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070069import com.google.common.collect.ComparisonChain;
70import com.google.common.collect.ImmutableList;
71import com.google.common.collect.ImmutableSet;
72import com.google.common.util.concurrent.MoreExecutors;
Jonathan Hart584d2f32015-01-27 19:46:14 -080073
Jonathan Hart584d2f32015-01-27 19:46:14 -080074/**
75 * Unit tests for EventuallyConsistentMapImpl.
76 */
77public class EventuallyConsistentMapImplTest {
78
79 private EventuallyConsistentMap<String, String> ecMap;
80
Madan Jampani2b8e1892015-12-04 11:16:47 -080081 private PersistenceService persistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080082 private ClusterService clusterService;
83 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080084 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080085
86 private static final String MAP_NAME = "test";
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080087 private static final MessageSubject UPDATE_MESSAGE_SUBJECT
Jonathan Hart584d2f32015-01-27 19:46:14 -080088 = new MessageSubject("ecm-" + MAP_NAME + "-update");
Jonathan Hart584d2f32015-01-27 19:46:14 -080089 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
90 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
91
92 private static final String KEY1 = "one";
93 private static final String KEY2 = "two";
94 private static final String VALUE1 = "oneValue";
95 private static final String VALUE2 = "twoValue";
96
97 private final ControllerNode self =
98 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
99
Madan Jampani3d76c942015-06-29 23:37:10 -0700100 private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
Madan Jampani29f52a32016-04-18 15:20:52 -0700101 private Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse> antiEntropyHandler;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800102
Jonathan Hart584d2f32015-01-27 19:46:14 -0800103 @Before
104 public void setUp() throws Exception {
105 clusterService = createMock(ClusterService.class);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800106 expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
107 expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800108 replay(clusterService);
109
110 clusterCommunicator = createMock(ClusterCommunicationService.class);
111
Madan Jampani2b8e1892015-12-04 11:16:47 -0800112 persistenceService = new TestPersistenceService();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800113 // Add expectation for adding cluster message subscribers which
114 // delegate to our ClusterCommunicationService implementation. This
115 // allows us to get a reference to the map's internal cluster message
116 // handlers so we can induce events coming in from a peer.
Madan Jampani29f52a32016-04-18 15:20:52 -0700117 clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
Madan Jampani3d76c942015-06-29 23:37:10 -0700118 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
Madan Jampani29f52a32016-04-18 15:20:52 -0700119 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
120 clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
121 anyObject(Function.class),
122 anyObject(Function.class),
123 anyObject(Function.class),
124 anyObject(Executor.class));
125 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800126
127 replay(clusterCommunicator);
128
129 clockService = new SequentialClockService<>();
130
131 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
132 .register(KryoNamespaces.API)
133 .register(TestTimestamp.class);
134
Madan Jampani175e8fd2015-05-20 14:10:45 -0700135 ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700136 clusterService, clusterCommunicator, persistenceService)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700137 .withName(MAP_NAME)
138 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700139 .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700140 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700141 .withPersistence()
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700142 .build();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800143
144 // Reset ready for tests to add their own expectations
145 reset(clusterCommunicator);
146 }
147
148 @After
149 public void tearDown() {
150 reset(clusterCommunicator);
151 ecMap.destroy();
152 }
153
Ray Milkey8dc82082015-02-20 16:22:38 -0800154 @SuppressWarnings("unchecked")
155 private EventuallyConsistentMapListener<String, String> getListener() {
156 return createMock(EventuallyConsistentMapListener.class);
157 }
158
Jonathan Hart584d2f32015-01-27 19:46:14 -0800159 @Test
160 public void testSize() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800161 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800162
163 assertEquals(0, ecMap.size());
164 ecMap.put(KEY1, VALUE1);
165 assertEquals(1, ecMap.size());
166 ecMap.put(KEY1, VALUE2);
167 assertEquals(1, ecMap.size());
168 ecMap.put(KEY2, VALUE2);
169 assertEquals(2, ecMap.size());
170 for (int i = 0; i < 10; i++) {
171 ecMap.put("" + i, "" + i);
172 }
173 assertEquals(12, ecMap.size());
174 ecMap.remove(KEY1);
175 assertEquals(11, ecMap.size());
176 ecMap.remove(KEY1);
177 assertEquals(11, ecMap.size());
178 }
179
180 @Test
181 public void testIsEmpty() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800182 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800183
184 assertTrue(ecMap.isEmpty());
185 ecMap.put(KEY1, VALUE1);
186 assertFalse(ecMap.isEmpty());
187 ecMap.remove(KEY1);
188 assertTrue(ecMap.isEmpty());
189 }
190
191 @Test
192 public void testContainsKey() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800193 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800194
195 assertFalse(ecMap.containsKey(KEY1));
196 ecMap.put(KEY1, VALUE1);
197 assertTrue(ecMap.containsKey(KEY1));
198 assertFalse(ecMap.containsKey(KEY2));
199 ecMap.remove(KEY1);
200 assertFalse(ecMap.containsKey(KEY1));
201 }
202
203 @Test
204 public void testContainsValue() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800205 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800206
207 assertFalse(ecMap.containsValue(VALUE1));
208 ecMap.put(KEY1, VALUE1);
209 assertTrue(ecMap.containsValue(VALUE1));
210 assertFalse(ecMap.containsValue(VALUE2));
211 ecMap.put(KEY1, VALUE2);
212 assertFalse(ecMap.containsValue(VALUE1));
213 assertTrue(ecMap.containsValue(VALUE2));
214 ecMap.remove(KEY1);
215 assertFalse(ecMap.containsValue(VALUE2));
216 }
217
218 @Test
219 public void testGet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800220 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800221
222 CountDownLatch latch;
223
224 // Local put
225 assertNull(ecMap.get(KEY1));
226 ecMap.put(KEY1, VALUE1);
227 assertEquals(VALUE1, ecMap.get(KEY1));
228
229 // Remote put
Madan Jampani3d76c942015-06-29 23:37:10 -0700230 List<UpdateEntry<String, String>> message
231 = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800232
233 // Create a latch so we know when the put operation has finished
234 latch = new CountDownLatch(1);
235 ecMap.addListener(new TestListener(latch));
236
237 assertNull(ecMap.get(KEY2));
Madan Jampani3d76c942015-06-29 23:37:10 -0700238 updateHandler.accept(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800239 assertTrue("External listener never got notified of internal event",
240 latch.await(100, TimeUnit.MILLISECONDS));
241 assertEquals(VALUE2, ecMap.get(KEY2));
242
243 // Local remove
244 ecMap.remove(KEY2);
245 assertNull(ecMap.get(KEY2));
246
247 // Remote remove
Madan Jampani3d76c942015-06-29 23:37:10 -0700248 message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800249
250 // Create a latch so we know when the remove operation has finished
251 latch = new CountDownLatch(1);
252 ecMap.addListener(new TestListener(latch));
253
Madan Jampani3d76c942015-06-29 23:37:10 -0700254 updateHandler.accept(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800255 assertTrue("External listener never got notified of internal event",
256 latch.await(100, TimeUnit.MILLISECONDS));
257 assertNull(ecMap.get(KEY1));
258 }
259
260 @Test
261 public void testPut() throws Exception {
262 // Set up expectations of external events to be sent to listeners during
263 // the test. These don't use timestamps so we can set them all up at once.
264 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800265 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800266 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700267 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800268 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700269 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800270 replay(listener);
271
272 ecMap.addListener(listener);
273
274 // Set up expected internal message to be broadcast to peers on first put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800275 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700276 .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800277
278 // Put first value
279 assertNull(ecMap.get(KEY1));
280 ecMap.put(KEY1, VALUE1);
281 assertEquals(VALUE1, ecMap.get(KEY1));
282
283 verify(clusterCommunicator);
284
285 // Set up expected internal message to be broadcast to peers on second put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800286 expectSpecificMulticastMessage(generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700287 KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800288
289 // Update same key to a new value
290 ecMap.put(KEY1, VALUE2);
291 assertEquals(VALUE2, ecMap.get(KEY1));
292
293 verify(clusterCommunicator);
294
295 // Do a put with a older timestamp than the value already there.
296 // The map data should not be changed and no notifications should be sent.
297 reset(clusterCommunicator);
298 replay(clusterCommunicator);
299
300 clockService.turnBackTime();
301 ecMap.put(KEY1, VALUE1);
302 // Value should not have changed.
303 assertEquals(VALUE2, ecMap.get(KEY1));
304
305 verify(clusterCommunicator);
306
307 // Check that our listener received the correct events during the test
308 verify(listener);
309 }
310
311 @Test
312 public void testRemove() throws Exception {
313 // Set up expectations of external events to be sent to listeners during
314 // the test. These don't use timestamps so we can set them all up at once.
315 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800316 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800317 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700318 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700319 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700320 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800321 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700322 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800323 replay(listener);
324
325 ecMap.addListener(listener);
326
327 // Put in an initial value
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800328 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800329 ecMap.put(KEY1, VALUE1);
330 assertEquals(VALUE1, ecMap.get(KEY1));
331
332 // Remove the value and check the correct internal cluster messages
333 // are sent
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800334 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700335 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800336
337 ecMap.remove(KEY1);
338 assertNull(ecMap.get(KEY1));
339
340 verify(clusterCommunicator);
341
342 // Remove the same value again. Even though the value is no longer in
343 // the map, we expect that the tombstone is updated and another remove
344 // event is sent to the cluster and external listeners.
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800345 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700346 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800347
348 ecMap.remove(KEY1);
349 assertNull(ecMap.get(KEY1));
350
351 verify(clusterCommunicator);
352
353
354 // Put in a new value for us to try and remove
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800355 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800356
357 ecMap.put(KEY2, VALUE2);
358
359 clockService.turnBackTime();
360
361 // Remove should have no effect, since it has an older timestamp than
362 // the put. Expect no notifications to be sent out
363 reset(clusterCommunicator);
364 replay(clusterCommunicator);
365
366 ecMap.remove(KEY2);
367
368 verify(clusterCommunicator);
369
370 // Check that our listener received the correct events during the test
371 verify(listener);
372 }
373
374 @Test
Madan Jampani4727a112015-07-16 12:12:58 -0700375 public void testCompute() throws Exception {
376 // Set up expectations of external events to be sent to listeners during
377 // the test. These don't use timestamps so we can set them all up at once.
378 EventuallyConsistentMapListener<String, String> listener
379 = getListener();
380 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700381 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Madan Jampani4727a112015-07-16 12:12:58 -0700382 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700383 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Madan Jampani4727a112015-07-16 12:12:58 -0700384 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700385 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Madan Jampani4727a112015-07-16 12:12:58 -0700386 replay(listener);
387
388 ecMap.addListener(listener);
389
390 // Put in an initial value
391 expectPeerMessage(clusterCommunicator);
392 ecMap.compute(KEY1, (k, v) -> VALUE1);
393 assertEquals(VALUE1, ecMap.get(KEY1));
394
395 // Remove the value and check the correct internal cluster messages
396 // are sent
397 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
398 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
399
400 ecMap.compute(KEY1, (k, v) -> null);
401 assertNull(ecMap.get(KEY1));
402
403 verify(clusterCommunicator);
404
405 // Remove the same value again. Even though the value is no longer in
406 // the map, we expect that the tombstone is updated and another remove
407 // event is sent to the cluster and external listeners.
408 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
409 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
410
411 ecMap.compute(KEY1, (k, v) -> null);
412 assertNull(ecMap.get(KEY1));
413
414 verify(clusterCommunicator);
415
416 // Put in a new value for us to try and remove
417 expectPeerMessage(clusterCommunicator);
418
419 ecMap.compute(KEY2, (k, v) -> VALUE2);
420
421 clockService.turnBackTime();
422
423 // Remove should have no effect, since it has an older timestamp than
424 // the put. Expect no notifications to be sent out
425 reset(clusterCommunicator);
426 replay(clusterCommunicator);
427
428 ecMap.compute(KEY2, (k, v) -> null);
429
430 verify(clusterCommunicator);
431
432 // Check that our listener received the correct events during the test
433 verify(listener);
434 }
435
436 @Test
Jonathan Hart584d2f32015-01-27 19:46:14 -0800437 public void testPutAll() throws Exception {
438 // putAll() with an empty map is a no-op - no messages will be sent
439 reset(clusterCommunicator);
440 replay(clusterCommunicator);
441
442 ecMap.putAll(new HashMap<>());
443
444 verify(clusterCommunicator);
445
446 // Set up the listener with our expected events
447 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800448 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800449 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700450 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800451 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700452 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800453 replay(listener);
454
455 ecMap.addListener(listener);
456
457 // Expect a multi-update inter-instance message
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700458 expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800459 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800460
461 Map<String, String> putAllValues = new HashMap<>();
462 putAllValues.put(KEY1, VALUE1);
463 putAllValues.put(KEY2, VALUE2);
464
465 // Put the values in the map
466 ecMap.putAll(putAllValues);
467
468 // Check the correct messages and events were sent
469 verify(clusterCommunicator);
470 verify(listener);
471 }
472
473 @Test
474 public void testClear() throws Exception {
475 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800476 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800477 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700478 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800479 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700480 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800481 replay(listener);
482
483 // clear() on an empty map is a no-op - no messages will be sent
484 reset(clusterCommunicator);
485 replay(clusterCommunicator);
486
487 assertTrue(ecMap.isEmpty());
488 ecMap.clear();
489 verify(clusterCommunicator);
490
491 // Put some items in the map
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800492 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800493 ecMap.put(KEY1, VALUE1);
494 ecMap.put(KEY2, VALUE2);
495
496 ecMap.addListener(listener);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700497 expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800498
499 ecMap.clear();
500
501 verify(clusterCommunicator);
502 verify(listener);
503 }
504
505 @Test
506 public void testKeySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800507 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800508
509 assertTrue(ecMap.keySet().isEmpty());
510
511 // Generate some keys
512 Set<String> keys = new HashSet<>();
513 for (int i = 1; i <= 10; i++) {
514 keys.add("" + i);
515 }
516
517 // Put each key in the map
518 keys.forEach(k -> ecMap.put(k, "value" + k));
519
520 // Check keySet() returns the correct value
521 assertEquals(keys, ecMap.keySet());
522
523 // Update the value for one of the keys
524 ecMap.put(keys.iterator().next(), "new-value");
525
526 // Check the key set is still the same
527 assertEquals(keys, ecMap.keySet());
528
529 // Remove a key
530 String removeKey = keys.iterator().next();
531 keys.remove(removeKey);
532 ecMap.remove(removeKey);
533
534 // Check the key set is still correct
535 assertEquals(keys, ecMap.keySet());
536 }
537
538 @Test
539 public void testValues() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800540 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800541
542 assertTrue(ecMap.values().isEmpty());
543
544 // Generate some values
545 Map<String, String> expectedValues = new HashMap<>();
546 for (int i = 1; i <= 10; i++) {
547 expectedValues.put("" + i, "value" + i);
548 }
549
550 // Add them into the map
551 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
552
553 // Check the values collection is correct
554 assertEquals(expectedValues.values().size(), ecMap.values().size());
555 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
556
557 // Update the value for one of the keys
558 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
559 expectedValues.put(first.getKey(), "new-value");
560 ecMap.put(first.getKey(), "new-value");
561
562 // Check the values collection is still correct
563 assertEquals(expectedValues.values().size(), ecMap.values().size());
564 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
565
566 // Remove a key
567 String removeKey = expectedValues.keySet().iterator().next();
568 expectedValues.remove(removeKey);
569 ecMap.remove(removeKey);
570
571 // Check the values collection is still correct
572 assertEquals(expectedValues.values().size(), ecMap.values().size());
573 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
574 }
575
576 @Test
577 public void testEntrySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800578 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800579
580 assertTrue(ecMap.entrySet().isEmpty());
581
582 // Generate some values
583 Map<String, String> expectedValues = new HashMap<>();
584 for (int i = 1; i <= 10; i++) {
585 expectedValues.put("" + i, "value" + i);
586 }
587
588 // Add them into the map
589 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
590
591 // Check the entry set is correct
592 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
593
594 // Update the value for one of the keys
595 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
596 expectedValues.put(first.getKey(), "new-value");
597 ecMap.put(first.getKey(), "new-value");
598
599 // Check the entry set is still correct
600 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
601
602 // Remove a key
603 String removeKey = expectedValues.keySet().iterator().next();
604 expectedValues.remove(removeKey);
605 ecMap.remove(removeKey);
606
607 // Check the entry set is still correct
608 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
609 }
610
611 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
612 if (expectedMap.entrySet().size() != actual.size()) {
613 return false;
614 }
615
616 for (Map.Entry<String, String> e : actual) {
617 if (!expectedMap.containsKey(e.getKey())) {
618 return false;
619 }
620 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
621 return false;
622 }
623 }
624 return true;
625 }
626
627 @Test
628 public void testDestroy() throws Exception {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800629 clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800630 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
631
632 replay(clusterCommunicator);
633
634 ecMap.destroy();
635
636 verify(clusterCommunicator);
637
638 try {
639 ecMap.get(KEY1);
640 fail("get after destroy should throw exception");
641 } catch (IllegalStateException e) {
642 assertTrue(true);
643 }
644
645 try {
646 ecMap.put(KEY1, VALUE1);
647 fail("put after destroy should throw exception");
648 } catch (IllegalStateException e) {
649 assertTrue(true);
650 }
651 }
652
Madan Jampani3d76c942015-06-29 23:37:10 -0700653 private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
654 return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800655 }
656
Madan Jampani3d76c942015-06-29 23:37:10 -0700657 private List<UpdateEntry<String, String>> generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700658 String key1, String value1, String key2, String value2) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700659 List<UpdateEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800660
661 Timestamp timestamp1 = clockService.peek(1);
662 Timestamp timestamp2 = clockService.peek(2);
663
Madan Jampani3d76c942015-06-29 23:37:10 -0700664 list.add(generatePutMessage(key1, value1, timestamp1));
665 list.add(generatePutMessage(key2, value2, timestamp2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800666
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700667 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800668 }
669
Madan Jampani3d76c942015-06-29 23:37:10 -0700670 private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
671 return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800672 }
673
Madan Jampani3d76c942015-06-29 23:37:10 -0700674 private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
675 List<UpdateEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800676
677 Timestamp timestamp1 = clockService.peek(1);
678 Timestamp timestamp2 = clockService.peek(2);
679
Madan Jampani3d76c942015-06-29 23:37:10 -0700680 list.add(generateRemoveMessage(key1, timestamp1));
681 list.add(generateRemoveMessage(key2, timestamp2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800682
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700683 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800684 }
685
686 /**
687 * Sets up a mock ClusterCommunicationService to expect a specific cluster
688 * message to be broadcast to the cluster.
689 *
Ray Milkeyb3c5ce22015-08-10 09:07:36 -0700690 * @param message message we expect to be sent
Jonathan Hart584d2f32015-01-27 19:46:14 -0800691 * @param clusterCommunicator a mock ClusterCommunicationService to set up
692 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800693 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700694 private static <T> void expectSpecificBroadcastMessage(
695 T message,
696 MessageSubject subject,
697 ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800698 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700699 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
700 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800701 replay(clusterCommunicator);
702 }
703
704 /**
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800705 * Sets up a mock ClusterCommunicationService to expect a specific cluster
706 * message to be multicast to the cluster.
707 *
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700708 * @param message message we expect to be sent
709 * @param subject subject we expect to be sent to
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800710 * @param clusterCommunicator a mock ClusterCommunicationService to set up
711 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800712 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700713 private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800714 ClusterCommunicationService clusterCommunicator) {
715 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700716 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
717 expectLastCall().anyTimes();
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800718 replay(clusterCommunicator);
719 }
720
721
722 /**
723 * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
Jonathan Hart584d2f32015-01-27 19:46:14 -0800724 * that is sent to it. This is useful for unit tests where we aren't
725 * interested in testing the messaging component.
726 *
727 * @param clusterCommunicator a mock ClusterCommunicationService to set up
728 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800729 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700730 private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800731 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800732// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
733// anyObject(Iterable.class)))
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700734 expect(clusterCommunicator.<T>unicast(
735 anyObject(),
736 anyObject(MessageSubject.class),
737 anyObject(Function.class),
738 anyObject(NodeId.class)))
Madan Jampani175e8fd2015-05-20 14:10:45 -0700739 .andReturn(CompletableFuture.completedFuture(null))
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800740 .anyTimes();
741 replay(clusterCommunicator);
742 }
743
744 /**
745 * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
746 * that is sent to it. This is useful for unit tests where we aren't
747 * interested in testing the messaging component.
748 *
749 * @param clusterCommunicator a mock ClusterCommunicationService to set up
750 */
751 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800752 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700753 clusterCommunicator.<AbstractEvent>multicast(
754 anyObject(AbstractEvent.class),
755 anyObject(MessageSubject.class),
756 anyObject(Function.class),
757 anyObject(Set.class));
758 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800759 replay(clusterCommunicator);
760 }
761
762 /**
763 * ClusterCommunicationService implementation that the map's addSubscriber
764 * call will delegate to. This means we can get a reference to the
765 * internal cluster message handler used by the map, so that we can simulate
766 * events coming in from other instances.
767 */
768 private final class TestClusterCommunicationService
Ray Milkeyb3c5ce22015-08-10 09:07:36 -0700769 extends ClusterCommunicationServiceAdapter {
Madan Jampani27b69c62015-05-15 15:49:02 -0700770
771 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700772 public <M> void addSubscriber(MessageSubject subject,
773 Function<byte[], M> decoder, Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700774 Executor executor) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700775 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
776 updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
Madan Jampani29f52a32016-04-18 15:20:52 -0700777 } else {
778 throw new RuntimeException("Unexpected message subject " + subject.toString());
779 }
780 }
781
782 @Override
783 public <M, R> void addSubscriber(MessageSubject subject,
784 Function<byte[], M> decoder, Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
785 if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
786 antiEntropyHandler = (Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse>) handler;
Madan Jampani3d76c942015-06-29 23:37:10 -0700787 } else {
788 throw new RuntimeException("Unexpected message subject " + subject.toString());
789 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700790 }
Jonathan Hart584d2f32015-01-27 19:46:14 -0800791 }
792
793 /**
794 * ClockService implementation that gives out timestamps based on a
795 * sequential counter. This clock service enables more control over the
796 * timestamps that are given out, including being able to "turn back time"
797 * to give out timestamps from the past.
798 *
799 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800800 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800801 */
Madan Jampanibcf1a482015-06-24 19:05:56 -0700802 private class SequentialClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800803
804 private static final long INITIAL_VALUE = 1;
805 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
806
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800807 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800808 return new TestTimestamp(counter.getAndIncrement());
809 }
810
811 /**
812 * Returns what the next timestamp will be without consuming the
813 * timestamp. This allows test code to set expectations correctly while
814 * still allowing the CUT to get the same timestamp.
815 *
816 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800817 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800818 */
819 public Timestamp peekAtNextTimestamp() {
820 return peek(1);
821 }
822
823 /**
824 * Returns the ith timestamp to be given out in the future without
825 * consuming the timestamp. For example, i=1 returns the next timestamp,
826 * i=2 returns the timestamp after that, and so on.
827 *
828 * @param i number of the timestamp to peek at
829 * @return the ith timestamp that will be given out
830 */
831 public Timestamp peek(int i) {
832 checkArgument(i > 0, "i must be a positive integer");
833
834 return new TestTimestamp(counter.get() + i - 1);
835 }
836
837 /**
838 * Turns the clock back two ticks, so the next call to getTimestamp will
839 * return an older timestamp than the previous call to getTimestamp.
840 */
841 public void turnBackTime() {
842 // Not atomic, but should be OK for these tests.
843 counter.decrementAndGet();
844 counter.decrementAndGet();
845 }
846
847 }
848
849 /**
850 * Timestamp implementation where the value of the timestamp can be
851 * specified explicitly at creation time.
852 */
853 private class TestTimestamp implements Timestamp {
854
855 private final long timestamp;
856
857 /**
858 * Creates a new timestamp that has the specified value.
859 *
860 * @param timestamp value of the timestamp
861 */
862 public TestTimestamp(long timestamp) {
863 this.timestamp = timestamp;
864 }
865
866 @Override
867 public int compareTo(Timestamp o) {
868 checkArgument(o instanceof TestTimestamp);
869 TestTimestamp otherTimestamp = (TestTimestamp) o;
870 return ComparisonChain.start()
871 .compare(this.timestamp, otherTimestamp.timestamp)
872 .result();
873 }
874 }
875
876 /**
877 * EventuallyConsistentMapListener implementation which triggers a latch
878 * when it receives an event.
879 */
880 private class TestListener implements EventuallyConsistentMapListener<String, String> {
881 private CountDownLatch latch;
882
883 /**
884 * Creates a new listener that will trigger the specified latch when it
885 * receives and event.
886 *
887 * @param latch the latch to trigger on events
888 */
889 public TestListener(CountDownLatch latch) {
890 this.latch = latch;
891 }
892
893 @Override
894 public void event(EventuallyConsistentMapEvent<String, String> event) {
895 latch.countDown();
896 }
897 }
898}