Limit the amont of work that happens on netty event loop threads.
Currently we are kryo serializing/deserializing the message envelope which can potentially limit throughput.

Change-Id: I0ae9dab53bbb765b7618ceaefda1edf4f77b0b59
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index 51f8dd3..1697539 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -20,9 +20,10 @@
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ReplayingDecoder;
 
-import java.util.Arrays;
 import java.util.List;
 
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpAddress.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,12 +36,15 @@
 
     private final NettyMessagingService messagingService;
 
-    private static final KryoSerializer SERIALIZER = new KryoSerializer();
-
+    private long messageId;
+    private Version ipVersion;
+    private IpAddress senderIp;
+    private int senderPort;
     private int contentLength;
+    private long messageType;
 
     public MessageDecoder(NettyMessagingService messagingService) {
-        super(DecoderState.READ_HEADER_VERSION);
+        super(DecoderState.READ_MESSAGE_ID);
         this.messagingService = messagingService;
     }
 
@@ -51,27 +55,37 @@
             List<Object> out) throws Exception {
 
         switch (state()) {
-        case READ_HEADER_VERSION:
-            int headerVersion = buffer.readInt();
-            checkState(headerVersion == MessageEncoder.HEADER_VERSION, "Unexpected header version");
-            checkpoint(DecoderState.READ_PREAMBLE);
-        case READ_PREAMBLE:
-            byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
-            buffer.readBytes(preamble);
-            checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
+        case READ_MESSAGE_ID:
+            messageId = buffer.readLong();
+            checkpoint(DecoderState.READ_SENDER_IP_VERSION);
+        case READ_SENDER_IP_VERSION:
+            ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
+            checkpoint(DecoderState.READ_SENDER_IP);
+        case READ_SENDER_IP:
+            byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
+            buffer.readBytes(octects);
+            senderIp = IpAddress.valueOf(ipVersion, octects);
+            checkpoint(DecoderState.READ_SENDER_PORT);
+        case READ_SENDER_PORT:
+            senderPort = buffer.readInt();
+            checkpoint(DecoderState.READ_MESSAGE_TYPE);
+        case READ_MESSAGE_TYPE:
+            messageType = buffer.readLong();
             checkpoint(DecoderState.READ_CONTENT_LENGTH);
         case READ_CONTENT_LENGTH:
             contentLength = buffer.readInt();
-            checkpoint(DecoderState.READ_SERIALIZER_VERSION);
-        case READ_SERIALIZER_VERSION:
-            int serializerVersion = buffer.readInt();
-            checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
             checkpoint(DecoderState.READ_CONTENT);
         case READ_CONTENT:
-            InternalMessage message = SERIALIZER.decode(buffer.readBytes(contentLength).nioBuffer());
+            byte[] payload = new byte[contentLength];
+            buffer.readBytes(payload);
+            InternalMessage message = new InternalMessage(
+                    messageId,
+                    new Endpoint(senderIp, senderPort),
+                    messageType,
+                    payload);
             message.setMessagingService(messagingService);
             out.add(message);
-            checkpoint(DecoderState.READ_HEADER_VERSION);
+            checkpoint(DecoderState.READ_MESSAGE_ID);
             break;
          default:
             checkState(false, "Must not be here");