Simplified how message payloads get serialized/deserialized
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 98be0b1..babe4d3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -23,6 +23,9 @@
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
 import org.onlab.netty.Endpoint;
 import org.onlab.netty.Message;
 import org.onlab.netty.MessageHandler;
@@ -48,6 +51,18 @@
     //@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private MessagingService messagingService;
 
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        protected void setupKryoPool() {
+            serializerPool = KryoPool.newBuilder()
+                    .register(KryoPoolUtil.API)
+                    .register(ClusterMessage.class)
+                    .register(ClusterMembershipEvent.class)
+                    .build()
+                    .populate(1);
+        }
+
+    };
+
     @Activate
     public void activate() {
         // TODO: initialize messagingService
@@ -92,7 +107,7 @@
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
         try {
-            messagingService.sendAsync(nodeEp, message.subject().value(), message);
+            messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
             return true;
         } catch (IOException e) {
             log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
@@ -126,7 +141,7 @@
         broadcast(new ClusterMessage(
                 localNode.id(),
                 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
-                new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)));
+                SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
         members.remove(node.id());
     }
 
@@ -138,7 +153,7 @@
             broadcast(new ClusterMessage(
                 localNode.id(),
                 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
-                new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode)));
+                SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
         }
     }
 
@@ -147,7 +162,7 @@
         @Override
         public void handle(ClusterMessage message) {
 
-            ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload();
+            ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
             ControllerNode node = event.node();
             if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
                 log.info("Node {} sent a hearbeat", node.id());
@@ -172,7 +187,8 @@
 
         @Override
         public void handle(Message message) {
-            handler.handle((ClusterMessage) message.payload());
+            ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
+            handler.handle(clusterMessage);
         }
     }
 }