Netty performance improvements
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index ad88b62..5581747 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -1,12 +1,14 @@
package org.onlab.netty;
import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* Encode InternalMessage out into a byte buffer.
*/
+@Sharable
public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
// onosiscool in ascii
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 8a609bd..f0c4861 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -11,6 +11,7 @@
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@@ -37,14 +38,19 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private GenericKeyedObjectPool<Endpoint, Channel> channels;
-
private final int port;
+ private final Endpoint localEp;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
- private Cache<Long, AsyncResponse<?>> responseFutures;
- private final Endpoint localEp;
+ private final Cache<Long, AsyncResponse<?>> responseFutures = CacheBuilder.newBuilder()
+ .maximumSize(100000)
+ .weakValues()
+ // TODO: Once the entry expires, notify blocking threads (if any).
+ .expireAfterWrite(10, TimeUnit.MINUTES)
+ .build();
+ private final GenericKeyedObjectPool<Endpoint, Channel> channels
+ = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
protected Serializer serializer;
@@ -65,15 +71,8 @@
}
public void activate() throws Exception {
- channels = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
channels.setTestOnBorrow(true);
channels.setTestOnReturn(true);
- responseFutures = CacheBuilder.newBuilder()
- .maximumSize(100000)
- .weakValues()
- // TODO: Once the entry expires, notify blocking threads (if any).
- .expireAfterWrite(10, TimeUnit.MINUTES)
- .build();
startAcceptingConnections();
}
@@ -145,7 +144,8 @@
private void startAcceptingConnections() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
- b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
+ b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+ // TODO: Need JVM options to configure PooledByteBufAllocator.
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
@@ -172,19 +172,18 @@
@Override
public Channel makeObject(Endpoint ep) throws Exception {
- Bootstrap b = new Bootstrap();
- b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
- b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
- b.group(workerGroup);
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
+ bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+ bootstrap.group(workerGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
- b.channel(NioSocketChannel.class);
- b.option(ChannelOption.SO_KEEPALIVE, true);
- b.handler(new OnosCommunicationChannelInitializer());
-
+ bootstrap.channel(NioSocketChannel.class);
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+ bootstrap.handler(new OnosCommunicationChannelInitializer());
// Start the client.
- ChannelFuture f = b.connect(ep.host(), ep.port()).sync();
+ ChannelFuture f = bootstrap.connect(ep.host(), ep.port()).sync();
return f.channel();
}
@@ -201,12 +200,15 @@
private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
+ private final ChannelHandler dispatcher = new InboundMessageDispatcher();
+ private final ChannelHandler encoder = new MessageEncoder(serializer);
+
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
- .addLast("encoder", new MessageEncoder(serializer))
+ .addLast("encoder", encoder)
.addLast("decoder", new MessageDecoder(NettyMessagingService.this, serializer))
- .addLast("handler", new InboundMessageDispatcher());
+ .addLast("handler", dispatcher);
}
}
@@ -222,10 +224,11 @@
@Override
public void run() {
- channel.writeAndFlush(message);
+ channel.writeAndFlush(message, channel.voidPromise());
}
}
+ @ChannelHandler.Sharable
private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
@Override
@@ -248,7 +251,6 @@
handler.handle(message);
}
-
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
context.close();