ClusterMessagingProtocolServer: start listening at correct timing

Change-Id: Ie8ed1894ae16c41242aee861440174f011dd689b
diff --git a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
index 283f92d..cf17fbe 100644
--- a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
@@ -73,4 +73,12 @@
      * @param subscriber message subscriber
      */
     void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber);
+
+    /**
+     * Removes a subscriber for the specified message subject.
+     *
+     * @param subject    message subject
+     */
+    void removeSubscriber(MessageSubject subject);
+
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 849ad17..1e47a00 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -166,10 +166,15 @@
 
     @Override
     public void addSubscriber(MessageSubject subject,
-            ClusterMessageHandler subscriber) {
+                              ClusterMessageHandler subscriber) {
         messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
     }
 
+    @Override
+    public void removeSubscriber(MessageSubject subject) {
+        messagingService.unregisterHandler(subject.value());
+    }
+
     private final class InternalClusterMessageHandler implements MessageHandler {
 
         private final ClusterMessageHandler handler;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
index b3eaeb4..e2c06c7 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
@@ -27,18 +27,12 @@
 public class ClusterMessagingProtocolServer implements ProtocolServer {
 
     private final Logger log = getLogger(getClass());
-    private RequestHandler handler;
+    private volatile RequestHandler handler;
+    private ClusterCommunicationService clusterCommunicator;
 
     public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
+        this.clusterCommunicator = clusterCommunicator;
 
-        clusterCommunicator.addSubscriber(
-                ClusterMessagingProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
-        clusterCommunicator.addSubscriber(
-                ClusterMessagingProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
-        clusterCommunicator.addSubscriber(
-                ClusterMessagingProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
-        clusterCommunicator.addSubscriber(
-                ClusterMessagingProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
     }
 
     @Override
@@ -48,11 +42,23 @@
 
     @Override
     public CompletableFuture<Void> listen() {
+        clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_PING,
+                                          new CopycatMessageHandler<PingRequest>());
+        clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC,
+                                          new CopycatMessageHandler<SyncRequest>());
+        clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_POLL,
+                                          new CopycatMessageHandler<PollRequest>());
+        clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT,
+                                          new CopycatMessageHandler<SubmitRequest>());
         return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public CompletableFuture<Void> close() {
+        clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_PING);
+        clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC);
+        clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_POLL);
+        clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT);
         return CompletableFuture.completedFuture(null);
     }