Fixed an issue where the receiving side did not check the readability of the channel before reading
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 54da8cc..4a755cc 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -22,7 +22,6 @@
 import io.netty.channel.socket.nio.NioSocketChannel;
 
 import org.apache.commons.lang.math.RandomUtils;
-import org.apache.commons.pool.KeyedObjectPool;
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.slf4j.Logger;
@@ -38,8 +37,8 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private KeyedObjectPool<Endpoint, Channel> channels =
-            new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
+    private GenericKeyedObjectPool<Endpoint, Channel> channels;
+
     private final int port;
     private final EventLoopGroup bossGroup = new NioEventLoopGroup();
     private final EventLoopGroup workerGroup = new NioEventLoopGroup();
@@ -66,6 +65,9 @@
     }
 
     public void activate() throws Exception {
+        channels = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
+        channels.setTestOnBorrow(true);
+        channels.setTestOnReturn(true);
         responseFutures = CacheBuilder.newBuilder()
                 .maximumSize(100000)
                 .weakValues()
@@ -95,17 +97,14 @@
     protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
         Channel channel = null;
         try {
-            channel = channels.borrowObject(ep);
-            channel.eventLoop().execute(new WriteTask(channel, message));
+            try {
+                channel = channels.borrowObject(ep);
+                channel.eventLoop().execute(new WriteTask(channel, message));
+            } finally {
+                channels.returnObject(ep, channel);
+            }
         } catch (Exception e) {
             throw new IOException(e);
-        } finally {
-            try {
-                channels.returnObject(ep, channel);
-            } catch (Exception e) {
-                log.warn("Error returning object back to the pool", e);
-                // ignored.
-            }
         }
     }
 
@@ -141,6 +140,8 @@
 
     private void startAcceptingConnections() throws InterruptedException {
         ServerBootstrap b = new ServerBootstrap();
+        b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
+        b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
         b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
         b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
@@ -169,6 +170,8 @@
         public Channel makeObject(Endpoint ep) throws Exception {
             Bootstrap b = new Bootstrap();
             b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+            b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
+            b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
             b.group(workerGroup);
             // TODO: Make this faster:
             // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
@@ -197,20 +200,20 @@
         @Override
         protected void initChannel(SocketChannel channel) throws Exception {
             channel.pipeline()
-                .addLast(new MessageEncoder(serializer))
-                .addLast(new MessageDecoder(NettyMessagingService.this, serializer))
-                .addLast(new NettyMessagingService.InboundMessageDispatcher());
+                .addLast("encoder", new MessageEncoder(serializer))
+                .addLast("decoder", new MessageDecoder(NettyMessagingService.this, serializer))
+                .addLast("handler", new InboundMessageDispatcher());
         }
     }
 
     private class WriteTask implements Runnable {
 
-        private final Object message;
+        private final InternalMessage message;
         private final Channel channel;
 
-        public WriteTask(Channel channel, Object message) {
-            this.message = message;
+        public WriteTask(Channel channel, InternalMessage message) {
             this.channel = channel;
+            this.message = message;
         }
 
         @Override
@@ -240,5 +243,11 @@
             MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
             handler.handle(message);
         }
+
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+            context.close();
+        }
     }
 }