Bypass the Netty stack for messages that a node sends to itself
Change-Id: Ifb1fd610892bd22a291cda472a8a5ef7a1dcfe6d
Serialize and deserialize ClusterMessage manually to avoid an additional Kryo serialization pass for each message sent or received
Change-Id: I08d9a2c10403b0e9e9e1736c6bd36fa008bb8db0
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index f11a513..317f01a 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -87,9 +87,7 @@
.build();
private final LoadingCache<String, Long> messageTypeLookupCache = CacheBuilder.newBuilder()
- .softValues()
.build(new CacheLoader<String, Long>() {
-
@Override
public Long load(String type) {
return hashToLong(type);
@@ -171,6 +169,10 @@
}
protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
+ if (ep.equals(localEp)) {
+ dispatchLocally(message);
+ return;
+ }
Channel channel = null;
try {
try {
@@ -329,29 +331,7 @@
@Override
protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
- long type = message.type();
- if (type == InternalMessage.REPLY_MESSAGE_TYPE) {
- try {
- SettableFuture<byte[]> futureResponse =
- NettyMessagingService.this.responseFutures.getIfPresent(message.id());
- if (futureResponse != null) {
- futureResponse.set(message.payload());
- } else {
- log.warn("Received a reply for message id:[{}]. "
- + " from {}. But was unable to locate the"
- + " request handle", message.id(), message.sender());
- }
- } finally {
- NettyMessagingService.this.responseFutures.invalidate(message.id());
- }
- return;
- }
- MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
- if (handler != null) {
- handler.handle(message);
- } else {
- log.debug("No handler registered for {}", type);
- }
+ dispatchLocally(message); // same dispatch routine sendAsync uses when the target endpoint is local
}
@Override
@@ -361,6 +341,32 @@
}
}
+    /** Completes the pending future of a reply, or hands any other message to its registered handler. */
+    private void dispatchLocally(InternalMessage message) throws IOException {
+        final long messageType = message.type();
+        if (messageType != InternalMessage.REPLY_MESSAGE_TYPE) { // ordinary (non-reply) message
+            MessageHandler handler = getMessageHandler(messageType);
+            if (handler == null) {
+                log.debug("No handler registered for {}", messageType);
+            } else {
+                handler.handle(message);
+            }
+            return;
+        }
+        try { // reply: the finally clause drops the cache entry whether or not a future was found
+            SettableFuture<byte[]> pending = responseFutures.getIfPresent(message.id());
+            if (pending == null) {
+                log.warn("Received a reply for message id:[{}]. "
+                        + " from {}. But was unable to locate the"
+                        + " request handle", message.id(), message.sender());
+            } else {
+                pending.set(message.payload());
+            }
+        } finally {
+            responseFutures.invalidate(message.id());
+        }
+    }
+
/**
* Returns the md5 hash of the specified input string as a long.
* @param input input string.