Simplified how message payloads get serialized/deserialized
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 98be0b1..babe4d3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -23,6 +23,9 @@
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
@@ -48,6 +51,18 @@
//@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MessagingService messagingService;
+ /**
+ * Serializer this class uses to encode outgoing {@code ClusterMessage} and
+ * {@code ClusterMembershipEvent} objects and to decode received payloads.
+ * Its Kryo pool registers {@code KryoPoolUtil.API} plus the two
+ * cluster-messaging classes named below.
+ */
+ private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ // @Override makes the compiler verify that this really replaces the
+ // KryoSerializer hook; without it a signature mismatch would silently
+ // leave the pool unconfigured.
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoPool.newBuilder()
+ .register(KryoPoolUtil.API)
+ .register(ClusterMessage.class)
+ .register(ClusterMembershipEvent.class)
+ .build()
+ // NOTE(review): populate(1) presumably pre-creates one Kryo instance - confirm against KryoPool.
+ .populate(1);
+ }
+
+ };
+
@Activate
public void activate() {
// TODO: initialize messagingService
@@ -92,7 +107,7 @@
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
- messagingService.sendAsync(nodeEp, message.subject().value(), message);
+ messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
return true;
} catch (IOException e) {
log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
@@ -126,7 +141,7 @@
broadcast(new ClusterMessage(
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
- new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)));
+ SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
members.remove(node.id());
}
@@ -138,7 +153,7 @@
broadcast(new ClusterMessage(
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
- new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode)));
+ SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
}
}
@@ -147,7 +162,7 @@
@Override
public void handle(ClusterMessage message) {
- ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload();
+ ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
ControllerNode node = event.node();
if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
log.info("Node {} sent a hearbeat", node.id());
@@ -172,7 +187,8 @@
@Override
public void handle(Message message) {
- handler.handle((ClusterMessage) message.payload());
+ ClusterMessage clusterMessage = SERIALIZER.decode(message.payload()); // payload is decoded with SERIALIZER instead of being cast directly
+ handler.handle(clusterMessage);
}
}
}