Fixed issue with recieving side not checking readability of channel before reading
diff --git a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
index 313a448..ddfeeb6 100644
--- a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
+++ b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
@@ -9,7 +9,7 @@
 
     @Override
     public void handle(Message message) throws IOException {
-        System.out.println("Received: " + message.payload() + ". Echoing it back to the sender.");
+        System.out.println("Received message. Echoing it back to the sender.");
         message.respond(message.payload());
     }
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
index 8681093..1482db4 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
@@ -8,6 +8,14 @@
     private final int port;
     private final String host;
 
+    /**
+     * Used for serialization.
+     */
+    private Endpoint() {
+        port = 0;
+        host = null;
+    }
+
     public Endpoint(String host, int port) {
         this.host = host;
         this.port = port;
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
index bcf6f52..96cbe79 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -35,6 +35,10 @@
         return payload;
     }
 
+    protected void setMessagingService(NettyMessagingService messagingService) {
+        this.messagingService = messagingService;
+    }
+
     @Override
     public void respond(Object data) throws IOException {
         Builder builder = new Builder(messagingService);
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
index 73c01a0..c07d289 100644
--- a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
@@ -1,9 +1,8 @@
 package org.onlab.netty;
 
 import org.onlab.util.KryoPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 
@@ -12,8 +11,6 @@
  */
 public class KryoSerializer implements Serializer {
 
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
     private KryoPool serializerPool;
 
     public KryoSerializer() {
@@ -28,7 +25,9 @@
         serializerPool = KryoPool.newBuilder()
                 .register(ArrayList.class,
                           HashMap.class,
-                          ArrayList.class
+                          ArrayList.class,
+                          InternalMessage.class,
+                          Endpoint.class
                 )
                 .build()
                 .populate(1);
@@ -36,7 +35,7 @@
 
 
     @Override
-    public Object decode(byte[] data) {
+    public <T> T decode(byte[] data) {
         return serializerPool.deserialize(data);
     }
 
@@ -44,4 +43,14 @@
     public byte[] encode(Object payload) {
         return serializerPool.serialize(payload);
     }
+
+    @Override
+    public <T> T deserialize(ByteBuffer buffer) {
+        return serializerPool.deserialize(buffer);
+    }
+
+    @Override
+    public void serialize(Object obj, ByteBuffer buffer) {
+        serializerPool.serialize(obj, buffer);
+    }
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index ecf2d62..7997ef0 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -1,18 +1,15 @@
 package org.onlab.netty;
 
+import static com.google.common.base.Preconditions.checkState;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+
 import java.util.Arrays;
 import java.util.List;
 
-import static com.google.common.base.Preconditions.checkState;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
-/**
- * Decode bytes into a InternalMessage.
- */
-public class MessageDecoder extends ByteToMessageDecoder {
+// TODO: Implement performance enchancements such as those described in the javadoc for ReplayingDecoder.
+public class MessageDecoder extends ReplayingDecoder<InternalMessage> {
 
     private final NettyMessagingService messagingService;
     private final Serializer serializer;
@@ -23,36 +20,21 @@
     }
 
     @Override
-    protected void decode(ChannelHandlerContext context, ByteBuf in,
-            List<Object> messages) throws Exception {
+    protected void decode(
+            ChannelHandlerContext context,
+            ByteBuf buffer,
+            List<Object> out) throws Exception {
 
-        byte[] preamble = in.readBytes(MessageEncoder.PREAMBLE.length).array();
+        byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
+        buffer.readBytes(preamble);
         checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
 
-        // read message Id.
-        long id = in.readLong();
+        int bodySize = buffer.readInt();
+        byte[] body = new byte[bodySize];
+        buffer.readBytes(body);
 
-        // read message type; first read size and then bytes.
-        String type = new String(in.readBytes(in.readInt()).array());
-
-        // read sender host name; first read size and then bytes.
-        String host = new String(in.readBytes(in.readInt()).array());
-
-        // read sender port.
-        int port = in.readInt();
-
-        Endpoint sender = new Endpoint(host, port);
-
-        // read message payload; first read size and then bytes.
-        Object payload = serializer.decode(in.readBytes(in.readInt()).array());
-
-        InternalMessage message = new InternalMessage.Builder(messagingService)
-                .withId(id)
-                .withSender(sender)
-                .withType(type)
-                .withPayload(payload)
-                .build();
-
-        messages.add(message);
+        InternalMessage message = serializer.decode(body);
+        message.setMessagingService(messagingService);
+        out.add(message);
     }
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index 1b52a0f..63c1dbc 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -19,42 +19,20 @@
     }
 
     @Override
-    protected void encode(ChannelHandlerContext context, InternalMessage message,
+    protected void encode(
+            ChannelHandlerContext context,
+            InternalMessage message,
             ByteBuf out) throws Exception {
 
         // write preamble
         out.writeBytes(PREAMBLE);
 
-        // write id
-        out.writeLong(message.id());
+        byte[] payload = serializer.encode(message);
 
-        // write type length
-        out.writeInt(message.type().length());
-
-        // write type
-        out.writeBytes(message.type().getBytes());
-
-        // write sender host name size
-        out.writeInt(message.sender().host().length());
-
-        // write sender host name.
-        out.writeBytes(message.sender().host().getBytes());
-
-        // write port
-        out.writeInt(message.sender().port());
-
-        try {
-            serializer.encode(message.payload());
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-        byte[] payload = serializer.encode(message.payload());
-
-        // write payload length.
+        // write payload length
         out.writeInt(payload.length);
 
-        // write payload bytes
+        // write payload.
         out.writeBytes(payload);
     }
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 54da8cc..4a755cc 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -22,7 +22,6 @@
 import io.netty.channel.socket.nio.NioSocketChannel;
 
 import org.apache.commons.lang.math.RandomUtils;
-import org.apache.commons.pool.KeyedObjectPool;
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.slf4j.Logger;
@@ -38,8 +37,8 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private KeyedObjectPool<Endpoint, Channel> channels =
-            new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
+    private GenericKeyedObjectPool<Endpoint, Channel> channels;
+
     private final int port;
     private final EventLoopGroup bossGroup = new NioEventLoopGroup();
     private final EventLoopGroup workerGroup = new NioEventLoopGroup();
@@ -66,6 +65,9 @@
     }
 
     public void activate() throws Exception {
+        channels = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
+        channels.setTestOnBorrow(true);
+        channels.setTestOnReturn(true);
         responseFutures = CacheBuilder.newBuilder()
                 .maximumSize(100000)
                 .weakValues()
@@ -95,17 +97,14 @@
     protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
         Channel channel = null;
         try {
-            channel = channels.borrowObject(ep);
-            channel.eventLoop().execute(new WriteTask(channel, message));
+            try {
+                channel = channels.borrowObject(ep);
+                channel.eventLoop().execute(new WriteTask(channel, message));
+            } finally {
+                channels.returnObject(ep, channel);
+            }
         } catch (Exception e) {
             throw new IOException(e);
-        } finally {
-            try {
-                channels.returnObject(ep, channel);
-            } catch (Exception e) {
-                log.warn("Error returning object back to the pool", e);
-                // ignored.
-            }
         }
     }
 
@@ -141,6 +140,8 @@
 
     private void startAcceptingConnections() throws InterruptedException {
         ServerBootstrap b = new ServerBootstrap();
+        b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
+        b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
         b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
         b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
@@ -169,6 +170,8 @@
         public Channel makeObject(Endpoint ep) throws Exception {
             Bootstrap b = new Bootstrap();
             b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+            b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
+            b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
             b.group(workerGroup);
             // TODO: Make this faster:
             // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
@@ -197,20 +200,20 @@
         @Override
         protected void initChannel(SocketChannel channel) throws Exception {
             channel.pipeline()
-                .addLast(new MessageEncoder(serializer))
-                .addLast(new MessageDecoder(NettyMessagingService.this, serializer))
-                .addLast(new NettyMessagingService.InboundMessageDispatcher());
+                .addLast("encoder", new MessageEncoder(serializer))
+                .addLast("decoder", new MessageDecoder(NettyMessagingService.this, serializer))
+                .addLast("handler", new InboundMessageDispatcher());
         }
     }
 
     private class WriteTask implements Runnable {
 
-        private final Object message;
+        private final InternalMessage message;
         private final Channel channel;
 
-        public WriteTask(Channel channel, Object message) {
-            this.message = message;
+        public WriteTask(Channel channel, InternalMessage message) {
             this.channel = channel;
+            this.message = message;
         }
 
         @Override
@@ -240,5 +243,11 @@
             MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
             handler.handle(message);
         }
+
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+            context.close();
+        }
     }
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/Serializer.java b/utils/netty/src/main/java/org/onlab/netty/Serializer.java
index ac55f5a..56494b2 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Serializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Serializer.java
@@ -1,5 +1,7 @@
 package org.onlab.netty;
 
+import java.nio.ByteBuffer;
+
 /**
  * Interface for encoding/decoding message payloads.
  */
@@ -11,7 +13,7 @@
      * @param data byte array.
      * @return POJO
      */
-    Object decode(byte[] data);
+    public <T> T decode(byte[] data);
 
     /**
      * Encodes the specified POJO into a byte array.
@@ -19,6 +21,23 @@
      * @param data POJO to be encoded
      * @return byte array.
      */
-    byte[] encode(Object message);
+    public byte[] encode(Object data);
 
+    /**
+     * Serializes the specified object into bytes using one of the
+     * pre-registered serializers.
+     *
+     * @param obj object to be serialized
+     * @param buffer to write serialized bytes
+     */
+    public void serialize(final Object obj, ByteBuffer buffer);
+
+    /**
+     * Deserializes the specified bytes into an object using one of the
+     * pre-registered serializers.
+     *
+     * @param buffer bytes to be deserialized
+     * @return deserialized object
+     */
+    public <T> T deserialize(final ByteBuffer buffer);
 }