Kryo related fixes

- KryoNamespace to allow control over registration id

Change-Id: Idc2a0e27a09916657c725ee97e4366109144cc66
diff --git a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
index ca24e4f..50bc58d 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
@@ -22,20 +22,11 @@
 /**
  * Representation of a TCP/UDP communication end point.
  */
-public class Endpoint {
+public final class Endpoint {
 
     private final int port;
     private final String host;
 
-    /**
-     * Used for serialization.
-     */
-    @SuppressWarnings("unused")
-    private Endpoint() {
-        port = 0;
-        host = null;
-    }
-
     public Endpoint(String host, int port) {
         this.host = host;
         this.port = port;
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
index a981c34..40b529e 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -34,6 +34,13 @@
     // Must be created using the Builder.
     private InternalMessage() {}
 
+    InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
+        this.id = id;
+        this.sender = sender;
+        this.type = type;
+        this.payload = payload;
+    }
+
     public long id() {
         return id;
     }
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
index 435225c..6118c49 100644
--- a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
@@ -17,11 +17,13 @@
 
 import org.onlab.util.KryoNamespace;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 
-//FIXME: Should be move out to test or app
+import java.nio.ByteBuffer;
+
 /**
  * Kryo Serializer.
  */
@@ -37,17 +39,11 @@
      * Sets up the common serialzers pool.
      */
     protected void setupKryoPool() {
-        // FIXME Slice out types used in common to separate pool/namespace.
         serializerPool = KryoNamespace.newBuilder()
-                .register(ArrayList.class,
-                          HashMap.class,
-                          ArrayList.class,
-                          InternalMessage.class,
-                          Endpoint.class,
-                          byte[].class
-                )
-                .build()
-                .populate(1);
+                .register(byte[].class)
+                .register(new InternalMessageSerializer(), InternalMessage.class)
+                .register(new EndPointSerializer(), Endpoint.class)
+                .build();
     }
 
 
@@ -66,4 +62,45 @@
     public void encode(Object obj, ByteBuffer buffer) {
         serializerPool.serialize(obj, buffer);
     }
+
+    public static final class InternalMessageSerializer
+            extends Serializer<InternalMessage> {
+
+        @Override
+        public void write(Kryo kryo, Output output, InternalMessage object) {
+            output.writeLong(object.id());
+            kryo.writeClassAndObject(output, object.sender());
+            output.writeString(object.type());
+            output.writeInt(object.payload().length, true);
+            output.writeBytes(object.payload());
+        }
+
+        @Override
+        public InternalMessage read(Kryo kryo, Input input,
+                                    Class<InternalMessage> type) {
+            long id = input.readLong();
+            Endpoint sender = (Endpoint) kryo.readClassAndObject(input);
+            String msgtype = input.readString();
+            int length = input.readInt(true);
+            byte[] payload = input.readBytes(length);
+            return new InternalMessage(id, sender, msgtype, payload);
+        }
+
+    }
+
+    public static final class EndPointSerializer extends Serializer<Endpoint> {
+
+        @Override
+        public void write(Kryo kryo, Output output, Endpoint object) {
+            output.writeString(object.host());
+            output.writeInt(object.port());
+        }
+
+        @Override
+        public Endpoint read(Kryo kryo, Input input, Class<Endpoint> type) {
+            String host = input.readString();
+            int port = input.readInt();
+            return new Endpoint(host, port);
+        }
+    }
 }