Simplified how message payloads get serialized/deserialized
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
index dceb7c6..2bdf5a0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
@@ -11,7 +11,7 @@
 
     private final NodeId sender;
     private final MessageSubject subject;
-    private final Object payload;
+    private final byte[] payload;
     // TODO: add field specifying Serializer for payload
 
     /**
@@ -19,7 +19,7 @@
      *
      * @param subject message subject
      */
-    public ClusterMessage(NodeId sender, MessageSubject subject, Object payload) {
+    public ClusterMessage(NodeId sender, MessageSubject subject, byte[] payload) {
         this.sender = sender;
         this.subject = subject;
         this.payload = payload;
@@ -48,7 +48,7 @@
      *
      * @return message payload.
      */
-    public Object payload() {
+    public byte[] payload() {
         return payload;
     }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 98be0b1..babe4d3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -23,6 +23,9 @@
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
 import org.onlab.netty.Endpoint;
 import org.onlab.netty.Message;
 import org.onlab.netty.MessageHandler;
@@ -48,6 +51,18 @@
     //@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private MessagingService messagingService;
 
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        protected void setupKryoPool() {
+            serializerPool = KryoPool.newBuilder()
+                    .register(KryoPoolUtil.API)
+                    .register(ClusterMessage.class)
+                    .register(ClusterMembershipEvent.class)
+                    .build()
+                    .populate(1);
+        }
+
+    };
+
     @Activate
     public void activate() {
         // TODO: initialize messagingService
@@ -92,7 +107,7 @@
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
         try {
-            messagingService.sendAsync(nodeEp, message.subject().value(), message);
+            messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
             return true;
         } catch (IOException e) {
             log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
@@ -126,7 +141,7 @@
         broadcast(new ClusterMessage(
                 localNode.id(),
                 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
-                new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)));
+                SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
         members.remove(node.id());
     }
 
@@ -138,7 +153,7 @@
             broadcast(new ClusterMessage(
                 localNode.id(),
                 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
-                new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode)));
+                SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
         }
     }
 
@@ -147,7 +162,7 @@
         @Override
         public void handle(ClusterMessage message) {
 
-            ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload();
+            ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
             ControllerNode node = event.node();
             if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
                 log.info("Node {} sent a hearbeat", node.id());
@@ -172,7 +187,8 @@
 
         @Override
         public void handle(Message message) {
-            handler.handle((ClusterMessage) message.payload());
+            ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
+            handler.handle(clusterMessage);
         }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index ab9ae3c..2f1e504 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -13,6 +13,7 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.net.AnnotationsUtil;
 import org.onlab.onos.net.DefaultAnnotations;
 import org.onlab.onos.net.DefaultDevice;
@@ -37,7 +38,11 @@
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
 import org.onlab.onos.store.common.impl.Timestamped;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
 import org.onlab.util.NewConcurrentHashMap;
 import org.slf4j.Logger;
 
@@ -104,6 +109,24 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        protected void setupKryoPool() {
+            serializerPool = KryoPool.newBuilder()
+                    .register(KryoPoolUtil.API)
+                    .register(InternalDeviceEvent.class)
+                    .register(InternalPortEvent.class)
+                    .register(InternalPortStatusEvent.class)
+                    .register(Timestamped.class)
+                    .register(MastershipBasedTimestamp.class)
+                    .build()
+                    .populate(1);
+        }
+
+    };
+
     @Activate
     public void activate() {
         clusterCommunicator.addSubscriber(
@@ -779,17 +802,26 @@
     }
 
     private void notifyPeers(InternalDeviceEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
+                SERIALIZER.encode(event));
         clusterCommunicator.broadcast(message);
     }
 
     private void notifyPeers(InternalPortEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                GossipDeviceStoreMessageSubjects.PORT_UPDATE,
+                SERIALIZER.encode(event));
         clusterCommunicator.broadcast(message);
     }
 
     private void notifyPeers(InternalPortStatusEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
+                SERIALIZER.encode(event));
         clusterCommunicator.broadcast(message);
     }
 
@@ -797,7 +829,7 @@
         @Override
         public void handle(ClusterMessage message) {
             log.info("Received device update event from peer: {}", message.sender());
-            InternalDeviceEvent event = (InternalDeviceEvent) message.payload();
+            InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
             Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
@@ -810,7 +842,7 @@
         public void handle(ClusterMessage message) {
 
             log.info("Received port update event from peer: {}", message.sender());
-            InternalPortEvent event = (InternalPortEvent) message.payload();
+            InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
 
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
@@ -825,7 +857,7 @@
         public void handle(ClusterMessage message) {
 
             log.info("Received port status update event from peer: {}", message.sender());
-            InternalPortStatusEvent event = (InternalPortStatusEvent) message.payload();
+            InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
 
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
index dbd88c3..f4dadad 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
@@ -17,27 +17,20 @@
     }
 
     @Override
-    public void write(Kryo kryo, Output output, ClusterMessage object) {
-        kryo.writeClassAndObject(output, object.sender());
-        kryo.writeClassAndObject(output, object.subject());
-        // TODO: write bytes serialized using ClusterMessage specified serializer
-        // write serialized payload size
-        //output.writeInt(...);
-        // write serialized payload
-        //output.writeBytes(...);
+    public void write(Kryo kryo, Output output, ClusterMessage message) {
+        kryo.writeClassAndObject(output, message.sender());
+        kryo.writeClassAndObject(output, message.subject());
+        output.writeInt(message.payload().length);
+        output.writeBytes(message.payload());
     }
 
     @Override
     public ClusterMessage read(Kryo kryo, Input input,
                                Class<ClusterMessage> type) {
-        // TODO Auto-generated method stub
         NodeId sender = (NodeId) kryo.readClassAndObject(input);
         MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input);
-        int size = input.readInt();
-        byte[] payloadBytes = input.readBytes(size);
-        // TODO: deserialize payload using ClusterMessage specified serializer
-        Object payload = null;
+        int payloadSize = input.readInt();
+        byte[] payload = input.readBytes(payloadSize);
         return new ClusterMessage(sender, subject, payload);
     }
-
-}
+}
\ No newline at end of file