Improved message decoding performance
diff --git a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
index b2b490e..f4024a4 100644
--- a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
+++ b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
@@ -17,8 +17,8 @@
     private final long start = System.nanoTime();
 
     @Override
-    public T get(long timeout, TimeUnit tu) throws TimeoutException {
-        timeout = tu.toNanos(timeout);
+    public T get(long timeout, TimeUnit timeUnit) throws TimeoutException {
+        timeout = timeUnit.toNanos(timeout);
         boolean interrupted = false;
         try {
             synchronized (this) {
diff --git a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
index ddfeeb6..6ba4bdf 100644
--- a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
+++ b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
@@ -2,14 +2,19 @@
 
 import java.io.IOException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Message handler that echos the message back to the sender.
  */
 public class EchoHandler implements MessageHandler {
 
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
     @Override
     public void handle(Message message) throws IOException {
-        System.out.println("Received message. Echoing it back to the sender.");
+        log.info("Received message. Echoing it back to the sender.");
         message.respond(message.payload());
     }
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
index 1482db4..8ed86ae 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
@@ -11,6 +11,7 @@
     /**
      * Used for serialization.
      */
+    @SuppressWarnings("unused")
     private Endpoint() {
         port = 0;
         host = null;
@@ -67,4 +68,4 @@
         }
         return true;
     }
-}
+}
\ No newline at end of file
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
index 96cbe79..e6c027e 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -86,4 +86,4 @@
             return message;
         }
     }
-}
+}
\ No newline at end of file
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
index c07d289..8a90c07 100644
--- a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
@@ -53,4 +53,4 @@
     public void serialize(Object obj, ByteBuffer buffer) {
         serializerPool.serialize(obj, buffer);
     }
-}
+}
\ No newline at end of file
diff --git a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
index ed6cdb4..23c4073 100644
--- a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
+++ b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
@@ -1,12 +1,17 @@
 package org.onlab.netty;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A MessageHandler that simply logs the information.
  */
 public class LoggingHandler implements MessageHandler {
 
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
     @Override
     public void handle(Message message) {
-        System.out.println("Received: " + message.payload());
+        log.info("Received message. Payload: " + message.payload());
     }
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index 7997ef0..3ed3216 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -8,13 +8,18 @@
 import java.util.Arrays;
 import java.util.List;
 
-// TODO: Implement performance enchancements such as those described in the javadoc for ReplayingDecoder.
-public class MessageDecoder extends ReplayingDecoder<InternalMessage> {
+/**
+ * Decoder for inbound messages.
+ */
+public class MessageDecoder extends ReplayingDecoder<DecoderState> {
 
     private final NettyMessagingService messagingService;
     private final Serializer serializer;
 
+    private int contentLength;
+
     public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
+        super(DecoderState.READ_HEADER_VERSION);
         this.messagingService = messagingService;
         this.serializer = serializer;
     }
@@ -25,16 +30,31 @@
             ByteBuf buffer,
             List<Object> out) throws Exception {
 
-        byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
-        buffer.readBytes(preamble);
-        checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
-
-        int bodySize = buffer.readInt();
-        byte[] body = new byte[bodySize];
-        buffer.readBytes(body);
-
-        InternalMessage message = serializer.decode(body);
-        message.setMessagingService(messagingService);
-        out.add(message);
+        switch(state()) {
+        case READ_HEADER_VERSION:
+            int headerVersion = buffer.readInt();
+            checkState(headerVersion == MessageEncoder.HEADER_VERSION, "Unexpected header version");
+            checkpoint(DecoderState.READ_PREAMBLE);
+        case READ_PREAMBLE:
+            byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
+            buffer.readBytes(preamble);
+            checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
+            checkpoint(DecoderState.READ_CONTENT_LENGTH);
+        case READ_CONTENT_LENGTH:
+            contentLength = buffer.readInt();
+            checkpoint(DecoderState.READ_SERIALIZER_VERSION);
+        case READ_SERIALIZER_VERSION:
+            int serializerVersion = buffer.readInt();
+            checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
+            checkpoint(DecoderState.READ_CONTENT);
+        case READ_CONTENT:
+            InternalMessage message = serializer.deserialize(buffer.readBytes(contentLength).nioBuffer());
+            message.setMessagingService(messagingService);
+            out.add(message);
+            checkpoint(DecoderState.READ_HEADER_VERSION);
+            break;
+         default:
+            checkState(false, "Must not be here");
+        }
     }
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index 63c1dbc..ad88b62 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -11,6 +11,9 @@
 
     // onosiscool in ascii
     public static final byte[] PREAMBLE = "onosiscool".getBytes();
+    public static final int HEADER_VERSION = 1;
+    public static final int SERIALIZER_VERSION = 1;
+
 
     private final Serializer serializer;
 
@@ -24,6 +27,9 @@
             InternalMessage message,
             ByteBuf out) throws Exception {
 
+        // write version
+        out.writeInt(HEADER_VERSION);
+
         // write preamble
         out.writeBytes(PREAMBLE);
 
@@ -32,6 +38,9 @@
         // write payload length
         out.writeInt(payload.length);
 
+        // write serializer version
+        out.writeInt(SERIALIZER_VERSION);
+
         // write payload.
         out.writeBytes(payload);
     }
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 4a755cc..8a609bd 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -134,6 +134,10 @@
         handlers.remove(type);
     }
 
+    public void setSerializer(Serializer serializer) {
+        this.serializer = serializer;
+    }
+
     private MessageHandler getMessageHandler(String type) {
         return handlers.get(type);
     }
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
index 12fa025..6a93149 100644
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
@@ -4,16 +4,10 @@
     private SimpleServer() {}
 
     public static void main(String... args) throws Exception {
-        NettyMessagingService server = new TestNettyMessagingService();
+        NettyMessagingService server = new NettyMessagingService(8080);
         server.activate();
+        server.setSerializer(new KryoSerializer());
         server.registerHandler("simple", new LoggingHandler());
         server.registerHandler("echo", new EchoHandler());
     }
-
-    public static class TestNettyMessagingService extends NettyMessagingService {
-        protected TestNettyMessagingService() {
-            Serializer serializer = new KryoSerializer();
-            this.serializer = serializer;
-        }
-    }
 }