[ONOS-6479] Synchronously bootstrap EventuallyConsistentMap
Change-Id: I62a800ee731d1b42265b475c219d9d108adc08eb
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
index 8d41fd2..b12a7f4 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
@@ -84,6 +84,10 @@
private SequentialClockService<String, String> clockService;
private static final String MAP_NAME = "test";
+ private static final MessageSubject BOOTSTRAP_MESSAGE_SUBJECT
+ = new MessageSubject("ecm-" + MAP_NAME + "-bootstrap");
+ private static final MessageSubject INITIALIZE_MESSAGE_SUBJECT
+ = new MessageSubject("ecm-" + MAP_NAME + "-initialize");
private static final MessageSubject UPDATE_MESSAGE_SUBJECT
= new MessageSubject("ecm-" + MAP_NAME + "-update");
private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
@@ -117,6 +121,17 @@
// delegate to our ClusterCommunicationService implementation. This
// allows us to get a reference to the map's internal cluster message
// handlers so we can induce events coming in from a peer.
+ clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
+ anyObject(Function.class),
+ anyObject(Function.class),
+ anyObject(Function.class));
+ expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
+ clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
+ anyObject(Function.class),
+ anyObject(Function.class),
+ anyObject(Function.class),
+ anyObject(Executor.class));
+ expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
@@ -481,6 +496,10 @@
EventuallyConsistentMapListener<String, String> listener
= getListener();
listener.event(new EventuallyConsistentMapEvent<>(
+ MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
+ listener.event(new EventuallyConsistentMapEvent<>(
+ MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
+ listener.event(new EventuallyConsistentMapEvent<>(
MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
listener.event(new EventuallyConsistentMapEvent<>(
MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2));
@@ -632,6 +651,8 @@
@Test
public void testDestroy() throws Exception {
+ clusterCommunicator.removeSubscriber(BOOTSTRAP_MESSAGE_SUBJECT);
+ clusterCommunicator.removeSubscriber(INITIALIZE_MESSAGE_SUBJECT);
clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
clusterCommunicator.removeSubscriber(UPDATE_REQUEST_SUBJECT);
clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
@@ -793,7 +814,7 @@
Function<byte[], M> decoder, Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
antiEntropyHandler = (Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse>) handler;
- } else {
+ } else if (!subject.equals(INITIALIZE_MESSAGE_SUBJECT)) {
throw new RuntimeException("Unexpected message subject " + subject.toString());
}
}