Simplified how message payloads get serialized/deserialized
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index ab9ae3c..2f1e504 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -13,6 +13,7 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.net.AnnotationsUtil;
 import org.onlab.onos.net.DefaultAnnotations;
 import org.onlab.onos.net.DefaultDevice;
@@ -37,7 +38,11 @@
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
 import org.onlab.onos.store.common.impl.Timestamped;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
 import org.onlab.util.NewConcurrentHashMap;
 import org.slf4j.Logger;
 
@@ -104,6 +109,24 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        protected void setupKryoPool() {
+            serializerPool = KryoPool.newBuilder()
+                    .register(KryoPoolUtil.API)
+                    .register(InternalDeviceEvent.class)
+                    .register(InternalPortEvent.class)
+                    .register(InternalPortStatusEvent.class)
+                    .register(Timestamped.class)
+                    .register(MastershipBasedTimestamp.class)
+                    .build()
+                    .populate(1);
+        }
+
+    };
+
     @Activate
     public void activate() {
         clusterCommunicator.addSubscriber(
@@ -779,17 +802,26 @@
     }
 
     private void notifyPeers(InternalDeviceEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
+                SERIALIZER.encode(event));
         clusterCommunicator.broadcast(message);
     }
 
     private void notifyPeers(InternalPortEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                GossipDeviceStoreMessageSubjects.PORT_UPDATE,
+                SERIALIZER.encode(event));
         clusterCommunicator.broadcast(message);
     }
 
     private void notifyPeers(InternalPortStatusEvent event) throws IOException {
-        ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
+                SERIALIZER.encode(event));
         clusterCommunicator.broadcast(message);
     }
 
@@ -797,7 +829,7 @@
         @Override
         public void handle(ClusterMessage message) {
             log.info("Received device update event from peer: {}", message.sender());
-            InternalDeviceEvent event = (InternalDeviceEvent) message.payload();
+            InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
             Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
@@ -810,7 +842,7 @@
         public void handle(ClusterMessage message) {
 
             log.info("Received port update event from peer: {}", message.sender());
-            InternalPortEvent event = (InternalPortEvent) message.payload();
+            InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
 
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
@@ -825,7 +857,7 @@
         public void handle(ClusterMessage message) {
 
             log.info("Received port status update event from peer: {}", message.sender());
-            InternalPortStatusEvent event = (InternalPortStatusEvent) message.payload();
+            InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
 
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();