renamed Serializer -> (onlab.netty-layer) PayloadSerializer
- Added TODO memos to ClusterCommunicationService layer
Change-Id: I4c81a72d03cddd23637f9c6cbf102125ea448c01
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
index 5d04a46..dceb7c6 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
@@ -12,6 +12,7 @@
private final NodeId sender;
private final MessageSubject subject;
private final Object payload;
+ // TODO: add field specifying Serializer for payload
/**
* Creates a cluster message.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index b260e1c..98be0b1 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -12,8 +12,6 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
@@ -46,16 +44,23 @@
private final Timer timer = new Timer("onos-controller-heatbeats");
public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ // TODO: This probably should not be a OSGi service.
+ //@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MessagingService messagingService;
@Activate
public void activate() {
+ // TODO: initialize messagingService
+ // TODO: setPayloadSerializer, which is capable of
+ // (1) serialize ClusterMessage - ClusterMessage.payload
+ // (2) serialize ClusterMessage.payload using user specified serializer
+// messagingService.setPayloadSerializer(...);
log.info("Started");
}
@Deactivate
public void deactivate() {
+ // TODO: cleanup messageingService if needed.
log.info("Stopped");
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
new file mode 100644
index 0000000..dbd88c3
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
@@ -0,0 +1,43 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public final class ClusterMessageSerializer extends Serializer<ClusterMessage> {
+
+ public ClusterMessageSerializer() {
+ // does not accept null
+ super(false);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, ClusterMessage object) {
+ kryo.writeClassAndObject(output, object.sender());
+ kryo.writeClassAndObject(output, object.subject());
+ // TODO: write bytes serialized using ClusterMessage specified serializer
+ // write serialized payload size
+ //output.writeInt(...);
+ // write serialized payload
+ //output.writeBytes(...);
+ }
+
+ @Override
+ public ClusterMessage read(Kryo kryo, Input input,
+ Class<ClusterMessage> type) {
+ // TODO Auto-generated method stub
+ NodeId sender = (NodeId) kryo.readClassAndObject(input);
+ MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input);
+ int size = input.readInt();
+ byte[] payloadBytes = input.readBytes(size);
+ // TODO: deserialize payload using ClusterMessage specified serializer
+ Object payload = null;
+ return new ClusterMessage(sender, subject, payload);
+ }
+
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
index 6df0b23..cd34e0d 100644
--- a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
@@ -9,7 +9,7 @@
/**
* Kryo Serializer.
*/
-public class KryoSerializer implements Serializer {
+public class KryoSerializer implements PayloadSerializer {
private KryoPool serializerPool;
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index a0d34a5..f199019 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -14,14 +14,14 @@
public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private final NettyMessagingService messagingService;
- private final Serializer serializer;
+ private final PayloadSerializer payloadSerializer;
private int contentLength;
- public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
+ public MessageDecoder(NettyMessagingService messagingService, PayloadSerializer payloadSerializer) {
super(DecoderState.READ_HEADER_VERSION);
this.messagingService = messagingService;
- this.serializer = serializer;
+ this.payloadSerializer = payloadSerializer;
}
@Override
@@ -48,7 +48,7 @@
checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
checkpoint(DecoderState.READ_CONTENT);
case READ_CONTENT:
- InternalMessage message = serializer.decode(buffer.readBytes(contentLength).nioBuffer());
+ InternalMessage message = payloadSerializer.decode(buffer.readBytes(contentLength).nioBuffer());
message.setMessagingService(messagingService);
out.add(message);
checkpoint(DecoderState.READ_HEADER_VERSION);
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index 5581747..0ee29cb 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -17,10 +17,10 @@
public static final int SERIALIZER_VERSION = 1;
- private final Serializer serializer;
+ private final PayloadSerializer payloadSerializer;
- public MessageEncoder(Serializer serializer) {
- this.serializer = serializer;
+ public MessageEncoder(PayloadSerializer payloadSerializer) {
+ this.payloadSerializer = payloadSerializer;
}
@Override
@@ -35,12 +35,12 @@
// write preamble
out.writeBytes(PREAMBLE);
- byte[] payload = serializer.encode(message);
+ byte[] payload = payloadSerializer.encode(message);
// write payload length
out.writeInt(payload.length);
- // write serializer version
+ // write payloadSerializer version
out.writeInt(SERIALIZER_VERSION);
// write payload.
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
index ebad442..d455c94 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -38,4 +38,11 @@
* @param type message type
*/
public void unregisterHandler(String type);
+
+ /**
+ * Specify the serializer to use for encoding/decoding payload.
+ *
+ * @param payloadSerializer payloadSerializer to use
+ */
+ public void setPayloadSerializer(PayloadSerializer payloadSerializer);
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index f0c4861..4c32164 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -52,7 +52,7 @@
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
- protected Serializer serializer;
+ protected PayloadSerializer payloadSerializer;
public NettyMessagingService() {
// TODO: Default port should be configurable.
@@ -133,8 +133,9 @@
handlers.remove(type);
}
- public void setSerializer(Serializer serializer) {
- this.serializer = serializer;
+ @Override
+ public void setPayloadSerializer(PayloadSerializer payloadSerializer) {
+ this.payloadSerializer = payloadSerializer;
}
private MessageHandler getMessageHandler(String type) {
@@ -201,13 +202,13 @@
private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder(serializer);
+ private final ChannelHandler encoder = new MessageEncoder(payloadSerializer);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(NettyMessagingService.this, serializer))
+ .addLast("decoder", new MessageDecoder(NettyMessagingService.this, payloadSerializer))
.addLast("handler", dispatcher);
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Serializer.java b/utils/netty/src/main/java/org/onlab/netty/PayloadSerializer.java
similarity index 95%
rename from utils/netty/src/main/java/org/onlab/netty/Serializer.java
rename to utils/netty/src/main/java/org/onlab/netty/PayloadSerializer.java
index 46550d4..9874543 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Serializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/PayloadSerializer.java
@@ -5,7 +5,7 @@
/**
* Interface for encoding/decoding message payloads.
*/
-public interface Serializer {
+public interface PayloadSerializer {
/**
* Decodes the specified byte array to a POJO.
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
index 5ce8f2e..2c3f4d4 100644
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
@@ -44,8 +44,8 @@
public static class TestNettyMessagingService extends NettyMessagingService {
public TestNettyMessagingService(int port) throws Exception {
super(port);
- Serializer serializer = new KryoSerializer();
- this.serializer = serializer;
+ PayloadSerializer payloadSerializer = new KryoSerializer();
+ this.payloadSerializer = payloadSerializer;
}
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
index 6a93149..4d1db8e 100644
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
@@ -6,7 +6,7 @@
public static void main(String... args) throws Exception {
NettyMessagingService server = new NettyMessagingService(8080);
server.activate();
- server.setSerializer(new KryoSerializer());
+ server.setPayloadSerializer(new KryoSerializer());
server.registerHandler("simple", new LoggingHandler());
server.registerHandler("echo", new EchoHandler());
}