blob: b5696a24fecdc9a08cad6f968242d91323c558ac [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Jonathan Hart77bdd262015-02-03 09:07:48 -080016package org.onosproject.store.ecmap;
Jonathan Hart584d2f32015-01-27 19:46:14 -080017
18import com.google.common.collect.ComparisonChain;
Madan Jampani3d76c942015-06-29 23:37:10 -070019import com.google.common.collect.ImmutableList;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080020import com.google.common.collect.ImmutableSet;
Madan Jampanib28e4ad2015-02-19 12:31:37 -080021import com.google.common.util.concurrent.MoreExecutors;
Madan Jampani3e033bd2015-04-08 13:03:49 -070022
Jonathan Hart584d2f32015-01-27 19:46:14 -080023import org.junit.After;
24import org.junit.Before;
25import org.junit.Test;
26import org.onlab.packet.IpAddress;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.ControllerNode;
30import org.onosproject.cluster.DefaultControllerNode;
31import org.onosproject.cluster.NodeId;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070032import org.onosproject.event.AbstractEvent;
Jonathan Hart584d2f32015-01-27 19:46:14 -080033import org.onosproject.store.Timestamp;
34import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080035import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
36import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani3e033bd2015-04-08 13:03:49 -070037import org.onosproject.store.impl.LogicalTimestamp;
Jonathan Hart63939a32015-05-08 11:57:03 -070038import org.onosproject.store.service.WallClockTimestamp;
Jonathan Hart584d2f32015-01-27 19:46:14 -080039import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.serializers.KryoSerializer;
Jonathan Hart6ec029a2015-03-24 17:12:35 -070041import org.onosproject.store.service.EventuallyConsistentMap;
42import org.onosproject.store.service.EventuallyConsistentMapEvent;
43import org.onosproject.store.service.EventuallyConsistentMapListener;
Jonathan Hart584d2f32015-01-27 19:46:14 -080044
Jonathan Hart584d2f32015-01-27 19:46:14 -080045import java.util.ArrayList;
Madan Jampani3d76c942015-06-29 23:37:10 -070046import java.util.Collection;
Jonathan Hart584d2f32015-01-27 19:46:14 -080047import java.util.HashMap;
48import java.util.HashSet;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070049import java.util.List;
Jonathan Hart584d2f32015-01-27 19:46:14 -080050import java.util.Map;
51import java.util.Objects;
Madan Jampani3d76c942015-06-29 23:37:10 -070052import java.util.Optional;
Jonathan Hart584d2f32015-01-27 19:46:14 -080053import java.util.Set;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070054import java.util.concurrent.CompletableFuture;
Jonathan Hart584d2f32015-01-27 19:46:14 -080055import java.util.concurrent.CountDownLatch;
Madan Jampaniec5ae342015-04-13 15:43:10 -070056import java.util.concurrent.Executor;
Madan Jampani2af244a2015-02-22 13:12:01 -080057import java.util.concurrent.ExecutorService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080058import java.util.concurrent.TimeUnit;
59import java.util.concurrent.atomic.AtomicLong;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070060import java.util.function.Consumer;
61import java.util.function.Function;
Jonathan Hart584d2f32015-01-27 19:46:14 -080062
63import static com.google.common.base.Preconditions.checkArgument;
64import static junit.framework.TestCase.assertFalse;
65import static org.easymock.EasyMock.*;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080066import static org.junit.Assert.*;
Jonathan Hart584d2f32015-01-27 19:46:14 -080067
68/**
69 * Unit tests for EventuallyConsistentMapImpl.
70 */
71public class EventuallyConsistentMapImplTest {
72
73 private EventuallyConsistentMap<String, String> ecMap;
74
75 private ClusterService clusterService;
76 private ClusterCommunicationService clusterCommunicator;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080077 private SequentialClockService<String, String> clockService;
Jonathan Hart584d2f32015-01-27 19:46:14 -080078
79 private static final String MAP_NAME = "test";
Brian O'Connoreeaea2c2015-03-05 16:24:34 -080080 private static final MessageSubject UPDATE_MESSAGE_SUBJECT
Jonathan Hart584d2f32015-01-27 19:46:14 -080081 = new MessageSubject("ecm-" + MAP_NAME + "-update");
Jonathan Hart584d2f32015-01-27 19:46:14 -080082 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
83 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
84
85 private static final String KEY1 = "one";
86 private static final String KEY2 = "two";
87 private static final String VALUE1 = "oneValue";
88 private static final String VALUE2 = "twoValue";
89
90 private final ControllerNode self =
91 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
92
Madan Jampani3d76c942015-06-29 23:37:10 -070093 private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
94 private Consumer<AntiEntropyAdvertisement<String>> antiEntropyHandler;
Jonathan Hart584d2f32015-01-27 19:46:14 -080095
96 /*
97 * Serialization is a bit tricky here. We need to serialize in the tests
98 * to set the expectations, which will use this serializer here, but the
99 * EventuallyConsistentMap will use its own internal serializer. This means
100 * this serializer must be set up exactly the same as map's internal
101 * serializer.
102 */
103 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
104 @Override
105 protected void setupKryoPool() {
106 serializerPool = KryoNamespace.newBuilder()
107 // Classes we give to the map
108 .register(KryoNamespaces.API)
109 .register(TestTimestamp.class)
110 // Below is the classes that the map internally registers
Madan Jampani3e033bd2015-04-08 13:03:49 -0700111 .register(LogicalTimestamp.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800112 .register(WallClockTimestamp.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800113 .register(ArrayList.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800114 .register(AntiEntropyAdvertisement.class)
115 .register(HashMap.class)
Madan Jampani3d76c942015-06-29 23:37:10 -0700116 .register(Optional.class)
Jonathan Hart584d2f32015-01-27 19:46:14 -0800117 .build();
118 }
119 };
120
121 @Before
122 public void setUp() throws Exception {
123 clusterService = createMock(ClusterService.class);
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800124 expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
125 expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800126 replay(clusterService);
127
128 clusterCommunicator = createMock(ClusterCommunicationService.class);
129
130 // Add expectation for adding cluster message subscribers which
131 // delegate to our ClusterCommunicationService implementation. This
132 // allows us to get a reference to the map's internal cluster message
133 // handlers so we can induce events coming in from a peer.
Madan Jampani3d76c942015-06-29 23:37:10 -0700134 clusterCommunicator.<String>addSubscriber(anyObject(MessageSubject.class),
135 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
136 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(2);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800137
138 replay(clusterCommunicator);
139
140 clockService = new SequentialClockService<>();
141
142 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
143 .register(KryoNamespaces.API)
144 .register(TestTimestamp.class);
145
Madan Jampani175e8fd2015-05-20 14:10:45 -0700146 ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700147 clusterService, clusterCommunicator)
148 .withName(MAP_NAME)
149 .withSerializer(serializer)
Madan Jampanibcf1a482015-06-24 19:05:56 -0700150 .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
Jonathan Hart6ec029a2015-03-24 17:12:35 -0700151 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
152 .build();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800153
154 // Reset ready for tests to add their own expectations
155 reset(clusterCommunicator);
156 }
157
158 @After
159 public void tearDown() {
160 reset(clusterCommunicator);
161 ecMap.destroy();
162 }
163
Ray Milkey8dc82082015-02-20 16:22:38 -0800164 @SuppressWarnings("unchecked")
165 private EventuallyConsistentMapListener<String, String> getListener() {
166 return createMock(EventuallyConsistentMapListener.class);
167 }
168
Jonathan Hart584d2f32015-01-27 19:46:14 -0800169 @Test
170 public void testSize() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800171 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800172
173 assertEquals(0, ecMap.size());
174 ecMap.put(KEY1, VALUE1);
175 assertEquals(1, ecMap.size());
176 ecMap.put(KEY1, VALUE2);
177 assertEquals(1, ecMap.size());
178 ecMap.put(KEY2, VALUE2);
179 assertEquals(2, ecMap.size());
180 for (int i = 0; i < 10; i++) {
181 ecMap.put("" + i, "" + i);
182 }
183 assertEquals(12, ecMap.size());
184 ecMap.remove(KEY1);
185 assertEquals(11, ecMap.size());
186 ecMap.remove(KEY1);
187 assertEquals(11, ecMap.size());
188 }
189
190 @Test
191 public void testIsEmpty() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800192 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800193
194 assertTrue(ecMap.isEmpty());
195 ecMap.put(KEY1, VALUE1);
196 assertFalse(ecMap.isEmpty());
197 ecMap.remove(KEY1);
198 assertTrue(ecMap.isEmpty());
199 }
200
201 @Test
202 public void testContainsKey() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800203 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800204
205 assertFalse(ecMap.containsKey(KEY1));
206 ecMap.put(KEY1, VALUE1);
207 assertTrue(ecMap.containsKey(KEY1));
208 assertFalse(ecMap.containsKey(KEY2));
209 ecMap.remove(KEY1);
210 assertFalse(ecMap.containsKey(KEY1));
211 }
212
213 @Test
214 public void testContainsValue() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800215 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800216
217 assertFalse(ecMap.containsValue(VALUE1));
218 ecMap.put(KEY1, VALUE1);
219 assertTrue(ecMap.containsValue(VALUE1));
220 assertFalse(ecMap.containsValue(VALUE2));
221 ecMap.put(KEY1, VALUE2);
222 assertFalse(ecMap.containsValue(VALUE1));
223 assertTrue(ecMap.containsValue(VALUE2));
224 ecMap.remove(KEY1);
225 assertFalse(ecMap.containsValue(VALUE2));
226 }
227
228 @Test
229 public void testGet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800230 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800231
232 CountDownLatch latch;
233
234 // Local put
235 assertNull(ecMap.get(KEY1));
236 ecMap.put(KEY1, VALUE1);
237 assertEquals(VALUE1, ecMap.get(KEY1));
238
239 // Remote put
Madan Jampani3d76c942015-06-29 23:37:10 -0700240 List<UpdateEntry<String, String>> message
241 = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800242
243 // Create a latch so we know when the put operation has finished
244 latch = new CountDownLatch(1);
245 ecMap.addListener(new TestListener(latch));
246
247 assertNull(ecMap.get(KEY2));
Madan Jampani3d76c942015-06-29 23:37:10 -0700248 updateHandler.accept(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800249 assertTrue("External listener never got notified of internal event",
250 latch.await(100, TimeUnit.MILLISECONDS));
251 assertEquals(VALUE2, ecMap.get(KEY2));
252
253 // Local remove
254 ecMap.remove(KEY2);
255 assertNull(ecMap.get(KEY2));
256
257 // Remote remove
Madan Jampani3d76c942015-06-29 23:37:10 -0700258 message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800259
260 // Create a latch so we know when the remove operation has finished
261 latch = new CountDownLatch(1);
262 ecMap.addListener(new TestListener(latch));
263
Madan Jampani3d76c942015-06-29 23:37:10 -0700264 updateHandler.accept(message);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800265 assertTrue("External listener never got notified of internal event",
266 latch.await(100, TimeUnit.MILLISECONDS));
267 assertNull(ecMap.get(KEY1));
268 }
269
270 @Test
271 public void testPut() throws Exception {
272 // Set up expectations of external events to be sent to listeners during
273 // the test. These don't use timestamps so we can set them all up at once.
274 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800275 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800276 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700277 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800278 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700279 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800280 replay(listener);
281
282 ecMap.addListener(listener);
283
284 // Set up expected internal message to be broadcast to peers on first put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800285 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700286 .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800287
288 // Put first value
289 assertNull(ecMap.get(KEY1));
290 ecMap.put(KEY1, VALUE1);
291 assertEquals(VALUE1, ecMap.get(KEY1));
292
293 verify(clusterCommunicator);
294
295 // Set up expected internal message to be broadcast to peers on second put
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800296 expectSpecificMulticastMessage(generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700297 KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800298
299 // Update same key to a new value
300 ecMap.put(KEY1, VALUE2);
301 assertEquals(VALUE2, ecMap.get(KEY1));
302
303 verify(clusterCommunicator);
304
305 // Do a put with a older timestamp than the value already there.
306 // The map data should not be changed and no notifications should be sent.
307 reset(clusterCommunicator);
308 replay(clusterCommunicator);
309
310 clockService.turnBackTime();
311 ecMap.put(KEY1, VALUE1);
312 // Value should not have changed.
313 assertEquals(VALUE2, ecMap.get(KEY1));
314
315 verify(clusterCommunicator);
316
317 // Check that our listener received the correct events during the test
318 verify(listener);
319 }
320
321 @Test
322 public void testRemove() throws Exception {
323 // Set up expectations of external events to be sent to listeners during
324 // the test. These don't use timestamps so we can set them all up at once.
325 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800326 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800327 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700328 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Madan Jampani43e9c9c2015-06-26 14:16:46 -0700329 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700330 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800331 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700332 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800333 replay(listener);
334
335 ecMap.addListener(listener);
336
337 // Put in an initial value
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800338 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800339 ecMap.put(KEY1, VALUE1);
340 assertEquals(VALUE1, ecMap.get(KEY1));
341
342 // Remove the value and check the correct internal cluster messages
343 // are sent
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800344 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700345 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800346
347 ecMap.remove(KEY1);
348 assertNull(ecMap.get(KEY1));
349
350 verify(clusterCommunicator);
351
352 // Remove the same value again. Even though the value is no longer in
353 // the map, we expect that the tombstone is updated and another remove
354 // event is sent to the cluster and external listeners.
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800355 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700356 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800357
358 ecMap.remove(KEY1);
359 assertNull(ecMap.get(KEY1));
360
361 verify(clusterCommunicator);
362
363
364 // Put in a new value for us to try and remove
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800365 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800366
367 ecMap.put(KEY2, VALUE2);
368
369 clockService.turnBackTime();
370
371 // Remove should have no effect, since it has an older timestamp than
372 // the put. Expect no notifications to be sent out
373 reset(clusterCommunicator);
374 replay(clusterCommunicator);
375
376 ecMap.remove(KEY2);
377
378 verify(clusterCommunicator);
379
380 // Check that our listener received the correct events during the test
381 verify(listener);
382 }
383
384 @Test
Madan Jampani4727a112015-07-16 12:12:58 -0700385 public void testCompute() throws Exception {
386 // Set up expectations of external events to be sent to listeners during
387 // the test. These don't use timestamps so we can set them all up at once.
388 EventuallyConsistentMapListener<String, String> listener
389 = getListener();
390 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700391 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Madan Jampani4727a112015-07-16 12:12:58 -0700392 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700393 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Madan Jampani4727a112015-07-16 12:12:58 -0700394 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700395 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Madan Jampani4727a112015-07-16 12:12:58 -0700396 replay(listener);
397
398 ecMap.addListener(listener);
399
400 // Put in an initial value
401 expectPeerMessage(clusterCommunicator);
402 ecMap.compute(KEY1, (k, v) -> VALUE1);
403 assertEquals(VALUE1, ecMap.get(KEY1));
404
405 // Remove the value and check the correct internal cluster messages
406 // are sent
407 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
408 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
409
410 ecMap.compute(KEY1, (k, v) -> null);
411 assertNull(ecMap.get(KEY1));
412
413 verify(clusterCommunicator);
414
415 // Remove the same value again. Even though the value is no longer in
416 // the map, we expect that the tombstone is updated and another remove
417 // event is sent to the cluster and external listeners.
418 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
419 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
420
421 ecMap.compute(KEY1, (k, v) -> null);
422 assertNull(ecMap.get(KEY1));
423
424 verify(clusterCommunicator);
425
426 // Put in a new value for us to try and remove
427 expectPeerMessage(clusterCommunicator);
428
429 ecMap.compute(KEY2, (k, v) -> VALUE2);
430
431 clockService.turnBackTime();
432
433 // Remove should have no effect, since it has an older timestamp than
434 // the put. Expect no notifications to be sent out
435 reset(clusterCommunicator);
436 replay(clusterCommunicator);
437
438 ecMap.compute(KEY2, (k, v) -> null);
439
440 verify(clusterCommunicator);
441
442 // Check that our listener received the correct events during the test
443 verify(listener);
444 }
445
446 @Test
Jonathan Hart584d2f32015-01-27 19:46:14 -0800447 public void testPutAll() throws Exception {
448 // putAll() with an empty map is a no-op - no messages will be sent
449 reset(clusterCommunicator);
450 replay(clusterCommunicator);
451
452 ecMap.putAll(new HashMap<>());
453
454 verify(clusterCommunicator);
455
456 // Set up the listener with our expected events
457 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800458 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800459 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700460 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800461 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700462 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800463 replay(listener);
464
465 ecMap.addListener(listener);
466
467 // Expect a multi-update inter-instance message
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700468 expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800469 clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800470
471 Map<String, String> putAllValues = new HashMap<>();
472 putAllValues.put(KEY1, VALUE1);
473 putAllValues.put(KEY2, VALUE2);
474
475 // Put the values in the map
476 ecMap.putAll(putAllValues);
477
478 // Check the correct messages and events were sent
479 verify(clusterCommunicator);
480 verify(listener);
481 }
482
483 @Test
484 public void testClear() throws Exception {
485 EventuallyConsistentMapListener<String, String> listener
Ray Milkey8dc82082015-02-20 16:22:38 -0800486 = getListener();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800487 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700488 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800489 listener.event(new EventuallyConsistentMapEvent<>(
Madan Jampanicab114c2015-07-23 00:14:19 -0700490 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800491 replay(listener);
492
493 // clear() on an empty map is a no-op - no messages will be sent
494 reset(clusterCommunicator);
495 replay(clusterCommunicator);
496
497 assertTrue(ecMap.isEmpty());
498 ecMap.clear();
499 verify(clusterCommunicator);
500
501 // Put some items in the map
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800502 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800503 ecMap.put(KEY1, VALUE1);
504 ecMap.put(KEY2, VALUE2);
505
506 ecMap.addListener(listener);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700507 expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800508
509 ecMap.clear();
510
511 verify(clusterCommunicator);
512 verify(listener);
513 }
514
515 @Test
516 public void testKeySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800517 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800518
519 assertTrue(ecMap.keySet().isEmpty());
520
521 // Generate some keys
522 Set<String> keys = new HashSet<>();
523 for (int i = 1; i <= 10; i++) {
524 keys.add("" + i);
525 }
526
527 // Put each key in the map
528 keys.forEach(k -> ecMap.put(k, "value" + k));
529
530 // Check keySet() returns the correct value
531 assertEquals(keys, ecMap.keySet());
532
533 // Update the value for one of the keys
534 ecMap.put(keys.iterator().next(), "new-value");
535
536 // Check the key set is still the same
537 assertEquals(keys, ecMap.keySet());
538
539 // Remove a key
540 String removeKey = keys.iterator().next();
541 keys.remove(removeKey);
542 ecMap.remove(removeKey);
543
544 // Check the key set is still correct
545 assertEquals(keys, ecMap.keySet());
546 }
547
548 @Test
549 public void testValues() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800550 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800551
552 assertTrue(ecMap.values().isEmpty());
553
554 // Generate some values
555 Map<String, String> expectedValues = new HashMap<>();
556 for (int i = 1; i <= 10; i++) {
557 expectedValues.put("" + i, "value" + i);
558 }
559
560 // Add them into the map
561 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
562
563 // Check the values collection is correct
564 assertEquals(expectedValues.values().size(), ecMap.values().size());
565 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
566
567 // Update the value for one of the keys
568 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
569 expectedValues.put(first.getKey(), "new-value");
570 ecMap.put(first.getKey(), "new-value");
571
572 // Check the values collection is still correct
573 assertEquals(expectedValues.values().size(), ecMap.values().size());
574 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
575
576 // Remove a key
577 String removeKey = expectedValues.keySet().iterator().next();
578 expectedValues.remove(removeKey);
579 ecMap.remove(removeKey);
580
581 // Check the values collection is still correct
582 assertEquals(expectedValues.values().size(), ecMap.values().size());
583 expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
584 }
585
586 @Test
587 public void testEntrySet() throws Exception {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800588 expectPeerMessage(clusterCommunicator);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800589
590 assertTrue(ecMap.entrySet().isEmpty());
591
592 // Generate some values
593 Map<String, String> expectedValues = new HashMap<>();
594 for (int i = 1; i <= 10; i++) {
595 expectedValues.put("" + i, "value" + i);
596 }
597
598 // Add them into the map
599 expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
600
601 // Check the entry set is correct
602 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
603
604 // Update the value for one of the keys
605 Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
606 expectedValues.put(first.getKey(), "new-value");
607 ecMap.put(first.getKey(), "new-value");
608
609 // Check the entry set is still correct
610 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
611
612 // Remove a key
613 String removeKey = expectedValues.keySet().iterator().next();
614 expectedValues.remove(removeKey);
615 ecMap.remove(removeKey);
616
617 // Check the entry set is still correct
618 assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
619 }
620
621 private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
622 if (expectedMap.entrySet().size() != actual.size()) {
623 return false;
624 }
625
626 for (Map.Entry<String, String> e : actual) {
627 if (!expectedMap.containsKey(e.getKey())) {
628 return false;
629 }
630 if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
631 return false;
632 }
633 }
634 return true;
635 }
636
637 @Test
638 public void testDestroy() throws Exception {
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800639 clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
Jonathan Hart584d2f32015-01-27 19:46:14 -0800640 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
641
642 replay(clusterCommunicator);
643
644 ecMap.destroy();
645
646 verify(clusterCommunicator);
647
648 try {
649 ecMap.get(KEY1);
650 fail("get after destroy should throw exception");
651 } catch (IllegalStateException e) {
652 assertTrue(true);
653 }
654
655 try {
656 ecMap.put(KEY1, VALUE1);
657 fail("put after destroy should throw exception");
658 } catch (IllegalStateException e) {
659 assertTrue(true);
660 }
661 }
662
Madan Jampani3d76c942015-06-29 23:37:10 -0700663 private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
664 return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800665 }
666
Madan Jampani3d76c942015-06-29 23:37:10 -0700667 private List<UpdateEntry<String, String>> generatePutMessage(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700668 String key1, String value1, String key2, String value2) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700669 List<UpdateEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800670
671 Timestamp timestamp1 = clockService.peek(1);
672 Timestamp timestamp2 = clockService.peek(2);
673
Madan Jampani3d76c942015-06-29 23:37:10 -0700674 list.add(generatePutMessage(key1, value1, timestamp1));
675 list.add(generatePutMessage(key2, value2, timestamp2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800676
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700677 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800678 }
679
Madan Jampani3d76c942015-06-29 23:37:10 -0700680 private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
681 return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800682 }
683
Madan Jampani3d76c942015-06-29 23:37:10 -0700684 private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
685 List<UpdateEntry<String, String>> list = new ArrayList<>();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800686
687 Timestamp timestamp1 = clockService.peek(1);
688 Timestamp timestamp2 = clockService.peek(2);
689
Madan Jampani3d76c942015-06-29 23:37:10 -0700690 list.add(generateRemoveMessage(key1, timestamp1));
691 list.add(generateRemoveMessage(key2, timestamp2));
Jonathan Hart584d2f32015-01-27 19:46:14 -0800692
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700693 return list;
Jonathan Hart584d2f32015-01-27 19:46:14 -0800694 }
695
696 /**
697 * Sets up a mock ClusterCommunicationService to expect a specific cluster
698 * message to be broadcast to the cluster.
699 *
700 * @param m message we expect to be sent
701 * @param clusterCommunicator a mock ClusterCommunicationService to set up
702 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800703 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700704 private static <T> void expectSpecificBroadcastMessage(
705 T message,
706 MessageSubject subject,
707 ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800708 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700709 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
710 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800711 replay(clusterCommunicator);
712 }
713
714 /**
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800715 * Sets up a mock ClusterCommunicationService to expect a specific cluster
716 * message to be multicast to the cluster.
717 *
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700718 * @param message message we expect to be sent
719 * @param subject subject we expect to be sent to
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800720 * @param clusterCommunicator a mock ClusterCommunicationService to set up
721 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800722 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700723 private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800724 ClusterCommunicationService clusterCommunicator) {
725 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700726 clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
727 expectLastCall().anyTimes();
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800728 replay(clusterCommunicator);
729 }
730
731
732 /**
733 * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
Jonathan Hart584d2f32015-01-27 19:46:14 -0800734 * that is sent to it. This is useful for unit tests where we aren't
735 * interested in testing the messaging component.
736 *
737 * @param clusterCommunicator a mock ClusterCommunicationService to set up
738 */
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800739 //FIXME rename
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700740 private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800741 reset(clusterCommunicator);
Brian O'Connoreeaea2c2015-03-05 16:24:34 -0800742// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
743// anyObject(Iterable.class)))
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700744 expect(clusterCommunicator.<T>unicast(
745 anyObject(),
746 anyObject(MessageSubject.class),
747 anyObject(Function.class),
748 anyObject(NodeId.class)))
Madan Jampani175e8fd2015-05-20 14:10:45 -0700749 .andReturn(CompletableFuture.completedFuture(null))
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800750 .anyTimes();
751 replay(clusterCommunicator);
752 }
753
754 /**
755 * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
756 * that is sent to it. This is useful for unit tests where we aren't
757 * interested in testing the messaging component.
758 *
759 * @param clusterCommunicator a mock ClusterCommunicationService to set up
760 */
761 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800762 reset(clusterCommunicator);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700763 clusterCommunicator.<AbstractEvent>multicast(
764 anyObject(AbstractEvent.class),
765 anyObject(MessageSubject.class),
766 anyObject(Function.class),
767 anyObject(Set.class));
768 expectLastCall().anyTimes();
Jonathan Hart584d2f32015-01-27 19:46:14 -0800769 replay(clusterCommunicator);
770 }
771
772 /**
773 * ClusterCommunicationService implementation that the map's addSubscriber
774 * call will delegate to. This means we can get a reference to the
775 * internal cluster message handler used by the map, so that we can simulate
776 * events coming in from other instances.
777 */
778 private final class TestClusterCommunicationService
779 implements ClusterCommunicationService {
780
781 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800782 public void addSubscriber(MessageSubject subject,
783 ClusterMessageHandler subscriber,
784 ExecutorService executor) {
Madan Jampani2af244a2015-02-22 13:12:01 -0800785 }
786
787 @Override
Jonathan Hart584d2f32015-01-27 19:46:14 -0800788 public void removeSubscriber(MessageSubject subject) {}
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700789
790 @Override
791 public <M> void broadcast(M message, MessageSubject subject,
792 Function<M, byte[]> encoder) {
793 }
794
795 @Override
796 public <M> void broadcastIncludeSelf(M message,
797 MessageSubject subject, Function<M, byte[]> encoder) {
798 }
799
800 @Override
Madan Jampani175e8fd2015-05-20 14:10:45 -0700801 public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700802 Function<M, byte[]> encoder, NodeId toNodeId) {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700803 return null;
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700804 }
805
806 @Override
807 public <M> void multicast(M message, MessageSubject subject,
808 Function<M, byte[]> encoder, Set<NodeId> nodes) {
809 }
810
811 @Override
812 public <M, R> CompletableFuture<R> sendAndReceive(M message,
813 MessageSubject subject, Function<M, byte[]> encoder,
814 Function<byte[], R> decoder, NodeId toNodeId) {
815 return null;
816 }
817
818 @Override
819 public <M, R> void addSubscriber(MessageSubject subject,
820 Function<byte[], M> decoder, Function<M, R> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700821 Function<R, byte[]> encoder, Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700822 }
823
824 @Override
Madan Jampani27b69c62015-05-15 15:49:02 -0700825 public <M, R> void addSubscriber(MessageSubject subject,
826 Function<byte[], M> decoder, Function<M, CompletableFuture<R>> handler,
827 Function<R, byte[]> encoder) {
828 }
829
830 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700831 public <M> void addSubscriber(MessageSubject subject,
832 Function<byte[], M> decoder, Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700833 Executor executor) {
Madan Jampani3d76c942015-06-29 23:37:10 -0700834 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
835 updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
836 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
837 antiEntropyHandler = (Consumer<AntiEntropyAdvertisement<String>>) handler;
838 } else {
839 throw new RuntimeException("Unexpected message subject " + subject.toString());
840 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700841 }
Jonathan Hart584d2f32015-01-27 19:46:14 -0800842 }
843
844 /**
845 * ClockService implementation that gives out timestamps based on a
846 * sequential counter. This clock service enables more control over the
847 * timestamps that are given out, including being able to "turn back time"
848 * to give out timestamps from the past.
849 *
850 * @param <T> Type that the clock service will give out timestamps for
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800851 * @param <U> Second type that the clock service will give out values for
Jonathan Hart584d2f32015-01-27 19:46:14 -0800852 */
Madan Jampanibcf1a482015-06-24 19:05:56 -0700853 private class SequentialClockService<T, U> {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800854
855 private static final long INITIAL_VALUE = 1;
856 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
857
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800858 public Timestamp getTimestamp(T object, U object2) {
Jonathan Hart584d2f32015-01-27 19:46:14 -0800859 return new TestTimestamp(counter.getAndIncrement());
860 }
861
862 /**
863 * Returns what the next timestamp will be without consuming the
864 * timestamp. This allows test code to set expectations correctly while
865 * still allowing the CUT to get the same timestamp.
866 *
867 * @return timestamp equal to the timestamp that will be returned by the
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800868 * next call to {@link #getTimestamp(T, U)}.
Jonathan Hart584d2f32015-01-27 19:46:14 -0800869 */
870 public Timestamp peekAtNextTimestamp() {
871 return peek(1);
872 }
873
874 /**
875 * Returns the ith timestamp to be given out in the future without
876 * consuming the timestamp. For example, i=1 returns the next timestamp,
877 * i=2 returns the timestamp after that, and so on.
878 *
879 * @param i number of the timestamp to peek at
880 * @return the ith timestamp that will be given out
881 */
882 public Timestamp peek(int i) {
883 checkArgument(i > 0, "i must be a positive integer");
884
885 return new TestTimestamp(counter.get() + i - 1);
886 }
887
888 /**
889 * Turns the clock back two ticks, so the next call to getTimestamp will
890 * return an older timestamp than the previous call to getTimestamp.
891 */
892 public void turnBackTime() {
893 // Not atomic, but should be OK for these tests.
894 counter.decrementAndGet();
895 counter.decrementAndGet();
896 }
897
898 }
899
900 /**
901 * Timestamp implementation where the value of the timestamp can be
902 * specified explicitly at creation time.
903 */
904 private class TestTimestamp implements Timestamp {
905
906 private final long timestamp;
907
908 /**
909 * Creates a new timestamp that has the specified value.
910 *
911 * @param timestamp value of the timestamp
912 */
913 public TestTimestamp(long timestamp) {
914 this.timestamp = timestamp;
915 }
916
917 @Override
918 public int compareTo(Timestamp o) {
919 checkArgument(o instanceof TestTimestamp);
920 TestTimestamp otherTimestamp = (TestTimestamp) o;
921 return ComparisonChain.start()
922 .compare(this.timestamp, otherTimestamp.timestamp)
923 .result();
924 }
925 }
926
927 /**
928 * EventuallyConsistentMapListener implementation which triggers a latch
929 * when it receives an event.
930 */
931 private class TestListener implements EventuallyConsistentMapListener<String, String> {
932 private CountDownLatch latch;
933
934 /**
935 * Creates a new listener that will trigger the specified latch when it
936 * receives and event.
937 *
938 * @param latch the latch to trigger on events
939 */
940 public TestListener(CountDownLatch latch) {
941 this.latch = latch;
942 }
943
944 @Override
945 public void event(EventuallyConsistentMapEvent<String, String> event) {
946 latch.countDown();
947 }
948 }
949}