ClusterMessagingProtocolServer: start listening at the correct time

Change-Id: Ie8ed1894ae16c41242aee861440174f011dd689b
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
index b3eaeb4..e2c06c7 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
@@ -27,18 +27,12 @@
 public class ClusterMessagingProtocolServer implements ProtocolServer {
 
     private final Logger log = getLogger(getClass());
-    private RequestHandler handler;
+    private volatile RequestHandler handler;
+    private ClusterCommunicationService clusterCommunicator;
 
     public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
+        this.clusterCommunicator = clusterCommunicator;
 
-        clusterCommunicator.addSubscriber(
-                ClusterMessagingProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
-        clusterCommunicator.addSubscriber(
-                ClusterMessagingProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
-        clusterCommunicator.addSubscriber(
-                ClusterMessagingProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
-        clusterCommunicator.addSubscriber(
-                ClusterMessagingProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
     }
 
     @Override
@@ -48,11 +42,23 @@
 
     @Override
     public CompletableFuture<Void> listen() {
+        clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_PING,
+                                          new CopycatMessageHandler<PingRequest>());
+        clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC,
+                                          new CopycatMessageHandler<SyncRequest>());
+        clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_POLL,
+                                          new CopycatMessageHandler<PollRequest>());
+        clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT,
+                                          new CopycatMessageHandler<SubmitRequest>());
         return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public CompletableFuture<Void> close() {
+        clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_PING);
+        clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC);
+        clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_POLL);
+        clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT);
         return CompletableFuture.completedFuture(null);
     }