ClusterMessagingProtocolServer: start listening at correct timing
Change-Id: Ie8ed1894ae16c41242aee861440174f011dd689b
diff --git a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
index 283f92d..cf17fbe 100644
--- a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
@@ -73,4 +73,12 @@
* @param subscriber message subscriber
*/
void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber);
+
+ /**
+ * Removes a subscriber for the specified message subject.
+ *
+ * @param subject message subject
+ */
+ void removeSubscriber(MessageSubject subject);
+
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 849ad17..1e47a00 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -166,10 +166,15 @@
@Override
public void addSubscriber(MessageSubject subject,
- ClusterMessageHandler subscriber) {
+ ClusterMessageHandler subscriber) {
messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
}
+ @Override
+ public void removeSubscriber(MessageSubject subject) {
+ messagingService.unregisterHandler(subject.value());
+ }
+
private final class InternalClusterMessageHandler implements MessageHandler {
private final ClusterMessageHandler handler;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
index b3eaeb4..e2c06c7 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
@@ -27,18 +27,12 @@
public class ClusterMessagingProtocolServer implements ProtocolServer {
private final Logger log = getLogger(getClass());
- private RequestHandler handler;
+ private volatile RequestHandler handler;
+ private ClusterCommunicationService clusterCommunicator;
public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
+ this.clusterCommunicator = clusterCommunicator;
- clusterCommunicator.addSubscriber(
- ClusterMessagingProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
- clusterCommunicator.addSubscriber(
- ClusterMessagingProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
- clusterCommunicator.addSubscriber(
- ClusterMessagingProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
- clusterCommunicator.addSubscriber(
- ClusterMessagingProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
}
@Override
@@ -48,11 +42,23 @@
@Override
public CompletableFuture<Void> listen() {
+ clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_PING,
+ new CopycatMessageHandler<PingRequest>());
+ clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC,
+ new CopycatMessageHandler<SyncRequest>());
+ clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_POLL,
+ new CopycatMessageHandler<PollRequest>());
+ clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT,
+ new CopycatMessageHandler<SubmitRequest>());
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> close() {
+ clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_PING);
+ clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC);
+ clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_POLL);
+ clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT);
return CompletableFuture.completedFuture(null);
}