[ONOS-6479] Synchronously bootstrap EventuallyConsistentMap

Change-Id: I62a800ee731d1b42265b475c219d9d108adc08eb
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
index 8d41fd2..b12a7f4 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
@@ -84,6 +84,10 @@
     private SequentialClockService<String, String> clockService;
 
     private static final String MAP_NAME = "test";
+    private static final MessageSubject BOOTSTRAP_MESSAGE_SUBJECT
+            = new MessageSubject("ecm-" + MAP_NAME + "-bootstrap");
+    private static final MessageSubject INITIALIZE_MESSAGE_SUBJECT
+            = new MessageSubject("ecm-" + MAP_NAME + "-initialize");
     private static final MessageSubject UPDATE_MESSAGE_SUBJECT
             = new MessageSubject("ecm-" + MAP_NAME + "-update");
     private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
@@ -117,6 +121,17 @@
         // delegate to our ClusterCommunicationService implementation. This
         // allows us to get a reference to the map's internal cluster message
         // handlers so we can induce events coming in from a peer.
+        clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
+                anyObject(Function.class),
+                anyObject(Function.class),
+                anyObject(Function.class));
+        expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
+        clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
+                anyObject(Function.class),
+                anyObject(Function.class),
+                anyObject(Function.class),
+                anyObject(Executor.class));
+        expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
         clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
                 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
         expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
@@ -481,6 +496,10 @@
         EventuallyConsistentMapListener<String, String> listener
                 = getListener();
         listener.event(new EventuallyConsistentMapEvent<>(
+                MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
+        listener.event(new EventuallyConsistentMapEvent<>(
+                MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
+        listener.event(new EventuallyConsistentMapEvent<>(
                 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
         listener.event(new EventuallyConsistentMapEvent<>(
                 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2));
@@ -632,6 +651,8 @@
 
     @Test
     public void testDestroy() throws Exception {
+        clusterCommunicator.removeSubscriber(BOOTSTRAP_MESSAGE_SUBJECT);
+        clusterCommunicator.removeSubscriber(INITIALIZE_MESSAGE_SUBJECT);
         clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
         clusterCommunicator.removeSubscriber(UPDATE_REQUEST_SUBJECT);
         clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
@@ -793,7 +814,7 @@
                 Function<byte[], M> decoder, Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
             if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
                 antiEntropyHandler = (Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse>) handler;
-            } else {
+            } else if (!subject.equals(INITIALIZE_MESSAGE_SUBJECT)) {
                 throw new RuntimeException("Unexpected message subject " + subject.toString());
             }
         }