Bypass netty stack for messages that are sent to self
Change-Id: Ifb1fd610892bd22a291cda472a8a5ef7a1dcfe6d
Manual serde for ClusterMessage to avoid one additional kryo serialization overhead for each message sent/received
Change-Id: I08d9a2c10403b0e9e9e1736c6bd36fa008bb8db0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index b27e009..59671c0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -28,7 +28,6 @@
import org.onlab.netty.MessageHandler;
import org.onlab.netty.MessagingService;
import org.onlab.netty.NettyMessagingService;
-import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
@@ -36,10 +35,6 @@
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.serializers.impl.ClusterMessageSerializer;
-import org.onosproject.store.serializers.impl.MessageSubjectSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,19 +57,6 @@
// TODO: This probably should not be a OSGi service.
private MessagingService messagingService;
- private static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
- .register(new ClusterMessageSerializer(), ClusterMessage.class)
- .register(new MessageSubjectSerializer(), MessageSubject.class)
- .build();
- }
-
- };
-
@Activate
public void activate() {
ControllerNode localNode = clusterService.getLocalNode();
@@ -105,7 +87,7 @@
public boolean broadcast(ClusterMessage message) {
boolean ok = true;
final ControllerNode localNode = clusterService.getLocalNode();
- byte[] payload = SERIALIZER.encode(message);
+ byte[] payload = message.getBytes();
for (ControllerNode node : clusterService.getNodes()) {
if (!node.equals(localNode)) {
ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
@@ -117,7 +99,7 @@
@Override
public boolean broadcastIncludeSelf(ClusterMessage message) {
boolean ok = true;
- byte[] payload = SERIALIZER.encode(message);
+ byte[] payload = message.getBytes();
for (ControllerNode node : clusterService.getNodes()) {
ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
}
@@ -128,7 +110,7 @@
public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
boolean ok = true;
final ControllerNode localNode = clusterService.getLocalNode();
- byte[] payload = SERIALIZER.encode(message);
+ byte[] payload = message.getBytes();
for (NodeId nodeId : nodes) {
if (!nodeId.equals(localNode.id())) {
ok = unicastUnchecked(message.subject(), payload, nodeId) && ok;
@@ -139,7 +121,7 @@
@Override
public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException {
- return unicast(message.subject(), SERIALIZER.encode(message), toNodeId);
+ return unicast(message.subject(), message.getBytes(), toNodeId);
}
private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
@@ -170,7 +152,7 @@
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
try {
- return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
+ return messagingService.sendAndReceive(nodeEp, message.subject().value(), message.getBytes());
} catch (IOException e) {
log.trace("Failed interaction with remote nodeId: " + toNodeId, e);
@@ -209,7 +191,7 @@
public void handle(Message message) {
final ClusterMessage clusterMessage;
try {
- clusterMessage = SERIALIZER.decode(message.payload());
+ clusterMessage = ClusterMessage.fromBytes(message.payload());
} catch (Exception e) {
log.error("Failed decoding {}", message, e);
throw e;