blob: 8d41fd2fed02739863f9670ba144cce6efd7ffc8 [file] [log] [blame]
Jonathan Hart584d2f32015-01-27 19:46:14 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Jonathan Hart584d2f32015-01-27 19:46:14 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
17
18import static com.google.common.base.Preconditions.checkArgument;
19import static junit.framework.TestCase.assertFalse;
20import static org.easymock.EasyMock.anyObject;
21import static org.easymock.EasyMock.createMock;
22import static org.easymock.EasyMock.eq;
23import static org.easymock.EasyMock.expect;
24import static org.easymock.EasyMock.expectLastCall;
25import static org.easymock.EasyMock.replay;
26import static org.easymock.EasyMock.reset;
27import static org.easymock.EasyMock.verify;
28import static org.junit.Assert.assertEquals;
29import static org.junit.Assert.assertNull;
30import static org.junit.Assert.assertTrue;
31import static org.junit.Assert.fail;
Jonathan Hart584d2f32015-01-27 19:46:14 -080032
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070033import java.util.ArrayList;
34import java.util.Collection;
35import java.util.HashMap;
36import java.util.HashSet;
37import java.util.List;
38import java.util.Map;
39import java.util.Objects;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070040import java.util.Set;
41import java.util.concurrent.CompletableFuture;
42import java.util.concurrent.CountDownLatch;
43import java.util.concurrent.Executor;
44import java.util.concurrent.TimeUnit;
45import java.util.concurrent.atomic.AtomicLong;
46import java.util.function.Consumer;
47import java.util.function.Function;
Madan Jampani3e033bd2015-04-08 13:03:49 -070048
Jonathan Hart584d2f32015-01-27 19:46:14 -080049import org.junit.After;
50import org.junit.Before;
51import org.junit.Test;
52import org.onlab.packet.IpAddress;
53import org.onlab.util.KryoNamespace;
54import org.onosproject.cluster.ClusterService;
55import org.onosproject.cluster.ControllerNode;
56import org.onosproject.cluster.DefaultControllerNode;
57import org.onosproject.cluster.NodeId;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070058import org.onosproject.event.AbstractEvent;
Madan Jampani2b8e1892015-12-04 11:16:47 -080059import org.onosproject.persistence.PersistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080060import org.onosproject.store.Timestamp;
61import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070062import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
Jonathan Hart584d2f32015-01-27 19:46:14 -080063import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani2b8e1892015-12-04 11:16:47 -080064import org.onosproject.store.persistence.TestPersistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080065import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070066import org.onosproject.store.service.EventuallyConsistentMap;
67import org.onosproject.store.service.EventuallyConsistentMapEvent;
68import org.onosproject.store.service.EventuallyConsistentMapListener;
Ray Milkeyb3c5ce22015-08-10 09:07:36 -070069import com.google.common.collect.ComparisonChain;
70import com.google.common.collect.ImmutableList;
71import com.google.common.collect.ImmutableSet;
72import com.google.common.util.concurrent.MoreExecutors;
Jonathan Hart584d2f32015-01-27 19:46:14 -080073
Jonathan Hart584d2f32015-01-27 19:46:14 -080074/**
75 * Unit tests for EventuallyConsistentMapImpl.
76 */
77public class EventuallyConsistentMapImplTest {
78
79 private EventuallyConsistentMap<String, String> ecMap;
80
Madan Jampani2b8e1892015-12-04 11:16:47 -080081 private PersistenceService persistenceService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080082 private ClusterService clusterService;
83 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080084 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080085
86 private static final String MAP_NAME = "test";
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080087 private static final MessageSubject UPDATE_MESSAGE_SUBJECT
Jonathan Hart584d2f32015-01-27 19:46:14 -080088 = new MessageSubject("ecm-" + MAP_NAME + "-update");
Jonathan Hart584d2f32015-01-27 19:46:14 -080089 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
90 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
Jon Halld198b882016-05-18 16:44:40 -070091 private static final MessageSubject UPDATE_REQUEST_SUBJECT
92 = new MessageSubject("ecm-" + MAP_NAME + "-update-request");
Jonathan Hart584d2f32015-01-27 19:46:14 -080093
94 private static final String KEY1 = "one";
95 private static final String KEY2 = "two";
96 private static final String VALUE1 = "oneValue";
97 private static final String VALUE2 = "twoValue";
98
99 private final ControllerNode self =
100 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
101
Madan Jampani3d76c942015-06-29 23:37:10 -0700102 private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
Jon Halld198b882016-05-18 16:44:40 -0700103 private Consumer<Collection<UpdateRequest<String>>> requestHandler;
Madan Jampani29f52a32016-04-18 15:20:52 -0700104 private Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse> antiEntropyHandler;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800105
Jonathan Hart584d2f32015-01-27 19:46:14 -0800106 @Before
107 public void setUp() throws Exception {
108 clusterService = createMock(ClusterService.class);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800109 expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
110 expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800111 replay(clusterService);
112
113 clusterCommunicator = createMock(ClusterCommunicationService.class);
114
Madan Jampani2b8e1892015-12-04 11:16:47 -0800115 persistenceService = new TestPersistenceService();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800116 // Add expectation for adding cluster message subscribers which
117 // delegate to our ClusterCommunicationService implementation. This
118 // allows us to get a reference to the map's internal cluster message
119 // handlers so we can induce events coming in from a peer.
Madan Jampani29f52a32016-04-18 15:20:52 -0700120 clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
Madan Jampani3d76c942015-06-29 23:37:10 -0700121 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
Madan Jampani29f52a32016-04-18 15:20:52 -0700122 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
123 clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
124 anyObject(Function.class),
125 anyObject(Function.class),
126 anyObject(Function.class),
127 anyObject(Executor.class));
128 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
Jon Halld198b882016-05-18 16:44:40 -0700129 clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
130 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
131 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800132
133 replay(clusterCommunicator);
134
135 clockService = new SequentialClockService<>();
136
137 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
138 .register(KryoNamespaces.API)
139 .register(TestTimestamp.class);
140
Madan Jampani175e8fd2015-05-20 14:10:45 -0700141 ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700142 clusterService, clusterCommunicator, persistenceService)
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700143 .withName(MAP_NAME)
144 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700145 .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700146 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
Aaron Kruglikov66cf0b92015-10-26 15:46:54 -0700147 .withPersistence()
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700148 .build();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800149
150 // Reset ready for tests to add their own expectations
151 reset(clusterCommunicator);
152 }
153
154 @After
155 public void tearDown() {
156 reset(clusterCommunicator);
157 ecMap.destroy();
158 }
159
Ray Milkey8dc82082015-02-20 16:22:38 -0800160 @SuppressWarnings("unchecked")
161 private EventuallyConsistentMapListener<String, String> getListener() {
162 return createMock(EventuallyConsistentMapListener.class);
163 }
164
Jonathan Hart584d2f32015-01-27 19:46:14 -0800165 @Test
166 public void testSize() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800167 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800168
169 assertEquals(0, ecMap.size());
170 ecMap.put(KEY1, VALUE1);
171 assertEquals(1, ecMap.size());
172 ecMap.put(KEY1, VALUE2);
173 assertEquals(1, ecMap.size());
174 ecMap.put(KEY2, VALUE2);
175 assertEquals(2, ecMap.size());
176 for (int i = 0; i < 10; i++) {
177 ecMap.put("" + i, "" + i);
178 }
179 assertEquals(12, ecMap.size());
180 ecMap.remove(KEY1);
181 assertEquals(11, ecMap.size());
182 ecMap.remove(KEY1);
183 assertEquals(11, ecMap.size());
184 }
185
186 @Test
187 public void testIsEmpty() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800188 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800189
190 assertTrue(ecMap.isEmpty());
191 ecMap.put(KEY1, VALUE1);
192 assertFalse(ecMap.isEmpty());
193 ecMap.remove(KEY1);
194 assertTrue(ecMap.isEmpty());
195 }
196
197 @Test
198 public void testContainsKey() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800199 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800200
201 assertFalse(ecMap.containsKey(KEY1));
202 ecMap.put(KEY1, VALUE1);
203 assertTrue(ecMap.containsKey(KEY1));
204 assertFalse(ecMap.containsKey(KEY2));
205 ecMap.remove(KEY1);
206 assertFalse(ecMap.containsKey(KEY1));
207 }
208
209 @Test
210 public void testContainsValue() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800211 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800212
213 assertFalse(ecMap.containsValue(VALUE1));
214 ecMap.put(KEY1, VALUE1);
215 assertTrue(ecMap.containsValue(VALUE1));
216 assertFalse(ecMap.containsValue(VALUE2));
217 ecMap.put(KEY1, VALUE2);
218 assertFalse(ecMap.containsValue(VALUE1));
219 assertTrue(ecMap.containsValue(VALUE2));
220 ecMap.remove(KEY1);
221 assertFalse(ecMap.containsValue(VALUE2));
222 }
223
224 @Test
225 public void testGet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800226 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800227
228 CountDownLatch latch;
229
230 // Local put
231 assertNull(ecMap.get(KEY1));
232 ecMap.put(KEY1, VALUE1);
233 assertEquals(VALUE1, ecMap.get(KEY1));
234
235 // Remote put
Madan Jampani3d76c942015-06-29 23:37:10 -0700236 List<UpdateEntry<String, String>> message
237 = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800238
239 // Create a latch so we know when the put operation has finished
240 latch = new CountDownLatch(1);
241 ecMap.addListener(new TestListener(latch));
242
243 assertNull(ecMap.get(KEY2));
Madan Jampani3d76c942015-06-29 23:37:10 -0700244 updateHandler.accept(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800245 assertTrue("External listener never got notified of internal event",
246 latch.await(100, TimeUnit.MILLISECONDS));
247 assertEquals(VALUE2, ecMap.get(KEY2));
248
249 // Local remove
250 ecMap.remove(KEY2);
251 assertNull(ecMap.get(KEY2));
252
253 // Remote remove
Madan Jampani3d76c942015-06-29 23:37:10 -0700254 message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800255
256 // Create a latch so we know when the remove operation has finished
257 latch = new CountDownLatch(1);
258 ecMap.addListener(new TestListener(latch));
259
Madan Jampani3d76c942015-06-29 23:37:10 -0700260 updateHandler.accept(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800261 assertTrue("External listener never got notified of internal event",
262 latch.await(100, TimeUnit.MILLISECONDS));
263 assertNull(ecMap.get(KEY1));
264 }
265
266 @Test
267 public void testPut() throws Exception {
268 // Set up expectations of external events to be sent to listeners during
269 // the test. These don't use timestamps so we can set them all up at once.
270 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800271 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800272 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700273 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800274 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700275 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800276 replay(listener);
277
278 ecMap.addListener(listener);
279
280 // Set up expected internal message to be broadcast to peers on first put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800281 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700282 .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800283
284 // Put first value
285 assertNull(ecMap.get(KEY1));
286 ecMap.put(KEY1, VALUE1);
287 assertEquals(VALUE1, ecMap.get(KEY1));
288
289 verify(clusterCommunicator);
290
291 // Set up expected internal message to be broadcast to peers on second put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800292 expectSpecificMulticastMessage(generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700293 KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800294
295 // Update same key to a new value
296 ecMap.put(KEY1, VALUE2);
297 assertEquals(VALUE2, ecMap.get(KEY1));
298
299 verify(clusterCommunicator);
300
301 // Do a put with a older timestamp than the value already there.
302 // The map data should not be changed and no notifications should be sent.
303 reset(clusterCommunicator);
304 replay(clusterCommunicator);
305
306 clockService.turnBackTime();
307 ecMap.put(KEY1, VALUE1);
308 // Value should not have changed.
309 assertEquals(VALUE2, ecMap.get(KEY1));
310
311 verify(clusterCommunicator);
312
313 // Check that our listener received the correct events during the test
314 verify(listener);
315 }
316
317 @Test
318 public void testRemove() throws Exception {
319 // Set up expectations of external events to be sent to listeners during
320 // the test. These don't use timestamps so we can set them all up at once.
321 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800322 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800323 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700324 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700325 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700326 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800327 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700328 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800329 replay(listener);
330
331 ecMap.addListener(listener);
332
333 // Put in an initial value
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800334 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800335 ecMap.put(KEY1, VALUE1);
336 assertEquals(VALUE1, ecMap.get(KEY1));
337
338 // Remove the value and check the correct internal cluster messages
339 // are sent
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800340 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700341 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800342
343 ecMap.remove(KEY1);
344 assertNull(ecMap.get(KEY1));
345
346 verify(clusterCommunicator);
347
348 // Remove the same value again. Even though the value is no longer in
349 // the map, we expect that the tombstone is updated and another remove
350 // event is sent to the cluster and external listeners.
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800351 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700352 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800353
354 ecMap.remove(KEY1);
355 assertNull(ecMap.get(KEY1));
356
357 verify(clusterCommunicator);
358
359
360 // Put in a new value for us to try and remove
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800361 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800362
363 ecMap.put(KEY2, VALUE2);
364
365 clockService.turnBackTime();
366
367 // Remove should have no effect, since it has an older timestamp than
368 // the put. Expect no notifications to be sent out
369 reset(clusterCommunicator);
370 replay(clusterCommunicator);
371
372 ecMap.remove(KEY2);
373
374 verify(clusterCommunicator);
375
376 // Check that our listener received the correct events during the test
377 verify(listener);
378 }
379
380 @Test
Madan Jampani4727a112015-07-16 12:12:58 -0700381 public void testCompute() throws Exception {
382 // Set up expectations of external events to be sent to listeners during
383 // the test. These don't use timestamps so we can set them all up at once.
384 EventuallyConsistentMapListener<String, String> listener
385 = getListener();
386 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700387 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Madan Jampani4727a112015-07-16 12:12:58 -0700388 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700389 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Madan Jampani4727a112015-07-16 12:12:58 -0700390 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700391 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Madan Jampani4727a112015-07-16 12:12:58 -0700392 replay(listener);
393
394 ecMap.addListener(listener);
395
396 // Put in an initial value
397 expectPeerMessage(clusterCommunicator);
398 ecMap.compute(KEY1, (k, v) -> VALUE1);
399 assertEquals(VALUE1, ecMap.get(KEY1));
400
401 // Remove the value and check the correct internal cluster messages
402 // are sent
403 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
404 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
405
406 ecMap.compute(KEY1, (k, v) -> null);
407 assertNull(ecMap.get(KEY1));
408
409 verify(clusterCommunicator);
410
411 // Remove the same value again. Even though the value is no longer in
412 // the map, we expect that the tombstone is updated and another remove
413 // event is sent to the cluster and external listeners.
414 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
415 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
416
417 ecMap.compute(KEY1, (k, v) -> null);
418 assertNull(ecMap.get(KEY1));
419
420 verify(clusterCommunicator);
421
422 // Put in a new value for us to try and remove
423 expectPeerMessage(clusterCommunicator);
424
425 ecMap.compute(KEY2, (k, v) -> VALUE2);
426
427 clockService.turnBackTime();
428
429 // Remove should have no effect, since it has an older timestamp than
430 // the put. Expect no notifications to be sent out
431 reset(clusterCommunicator);
432 replay(clusterCommunicator);
433
434 ecMap.compute(KEY2, (k, v) -> null);
435
436 verify(clusterCommunicator);
437
438 // Check that our listener received the correct events during the test
439 verify(listener);
440 }
441
442 @Test
Jonathan Hart584d2f32015-01-27 19:46:14 -0800443 public void testPutAll() throws Exception {
444 // putAll() with an empty map is a no-op - no messages will be sent
445 reset(clusterCommunicator);
446 replay(clusterCommunicator);
447
448 ecMap.putAll(new HashMap<>());
449
450 verify(clusterCommunicator);
451
452 // Set up the listener with our expected events
453 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800454 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800455 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700456 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800457 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700458 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800459 replay(listener);
460
461 ecMap.addListener(listener);
462
463 // Expect a multi-update inter-instance message
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700464 expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800465 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800466
467 Map<String, String> putAllValues = new HashMap<>();
468 putAllValues.put(KEY1, VALUE1);
469 putAllValues.put(KEY2, VALUE2);
470
471 // Put the values in the map
472 ecMap.putAll(putAllValues);
473
474 // Check the correct messages and events were sent
475 verify(clusterCommunicator);
476 verify(listener);
477 }
478
479 @Test
480 public void testClear() throws Exception {
481 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800482 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800483 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700484 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800485 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700486 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800487 replay(listener);
488
489 // clear() on an empty map is a no-op - no messages will be sent
490 reset(clusterCommunicator);
491 replay(clusterCommunicator);
492
493 assertTrue(ecMap.isEmpty());
494 ecMap.clear();
495 verify(clusterCommunicator);
496
497 // Put some items in the map
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800498 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800499 ecMap.put(KEY1, VALUE1);
500 ecMap.put(KEY2, VALUE2);
501
502 ecMap.addListener(listener);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700503 expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800504
505 ecMap.clear();
506
507 verify(clusterCommunicator);
508 verify(listener);
509 }
510
511 @Test
512 public void testKeySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800513 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800514
515 assertTrue(ecMap.keySet().isEmpty());
516
517 // Generate some keys
518 Set<String> keys = new HashSet<>();
519 for (int i = 1; i <= 10; i++) {
520 keys.add("" + i);
521 }
522
523 // Put each key in the map
524 keys.forEach(k -> ecMap.put(k, "value" + k));
525
526 // Check keySet() returns the correct value
527 assertEquals(keys, ecMap.keySet());
528
529 // Update the value for one of the keys
530 ecMap.put(keys.iterator().next(), "new-value");
531
532 // Check the key set is still the same
533 assertEquals(keys, ecMap.keySet());
534
535 // Remove a key
536 String removeKey = keys.iterator().next();
537 keys.remove(removeKey);
538 ecMap.remove(removeKey);
539
540 // Check the key set is still correct
541 assertEquals(keys, ecMap.keySet());
542 }
543
544 @Test
545 public void testValues() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800546 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800547
548 assertTrue(ecMap.values().isEmpty());
549
550 // Generate some values
551 Map<String, String> expectedValues = new HashMap<>();
552 for (int i = 1; i <= 10; i++) {
553 expectedValues.put("" + i, "value" + i);
554 }
555
556 // Add them into the map
557 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
558
559 // Check the values collection is correct
560 assertEquals(expectedValues.values().size(), ecMap.values().size());
561 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
562
563 // Update the value for one of the keys
564 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
565 expectedValues.put(first.getKey(), "new-value");
566 ecMap.put(first.getKey(), "new-value");
567
568 // Check the values collection is still correct
569 assertEquals(expectedValues.values().size(), ecMap.values().size());
570 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
571
572 // Remove a key
573 String removeKey = expectedValues.keySet().iterator().next();
574 expectedValues.remove(removeKey);
575 ecMap.remove(removeKey);
576
577 // Check the values collection is still correct
578 assertEquals(expectedValues.values().size(), ecMap.values().size());
579 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
580 }
581
582 @Test
583 public void testEntrySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800584 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800585
586 assertTrue(ecMap.entrySet().isEmpty());
587
588 // Generate some values
589 Map<String, String> expectedValues = new HashMap<>();
590 for (int i = 1; i <= 10; i++) {
591 expectedValues.put("" + i, "value" + i);
592 }
593
594 // Add them into the map
595 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
596
597 // Check the entry set is correct
598 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
599
600 // Update the value for one of the keys
601 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
602 expectedValues.put(first.getKey(), "new-value");
603 ecMap.put(first.getKey(), "new-value");
604
605 // Check the entry set is still correct
606 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
607
608 // Remove a key
609 String removeKey = expectedValues.keySet().iterator().next();
610 expectedValues.remove(removeKey);
611 ecMap.remove(removeKey);
612
613 // Check the entry set is still correct
614 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
615 }
616
617 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
618 if (expectedMap.entrySet().size() != actual.size()) {
619 return false;
620 }
621
622 for (Map.Entry<String, String> e : actual) {
623 if (!expectedMap.containsKey(e.getKey())) {
624 return false;
625 }
626 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
627 return false;
628 }
629 }
630 return true;
631 }
632
633 @Test
634 public void testDestroy() throws Exception {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800635 clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
Jon Halld198b882016-05-18 16:44:40 -0700636 clusterCommunicator.removeSubscriber(UPDATE_REQUEST_SUBJECT);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800637 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
638
639 replay(clusterCommunicator);
640
641 ecMap.destroy();
642
643 verify(clusterCommunicator);
644
645 try {
646 ecMap.get(KEY1);
647 fail("get after destroy should throw exception");
648 } catch (IllegalStateException e) {
649 assertTrue(true);
650 }
651
652 try {
653 ecMap.put(KEY1, VALUE1);
654 fail("put after destroy should throw exception");
655 } catch (IllegalStateException e) {
656 assertTrue(true);
657 }
658 }
659
Madan Jampani3d76c942015-06-29 23:37:10 -0700660 private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
661 return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800662 }
663
Madan Jampani3d76c942015-06-29 23:37:10 -0700664 private List<UpdateEntry<String, String>> generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700665 String key1, String value1, String key2, String value2) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700666 List<UpdateEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800667
668 Timestamp timestamp1 = clockService.peek(1);
669 Timestamp timestamp2 = clockService.peek(2);
670
Madan Jampani3d76c942015-06-29 23:37:10 -0700671 list.add(generatePutMessage(key1, value1, timestamp1));
672 list.add(generatePutMessage(key2, value2, timestamp2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800673
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700674 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800675 }
676
Madan Jampani3d76c942015-06-29 23:37:10 -0700677 private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
678 return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800679 }
680
Madan Jampani3d76c942015-06-29 23:37:10 -0700681 private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
682 List<UpdateEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800683
684 Timestamp timestamp1 = clockService.peek(1);
685 Timestamp timestamp2 = clockService.peek(2);
686
Madan Jampani3d76c942015-06-29 23:37:10 -0700687 list.add(generateRemoveMessage(key1, timestamp1));
688 list.add(generateRemoveMessage(key2, timestamp2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800689
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700690 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800691 }
692
693 /**
694 * Sets up a mock ClusterCommunicationService to expect a specific cluster
695 * message to be broadcast to the cluster.
696 *
Ray Milkeyb3c5ce22015-08-10 09:07:36 -0700697 * @param message message we expect to be sent
Jonathan Hart584d2f32015-01-27 19:46:14 -0800698 * @param clusterCommunicator a mock ClusterCommunicationService to set up
699 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800700 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700701 private static <T> void expectSpecificBroadcastMessage(
702 T message,
703 MessageSubject subject,
704 ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800705 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700706 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
707 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800708 replay(clusterCommunicator);
709 }
710
711 /**
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800712 * Sets up a mock ClusterCommunicationService to expect a specific cluster
713 * message to be multicast to the cluster.
714 *
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700715 * @param message message we expect to be sent
716 * @param subject subject we expect to be sent to
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800717 * @param clusterCommunicator a mock ClusterCommunicationService to set up
718 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800719 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700720 private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800721 ClusterCommunicationService clusterCommunicator) {
722 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700723 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
724 expectLastCall().anyTimes();
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800725 replay(clusterCommunicator);
726 }
727
728
729 /**
730 * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
Jonathan Hart584d2f32015-01-27 19:46:14 -0800731 * that is sent to it. This is useful for unit tests where we aren't
732 * interested in testing the messaging component.
733 *
734 * @param clusterCommunicator a mock ClusterCommunicationService to set up
735 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800736 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700737 private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800738 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800739// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
740// anyObject(Iterable.class)))
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700741 expect(clusterCommunicator.<T>unicast(
742 anyObject(),
743 anyObject(MessageSubject.class),
744 anyObject(Function.class),
745 anyObject(NodeId.class)))
Madan Jampani175e8fd2015-05-20 14:10:45 -0700746 .andReturn(CompletableFuture.completedFuture(null))
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800747 .anyTimes();
748 replay(clusterCommunicator);
749 }
750
751 /**
752 * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
753 * that is sent to it. This is useful for unit tests where we aren't
754 * interested in testing the messaging component.
755 *
756 * @param clusterCommunicator a mock ClusterCommunicationService to set up
757 */
758 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800759 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700760 clusterCommunicator.<AbstractEvent>multicast(
761 anyObject(AbstractEvent.class),
762 anyObject(MessageSubject.class),
763 anyObject(Function.class),
764 anyObject(Set.class));
765 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800766 replay(clusterCommunicator);
767 }
768
769 /**
770 * ClusterCommunicationService implementation that the map's addSubscriber
771 * call will delegate to. This means we can get a reference to the
772 * internal cluster message handler used by the map, so that we can simulate
773 * events coming in from other instances.
774 */
775 private final class TestClusterCommunicationService
Ray Milkeyb3c5ce22015-08-10 09:07:36 -0700776 extends ClusterCommunicationServiceAdapter {
Madan Jampani27b69c62015-05-15 15:49:02 -0700777
778 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700779 public <M> void addSubscriber(MessageSubject subject,
780 Function<byte[], M> decoder, Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700781 Executor executor) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700782 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
783 updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
Jon Halld198b882016-05-18 16:44:40 -0700784 } else if (subject.equals(UPDATE_REQUEST_SUBJECT)) {
785 requestHandler = (Consumer<Collection<UpdateRequest<String>>>) handler;
Madan Jampani29f52a32016-04-18 15:20:52 -0700786 } else {
787 throw new RuntimeException("Unexpected message subject " + subject.toString());
788 }
789 }
790
791 @Override
792 public <M, R> void addSubscriber(MessageSubject subject,
793 Function<byte[], M> decoder, Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
794 if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
795 antiEntropyHandler = (Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse>) handler;
Madan Jampani3d76c942015-06-29 23:37:10 -0700796 } else {
797 throw new RuntimeException("Unexpected message subject " + subject.toString());
798 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700799 }
Jonathan Hart584d2f32015-01-27 19:46:14 -0800800 }
801
802 /**
803 * ClockService implementation that gives out timestamps based on a
804 * sequential counter. This clock service enables more control over the
805 * timestamps that are given out, including being able to "turn back time"
806 * to give out timestamps from the past.
807 *
808 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800809 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800810 */
Madan Jampanibcf1a482015-06-24 19:05:56 -0700811 private class SequentialClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800812
813 private static final long INITIAL_VALUE = 1;
814 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
815
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800816 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800817 return new TestTimestamp(counter.getAndIncrement());
818 }
819
820 /**
821 * Returns what the next timestamp will be without consuming the
822 * timestamp. This allows test code to set expectations correctly while
823 * still allowing the CUT to get the same timestamp.
824 *
825 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800826 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800827 */
828 public Timestamp peekAtNextTimestamp() {
829 return peek(1);
830 }
831
832 /**
833 * Returns the ith timestamp to be given out in the future without
834 * consuming the timestamp. For example, i=1 returns the next timestamp,
835 * i=2 returns the timestamp after that, and so on.
836 *
837 * @param i number of the timestamp to peek at
838 * @return the ith timestamp that will be given out
839 */
840 public Timestamp peek(int i) {
841 checkArgument(i > 0, "i must be a positive integer");
842
843 return new TestTimestamp(counter.get() + i - 1);
844 }
845
846 /**
847 * Turns the clock back two ticks, so the next call to getTimestamp will
848 * return an older timestamp than the previous call to getTimestamp.
849 */
850 public void turnBackTime() {
851 // Not atomic, but should be OK for these tests.
852 counter.decrementAndGet();
853 counter.decrementAndGet();
854 }
855
856 }
857
858 /**
859 * Timestamp implementation where the value of the timestamp can be
860 * specified explicitly at creation time.
861 */
862 private class TestTimestamp implements Timestamp {
863
864 private final long timestamp;
865
866 /**
867 * Creates a new timestamp that has the specified value.
868 *
869 * @param timestamp value of the timestamp
870 */
871 public TestTimestamp(long timestamp) {
872 this.timestamp = timestamp;
873 }
874
875 @Override
876 public int compareTo(Timestamp o) {
877 checkArgument(o instanceof TestTimestamp);
878 TestTimestamp otherTimestamp = (TestTimestamp) o;
879 return ComparisonChain.start()
880 .compare(this.timestamp, otherTimestamp.timestamp)
881 .result();
882 }
883 }
884
885 /**
886 * EventuallyConsistentMapListener implementation which triggers a latch
887 * when it receives an event.
888 */
889 private class TestListener implements EventuallyConsistentMapListener<String, String> {
890 private CountDownLatch latch;
891
892 /**
893 * Creates a new listener that will trigger the specified latch when it
894 * receives and event.
895 *
896 * @param latch the latch to trigger on events
897 */
898 public TestListener(CountDownLatch latch) {
899 this.latch = latch;
900 }
901
902 @Override
903 public void event(EventuallyConsistentMapEvent<String, String> event) {
904 latch.countDown();
905 }
906 }
907}