Fixed issue with receiving side not checking readability of channel before reading
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 54da8cc..4a755cc 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -22,7 +22,6 @@
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.lang.math.RandomUtils;
-import org.apache.commons.pool.KeyedObjectPool;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.slf4j.Logger;
@@ -38,8 +37,8 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private KeyedObjectPool<Endpoint, Channel> channels =
- new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
+ private GenericKeyedObjectPool<Endpoint, Channel> channels;
+
private final int port;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
@@ -66,6 +65,9 @@
}
public void activate() throws Exception {
+ channels = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
+ channels.setTestOnBorrow(true);
+ channels.setTestOnReturn(true);
responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.weakValues()
@@ -95,17 +97,14 @@
protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
Channel channel = null;
try {
- channel = channels.borrowObject(ep);
- channel.eventLoop().execute(new WriteTask(channel, message));
+ try {
+ channel = channels.borrowObject(ep);
+ channel.eventLoop().execute(new WriteTask(channel, message));
+ } finally {
+ channels.returnObject(ep, channel);
+ }
} catch (Exception e) {
throw new IOException(e);
- } finally {
- try {
- channels.returnObject(ep, channel);
- } catch (Exception e) {
- log.warn("Error returning object back to the pool", e);
- // ignored.
- }
}
}
@@ -141,6 +140,8 @@
private void startAcceptingConnections() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
+ b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
+ b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
@@ -169,6 +170,8 @@
public Channel makeObject(Endpoint ep) throws Exception {
Bootstrap b = new Bootstrap();
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
+ b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
b.group(workerGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
@@ -197,20 +200,20 @@
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
- .addLast(new MessageEncoder(serializer))
- .addLast(new MessageDecoder(NettyMessagingService.this, serializer))
- .addLast(new NettyMessagingService.InboundMessageDispatcher());
+ .addLast("encoder", new MessageEncoder(serializer))
+ .addLast("decoder", new MessageDecoder(NettyMessagingService.this, serializer))
+ .addLast("handler", new InboundMessageDispatcher());
}
}
private class WriteTask implements Runnable {
- private final Object message;
+ private final InternalMessage message;
private final Channel channel;
- public WriteTask(Channel channel, Object message) {
- this.message = message;
+ public WriteTask(Channel channel, InternalMessage message) {
this.channel = channel;
+ this.message = message;
}
@Override
@@ -240,5 +243,11 @@
MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
handler.handle(message);
}
+
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ context.close();
+ }
}
}