ClusterCommunicationManager bugfixes
Change-Id: I0cb433bf2197c7c745733657607d9e62bb23567d
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 7b05401..3a9fd8d 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -26,6 +26,7 @@
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.ClusterMessageSerializer;
import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoPool;
@@ -50,8 +51,6 @@
private ClusterService clusterService;
private ClusterNodesDelegate nodesDelegate;
- // FIXME: `members` should go away and should be using ClusterService
- private Map<NodeId, ControllerNode> members = new HashMap<>();
private final Timer timer = new Timer("onos-controller-heatbeats");
public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
@@ -59,11 +58,14 @@
private MessagingService messagingService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
- .register(ClusterMessage.class)
+ .register(ClusterMessage.class, new ClusterMessageSerializer())
.register(ClusterMembershipEvent.class)
+ .register(byte[].class)
+ .register(MessageSubject.class)
.build()
.populate(1);
}
@@ -73,7 +75,15 @@
@Activate
public void activate() {
localNode = clusterService.getLocalNode();
- messagingService = new NettyMessagingService(localNode.tcpPort());
+ NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
+ // FIXME: workaround until it becomes a service.
+ try {
+ netty.activate();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ log.error("NettyMessagingService#activate", e);
+ }
+ messagingService = netty;
log.info("Started");
}
@@ -86,7 +96,7 @@
@Override
public boolean broadcast(ClusterMessage message) {
boolean ok = true;
- for (ControllerNode node : members.values()) {
+ for (ControllerNode node : clusterService.getNodes()) {
if (!node.equals(localNode)) {
ok = unicast(message, node.id()) && ok;
}
@@ -107,7 +117,7 @@
@Override
public boolean unicast(ClusterMessage message, NodeId toNodeId) {
- ControllerNode node = members.get(toNodeId);
+ ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
@@ -137,7 +147,7 @@
@Override
public void addNode(ControllerNode node) {
- members.put(node.id(), node);
+ //members.put(node.id(), node);
}
@Override
@@ -146,7 +156,7 @@
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
- members.remove(node.id());
+ //members.remove(node.id());
}
// Sends a heart beat to all peers.