Introduced a serializer interface and minimal kyro based implementation as a precursor to moving netty messaging out to onos-utils
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/KryoSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/KryoSerializer.java
new file mode 100644
index 0000000..5d809a4
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/KryoSerializer.java
@@ -0,0 +1,47 @@
+package org.onlab.onos.store.messaging.impl;
+
+import org.onlab.util.KryoPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+/**
+ * Kryo Serializer.
+ */
+public class KryoSerializer implements Serializer {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private KryoPool serializerPool;
+
+ public KryoSerializer() {
+ setupKryoPool();
+ }
+
+ /**
+ * Sets up the common serialzers pool.
+ */
+ protected void setupKryoPool() {
+ // FIXME Slice out types used in common to separate pool/namespace.
+ serializerPool = KryoPool.newBuilder()
+ .register(ArrayList.class,
+ HashMap.class,
+ ArrayList.class
+ )
+ .build()
+ .populate(1);
+ }
+
+
+ @Override
+ public Object decode(byte[] data) {
+ return serializerPool.deserialize(data);
+ }
+
+ @Override
+ public byte[] encode(Object payload) {
+ return serializerPool.serialize(payload);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/MessageDecoder.java b/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/MessageDecoder.java
index 7f94015..59790f6 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/MessageDecoder.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/MessageDecoder.java
@@ -5,7 +5,6 @@
import static com.google.common.base.Preconditions.checkState;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.onos.store.messaging.Endpoint;
import io.netty.buffer.ByteBuf;
@@ -18,11 +17,11 @@
public class MessageDecoder extends ByteToMessageDecoder {
private final NettyMessagingService messagingService;
- private final SerializationService serializationService;
+ private final Serializer serializer;
- public MessageDecoder(NettyMessagingService messagingService, SerializationService serializationService) {
+ public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
this.messagingService = messagingService;
- this.serializationService = serializationService;
+ this.serializer = serializer;
}
@Override
@@ -47,7 +46,7 @@
Endpoint sender = new Endpoint(host, port);
// read message payload; first read size and then bytes.
- Object payload = serializationService.decode(in.readBytes(in.readInt()).array());
+ Object payload = serializer.decode(in.readBytes(in.readInt()).array());
InternalMessage message = new InternalMessage.Builder(messagingService)
.withId(id)
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/MessageEncoder.java b/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/MessageEncoder.java
index b1c660c..501b70c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/MessageEncoder.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/MessageEncoder.java
@@ -1,7 +1,5 @@
package org.onlab.onos.store.messaging.impl;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
-
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
@@ -14,10 +12,10 @@
// onosiscool in ascii
public static final byte[] PREAMBLE = "onosiscool".getBytes();
- private final SerializationService serializationService;
+ private final Serializer serializer;
- public MessageEncoder(SerializationService serializationService) {
- this.serializationService = serializationService;
+ public MessageEncoder(Serializer serializer) {
+ this.serializer = serializer;
}
@Override
@@ -46,12 +44,12 @@
out.writeInt(message.sender().port());
try {
- serializationService.encode(message.payload());
+ serializer.encode(message.payload());
} catch (Exception e) {
e.printStackTrace();
}
- byte[] payload = serializationService.encode(message.payload());
+ byte[] payload = serializer.encode(message.payload());
// write payload length.
out.writeInt(payload.length);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/NettyMessagingService.java b/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/NettyMessagingService.java
index b6b3857..321e0ef 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/NettyMessagingService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/NettyMessagingService.java
@@ -31,7 +31,6 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.onos.store.messaging.Endpoint;
import org.onlab.onos.store.messaging.MessageHandler;
import org.onlab.onos.store.messaging.MessagingService;
@@ -61,7 +60,7 @@
private final Endpoint localEp;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected SerializationService serializationService;
+ protected Serializer serializer;
public NettyMessagingService() {
// TODO: Default port should be configurable.
@@ -213,8 +212,8 @@
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
- .addLast(new MessageEncoder(serializationService))
- .addLast(new MessageDecoder(NettyMessagingService.this, serializationService))
+ .addLast(new MessageEncoder(serializer))
+ .addLast(new MessageDecoder(NettyMessagingService.this, serializer))
.addLast(new NettyMessagingService.InboundMessageDispatcher());
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/Serializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/Serializer.java
new file mode 100644
index 0000000..d2da7cf
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/Serializer.java
@@ -0,0 +1,24 @@
+package org.onlab.onos.store.messaging.impl;
+
+/**
+ * Interface for encoding/decoding message payloads.
+ */
+public interface Serializer {
+
+ /**
+ * Decodes the specified byte array to a POJO.
+ *
+ * @param data byte array.
+ * @return POJO
+ */
+ Object decode(byte[] data);
+
+ /**
+ * Encodes the specified POJO into a byte array.
+ *
+ * @param data POJO to be encoded
+ * @return byte array.
+ */
+ byte[] encode(Object message);
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/SimpleClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/SimpleClient.java
index 95753e7..746ecb2 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/SimpleClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/SimpleClient.java
@@ -2,7 +2,6 @@
import java.util.concurrent.TimeUnit;
-import org.onlab.onos.store.cluster.impl.MessageSerializer;
import org.onlab.onos.store.messaging.Endpoint;
import org.onlab.onos.store.messaging.Response;
@@ -21,9 +20,8 @@
public static class TestNettyMessagingService extends NettyMessagingService {
public TestNettyMessagingService(int port) throws Exception {
super(port);
- MessageSerializer mgr = new MessageSerializer();
- mgr.activate();
- this.serializationService = mgr;
+ Serializer serializer = new KryoSerializer();
+ this.serializer = serializer;
}
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/SimpleServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/SimpleServer.java
index 1b331ba..96094b7 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/SimpleServer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/messaging/impl/SimpleServer.java
@@ -1,7 +1,5 @@
package org.onlab.onos.store.messaging.impl;
-import org.onlab.onos.store.cluster.impl.MessageSerializer;
-
public final class SimpleServer {
private SimpleServer() {}
@@ -14,9 +12,8 @@
public static class TestNettyMessagingService extends NettyMessagingService {
protected TestNettyMessagingService() {
- MessageSerializer mgr = new MessageSerializer();
- mgr.activate();
- this.serializationService = mgr;
+ Serializer serializer = new KryoSerializer();
+ this.serializer = serializer;
}
}
}