Simplified how message payloads get serialized/deserialized
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index ab9ae3c..2f1e504 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -13,6 +13,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultDevice;
@@ -37,7 +38,11 @@
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
import org.onlab.onos.store.common.impl.Timestamped;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
@@ -104,6 +109,24 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService; // supplies the local node id used as sender of outgoing cluster messages
+
+ private static final KryoSerializer SERIALIZER = new KryoSerializer() { // encodes outgoing / decodes incoming peer-event payloads for this store
+ protected void setupKryoPool() { // NOTE(review): presumably overrides KryoSerializer.setupKryoPool - confirm and consider @Override
+ serializerPool = KryoPool.newBuilder()
+ .register(KryoPoolUtil.API)
+ .register(InternalDeviceEvent.class) // the three Internal*Event types are the payloads sent by notifyPeers(...)
+ .register(InternalPortEvent.class)
+ .register(InternalPortStatusEvent.class)
+ .register(Timestamped.class)
+ .register(MastershipBasedTimestamp.class)
+ .build()
+ .populate(1); // NOTE(review): argument presumably pre-creates one pooled instance - confirm against KryoPool
+ }
+
+ };
+
@Activate
public void activate() {
clusterCommunicator.addSubscriber(
@@ -779,17 +802,26 @@
}
private void notifyPeers(InternalDeviceEvent event) throws IOException {
- ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
+ ClusterMessage message = new ClusterMessage( // broadcast a device update to the cluster
+ clusterService.getLocalNode().id(), // sender is now the local node id (was null)
+ GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
+ SERIALIZER.encode(event)); // payload is the SERIALIZER-encoded event instead of the raw event object
clusterCommunicator.broadcast(message);
}
private void notifyPeers(InternalPortEvent event) throws IOException {
- ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
+ ClusterMessage message = new ClusterMessage( // broadcast a port update to the cluster
+ clusterService.getLocalNode().id(), // sender is now the local node id (was null)
+ GossipDeviceStoreMessageSubjects.PORT_UPDATE,
+ SERIALIZER.encode(event)); // payload is the SERIALIZER-encoded event instead of the raw event object
clusterCommunicator.broadcast(message);
}
private void notifyPeers(InternalPortStatusEvent event) throws IOException {
- ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
+ ClusterMessage message = new ClusterMessage( // broadcast a port status update to the cluster
+ clusterService.getLocalNode().id(), // sender is now the local node id (was null)
+ GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
+ SERIALIZER.encode(event)); // payload is the SERIALIZER-encoded event instead of the raw event object
clusterCommunicator.broadcast(message);
}
@@ -797,7 +829,7 @@
@Override
public void handle(ClusterMessage message) {
log.info("Received device update event from peer: {}", message.sender());
- InternalDeviceEvent event = (InternalDeviceEvent) message.payload();
+ InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
@@ -810,7 +842,7 @@
public void handle(ClusterMessage message) {
log.info("Received port update event from peer: {}", message.sender());
- InternalPortEvent event = (InternalPortEvent) message.payload();
+ InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
@@ -825,7 +857,7 @@
public void handle(ClusterMessage message) {
log.info("Received port status update event from peer: {}", message.sender());
- InternalPortStatusEvent event = (InternalPortStatusEvent) message.payload();
+ InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();