renamed Serializer -> (onlab.netty-layer) PayloadSerializer

- Added TODO memos to ClusterCommunicationService layer

Change-Id: I4c81a72d03cddd23637f9c6cbf102125ea448c01
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
index 5d04a46..dceb7c6 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
@@ -12,6 +12,7 @@
     private final NodeId sender;
     private final MessageSubject subject;
     private final Object payload;
+    // TODO: add field specifying Serializer for payload
 
     /**
      * Creates a cluster message.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index b260e1c..98be0b1 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -12,8 +12,6 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.cluster.NodeId;
@@ -46,16 +44,23 @@
     private final Timer timer = new Timer("onos-controller-heatbeats");
     public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    // TODO: This probably should not be a OSGi service.
+    //@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private MessagingService messagingService;
 
     @Activate
     public void activate() {
+        // TODO: initialize messagingService
+        // TODO: setPayloadSerializer, which is capable of
+        //  (1) serialize ClusterMessage - ClusterMessage.payload
+        //  (2) serialize ClusterMessage.payload using user specified serializer
+//        messagingService.setPayloadSerializer(...);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        // TODO: cleanup messageingService if needed.
         log.info("Stopped");
     }
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
new file mode 100644
index 0000000..dbd88c3
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
@@ -0,0 +1,43 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public final class ClusterMessageSerializer extends Serializer<ClusterMessage> {
+
+    public ClusterMessageSerializer() {
+        // does not accept null
+        super(false);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, ClusterMessage object) {
+        kryo.writeClassAndObject(output, object.sender());
+        kryo.writeClassAndObject(output, object.subject());
+        // TODO: write bytes serialized using ClusterMessage specified serializer
+        // write serialized payload size
+        //output.writeInt(...);
+        // write serialized payload
+        //output.writeBytes(...);
+    }
+
+    @Override
+    public ClusterMessage read(Kryo kryo, Input input,
+                               Class<ClusterMessage> type) {
+        // TODO Auto-generated method stub
+        NodeId sender = (NodeId) kryo.readClassAndObject(input);
+        MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input);
+        int size = input.readInt();
+        byte[] payloadBytes = input.readBytes(size);
+        // TODO: deserialize payload using ClusterMessage specified serializer
+        Object payload = null;
+        return new ClusterMessage(sender, subject, payload);
+    }
+
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
index 6df0b23..cd34e0d 100644
--- a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
@@ -9,7 +9,7 @@
 /**
  * Kryo Serializer.
  */
-public class KryoSerializer implements Serializer {
+public class KryoSerializer implements PayloadSerializer {
 
     private KryoPool serializerPool;
 
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index a0d34a5..f199019 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -14,14 +14,14 @@
 public class MessageDecoder extends ReplayingDecoder<DecoderState> {
 
     private final NettyMessagingService messagingService;
-    private final Serializer serializer;
+    private final PayloadSerializer payloadSerializer;
 
     private int contentLength;
 
-    public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
+    public MessageDecoder(NettyMessagingService messagingService, PayloadSerializer payloadSerializer) {
         super(DecoderState.READ_HEADER_VERSION);
         this.messagingService = messagingService;
-        this.serializer = serializer;
+        this.payloadSerializer = payloadSerializer;
     }
 
     @Override
@@ -48,7 +48,7 @@
             checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
             checkpoint(DecoderState.READ_CONTENT);
         case READ_CONTENT:
-            InternalMessage message = serializer.decode(buffer.readBytes(contentLength).nioBuffer());
+            InternalMessage message = payloadSerializer.decode(buffer.readBytes(contentLength).nioBuffer());
             message.setMessagingService(messagingService);
             out.add(message);
             checkpoint(DecoderState.READ_HEADER_VERSION);
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index 5581747..0ee29cb 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -17,10 +17,10 @@
     public static final int SERIALIZER_VERSION = 1;
 
 
-    private final Serializer serializer;
+    private final PayloadSerializer payloadSerializer;
 
-    public MessageEncoder(Serializer serializer) {
-        this.serializer = serializer;
+    public MessageEncoder(PayloadSerializer payloadSerializer) {
+        this.payloadSerializer = payloadSerializer;
     }
 
     @Override
@@ -35,12 +35,12 @@
         // write preamble
         out.writeBytes(PREAMBLE);
 
-        byte[] payload = serializer.encode(message);
+        byte[] payload = payloadSerializer.encode(message);
 
         // write payload length
         out.writeInt(payload.length);
 
-        // write serializer version
+        // write payloadSerializer version
         out.writeInt(SERIALIZER_VERSION);
 
         // write payload.
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
index ebad442..d455c94 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -38,4 +38,11 @@
      * @param type message type
      */
     public void unregisterHandler(String type);
+
+    /**
+     * Specify the serializer to use for encoding/decoding payload.
+     *
+     * @param payloadSerializer payloadSerializer to use
+     */
+    public void setPayloadSerializer(PayloadSerializer payloadSerializer);
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index f0c4861..4c32164 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -52,7 +52,7 @@
     private final GenericKeyedObjectPool<Endpoint, Channel> channels
             = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
 
-    protected Serializer serializer;
+    protected PayloadSerializer payloadSerializer;
 
     public NettyMessagingService() {
         // TODO: Default port should be configurable.
@@ -133,8 +133,9 @@
         handlers.remove(type);
     }
 
-    public void setSerializer(Serializer serializer) {
-        this.serializer = serializer;
+    @Override
+    public void setPayloadSerializer(PayloadSerializer payloadSerializer) {
+        this.payloadSerializer = payloadSerializer;
     }
 
     private MessageHandler getMessageHandler(String type) {
@@ -201,13 +202,13 @@
     private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
 
         private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-        private final ChannelHandler encoder = new MessageEncoder(serializer);
+        private final ChannelHandler encoder = new MessageEncoder(payloadSerializer);
 
         @Override
         protected void initChannel(SocketChannel channel) throws Exception {
             channel.pipeline()
                 .addLast("encoder", encoder)
-                .addLast("decoder", new MessageDecoder(NettyMessagingService.this, serializer))
+                .addLast("decoder", new MessageDecoder(NettyMessagingService.this, payloadSerializer))
                 .addLast("handler", dispatcher);
         }
     }
diff --git a/utils/netty/src/main/java/org/onlab/netty/Serializer.java b/utils/netty/src/main/java/org/onlab/netty/PayloadSerializer.java
similarity index 95%
rename from utils/netty/src/main/java/org/onlab/netty/Serializer.java
rename to utils/netty/src/main/java/org/onlab/netty/PayloadSerializer.java
index 46550d4..9874543 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Serializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/PayloadSerializer.java
@@ -5,7 +5,7 @@
 /**
  * Interface for encoding/decoding message payloads.
  */
-public interface Serializer {
+public interface PayloadSerializer {
 
     /**
      * Decodes the specified byte array to a POJO.
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
index 5ce8f2e..2c3f4d4 100644
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
@@ -44,8 +44,8 @@
     public static class TestNettyMessagingService extends NettyMessagingService {
         public TestNettyMessagingService(int port) throws Exception {
             super(port);
-            Serializer serializer = new KryoSerializer();
-            this.serializer = serializer;
+            PayloadSerializer payloadSerializer = new KryoSerializer();
+            this.payloadSerializer = payloadSerializer;
         }
     }
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
index 6a93149..4d1db8e 100644
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
@@ -6,7 +6,7 @@
     public static void main(String... args) throws Exception {
         NettyMessagingService server = new NettyMessagingService(8080);
         server.activate();
-        server.setSerializer(new KryoSerializer());
+        server.setPayloadSerializer(new KryoSerializer());
         server.registerHandler("simple", new LoggingHandler());
         server.registerHandler("echo", new EchoHandler());
     }