Integrated Kryo serializers with the communications manager and IO loop stuff.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
index 8acc8fe..bac32e9 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
@@ -13,6 +13,7 @@
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.GoodbyeMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
@@ -83,9 +84,11 @@
private final Timer timer = new Timer("onos-comm-initiator");
private final TimerTask connectionCustodian = new ConnectionCustodian();
+ private GoodbyeSubscriber goodbyeSubscriber = new GoodbyeSubscriber();
@Activate
public void activate() {
+ addSubscriber(MessageSubject.GOODBYE, goodbyeSubscriber);
log.info("Activated but waiting for delegate");
}
@@ -102,9 +105,20 @@
}
@Override
+ public boolean send(ClusterMessage message) {
+ boolean ok = true;
+ for (DefaultControllerNode node : nodes) {
+ if (!node.equals(localNode)) {
+ ok = send(message, node.id()) && ok;
+ }
+ }
+ return ok;
+ }
+
+ @Override
public boolean send(ClusterMessage message, NodeId toNodeId) {
ClusterMessageStream stream = streams.get(toNodeId);
- if (stream != null) {
+ if (stream != null && !toNodeId.equals(localNode.id())) {
try {
stream.write(message);
return true;
@@ -140,6 +154,7 @@
@Override
public void removeNode(DefaultControllerNode node) {
+ send(new GoodbyeMessage(node.id()));
nodes.remove(node);
ClusterMessageStream stream = streams.remove(node.id());
if (stream != null) {
@@ -159,6 +174,16 @@
log.info("Started");
}
+ @Override
+ public void clearAllNodesAndStreams() {
+ nodes.clear();
+ send(new GoodbyeMessage(localNode.id()));
+ for (ClusterMessageStream stream : streams.values()) {
+ stream.close();
+ }
+ streams.clear();
+ }
+
/**
* Dispatches the specified message to all subscribers to its subject.
*
@@ -304,4 +329,11 @@
}
}
+ private class GoodbyeSubscriber implements MessageSubscriber {
+ @Override
+ public void receive(ClusterMessage message, NodeId fromNodeId) {
+ log.info("Received goodbye message from {}", fromNodeId);
+ nodesDelegate.nodeRemoved(fromNodeId);
+ }
+ }
}