Kryo-related fixes
- KryoNamespace now allows control over registration IDs
Change-Id: Idc2a0e27a09916657c725ee97e4366109144cc66
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
index 435225c..6118c49 100644
--- a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
@@ -17,11 +17,13 @@
import org.onlab.util.KryoNamespace;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
-//FIXME: Should be move out to test or app
+import java.nio.ByteBuffer;
+
/**
* Kryo Serializer.
*/
@@ -37,17 +39,11 @@
* Sets up the common serialzers pool.
*/
protected void setupKryoPool() {
- // FIXME Slice out types used in common to separate pool/namespace.
serializerPool = KryoNamespace.newBuilder()
- .register(ArrayList.class,
- HashMap.class,
- ArrayList.class,
- InternalMessage.class,
- Endpoint.class,
- byte[].class
- )
- .build()
- .populate(1);
+ .register(byte[].class) // registered without an explicit serializer argument
+ .register(new InternalMessageSerializer(), InternalMessage.class) // InternalMessage now goes through its dedicated serializer
+ .register(new EndPointSerializer(), Endpoint.class) // Endpoint now goes through its dedicated serializer
+ .build(); // unlike the removed lines, build() is no longer followed by populate(1)
}
@@ -66,4 +62,45 @@
public void encode(Object obj, ByteBuffer buffer) {
serializerPool.serialize(obj, buffer);
}
+
+ public static final class InternalMessageSerializer // Kryo Serializer that writes/reads an InternalMessage field by field
+ extends Serializer<InternalMessage> {
+
+ @Override
+ public void write(Kryo kryo, Output output, InternalMessage object) { // emits id, sender, type, payload length, payload bytes - read() consumes them in this exact order
+ output.writeLong(object.id());
+ kryo.writeClassAndObject(output, object.sender()); // sender goes through kryo (class + object); read() recovers it with readClassAndObject and a cast to Endpoint
+ output.writeString(object.type());
+ output.writeInt(object.payload().length, true); // length prefix for the payload; the 'true' flag must stay in sync with readInt(true) in read()
+ output.writeBytes(object.payload());
+ }
+
+ @Override
+ public InternalMessage read(Kryo kryo, Input input, // the 'type' parameter on the next line is not used by this method
+ Class<InternalMessage> type) {
+ long id = input.readLong();
+ Endpoint sender = (Endpoint) kryo.readClassAndObject(input);
+ String msgtype = input.readString();
+ int length = input.readInt(true); // number of payload bytes, as written by write()
+ byte[] payload = input.readBytes(length);
+ return new InternalMessage(id, sender, msgtype, payload); // rebuilds the message from the four fields read above
+ }
+
+ }
+
+ public static final class EndPointSerializer extends Serializer<Endpoint> { // Kryo Serializer for Endpoint: host string followed by port int
+
+ @Override
+ public void write(Kryo kryo, Output output, Endpoint object) { // the 'kryo' argument is not used; both fields are written directly to output
+ output.writeString(object.host()); // host first, then port - read() consumes them in the same order
+ output.writeInt(object.port());
+ }
+
+ @Override
+ public Endpoint read(Kryo kryo, Input input, Class<Endpoint> type) { // 'kryo' and 'type' are not used by this method
+ String host = input.readString();
+ int port = input.readInt(); // plain readInt(), pairing with the plain writeInt(port) in write()
+ return new Endpoint(host, port);
+ }
+ }
}