Starting to add membership management messages.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
index bac32e9..2e2887c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
@@ -12,11 +12,13 @@
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMembershipMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.GoodbyeMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
+import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
+import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
@@ -84,16 +86,20 @@
private final Timer timer = new Timer("onos-comm-initiator");
private final TimerTask connectionCustodian = new ConnectionCustodian();
- private GoodbyeSubscriber goodbyeSubscriber = new GoodbyeSubscriber();
+ private final MembershipSubscriber membershipSubscriber = new MembershipSubscriber();
@Activate
public void activate() {
- addSubscriber(MessageSubject.GOODBYE, goodbyeSubscriber);
+ addSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber); // same subscriber serves both subjects
+ addSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
log.info("Activated but waiting for delegate");
}
@Deactivate
public void deactivate() {
+ removeSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
+ removeSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
+
connectionCustodian.cancel();
if (connectionListener != null) {
connectionListener.shutdown();
@@ -154,7 +160,7 @@
@Override
public void removeNode(DefaultControllerNode node) {
- send(new GoodbyeMessage(node.id()));
+ send(new LeavingMemberMessage(node.id()));
nodes.remove(node);
ClusterMessageStream stream = streams.remove(node.id());
if (stream != null) {
@@ -177,7 +183,7 @@
@Override
public void clearAllNodesAndStreams() {
nodes.clear();
- send(new GoodbyeMessage(localNode.id()));
+ send(new LeavingMemberMessage(localNode.id()));
for (ClusterMessageStream stream : streams.values()) {
stream.close();
}
@@ -187,7 +193,7 @@
/**
* Dispatches the specified message to all subscribers to its subject.
*
- * @param message message to dispatch
+ * @param message message to dispatch
* @param fromNodeId node from which the message was received
*/
void dispatch(ClusterMessage message, NodeId fromNodeId) {
@@ -200,7 +206,7 @@
}
/**
- * Removes the stream associated with the specified node.
+ * Adds the stream associated with the specified node.
*
* @param nodeId newly detected cluster node id
* @param ip node IP listen address
@@ -212,6 +218,7 @@
DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort);
stream.setNode(node);
streams.put(node.id(), stream);
+ send(new NewMemberMessage(node.id(), node.ip(), node.tcpPort()));
return node;
}
@@ -329,11 +336,28 @@
}
}
- private class GoodbyeSubscriber implements MessageSubscriber {
+ /**
+ * Applies membership announcements received from peers to the nodes delegate.
+ * Registered for the NEW_MEMBER and LEAVING_MEMBER subjects.
+ */
+ private class MembershipSubscriber implements MessageSubscriber {
@Override
public void receive(ClusterMessage message, NodeId fromNodeId) {
- log.info("Received goodbye message from {}", fromNodeId);
- nodesDelegate.nodeRemoved(fromNodeId);
+ // Check the type before downcasting so an unexpected payload is logged
+ // rather than throwing ClassCastException out of this subscriber.
+ if (!(message instanceof ClusterMembershipMessage)) {
+ log.warn("Ignoring non-membership message {} from {}", message, fromNodeId);
+ return;
+ }
+ ClusterMembershipMessage cmm = (ClusterMembershipMessage) message;
+ MessageSubject subject = message.subject();
+ if (subject == MessageSubject.NEW_MEMBER) {
+ log.info("Node {} arrived", cmm.nodeId());
+ nodesDelegate.nodeDetected(cmm.nodeId(), cmm.ipAddress(), cmm.tcpPort());
+ } else if (subject == MessageSubject.LEAVING_MEMBER) {
+ log.info("Node {} is leaving", cmm.nodeId());
+ nodesDelegate.nodeRemoved(cmm.nodeId());
+ }
}
}
}