Simplified message payload serialization: ClusterMessage now carries a byte[] payload that senders encode and handlers decode
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
index dceb7c6..2bdf5a0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
@@ -11,7 +11,7 @@
private final NodeId sender;
private final MessageSubject subject;
- private final Object payload;
+ private final byte[] payload;
// TODO: add field specifying Serializer for payload
/**
@@ -19,7 +19,7 @@
*
* @param subject message subject
*/
- public ClusterMessage(NodeId sender, MessageSubject subject, Object payload) {
+ public ClusterMessage(NodeId sender, MessageSubject subject, byte[] payload) {
this.sender = sender;
this.subject = subject;
this.payload = payload;
@@ -48,7 +48,7 @@
*
* @return message payload.
*/
- public Object payload() {
+ public byte[] payload() { // hands back the stored array itself, not a copy
return payload;
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 98be0b1..babe4d3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -23,6 +23,9 @@
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
@@ -48,6 +51,18 @@
//@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MessagingService messagingService;
+ private static final KryoSerializer SERIALIZER = new KryoSerializer() { // one instance shared by this class
+ protected void setupKryoPool() { // NOTE(review): looks like an override; confirm and add @Override
+ serializerPool = KryoPool.newBuilder()
+ .register(KryoPoolUtil.API)
+ .register(ClusterMessage.class) // whole message is encoded before sendAsync
+ .register(ClusterMembershipEvent.class) // payload of the CLUSTER_MEMBERSHIP_EVENT broadcasts
+ .build()
+ .populate(1);
+ }
+
+ };
+
@Activate
public void activate() {
// TODO: initialize messagingService
@@ -92,7 +107,7 @@
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
- messagingService.sendAsync(nodeEp, message.subject().value(), message);
+ messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
return true;
} catch (IOException e) {
log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
@@ -126,7 +141,7 @@
broadcast(new ClusterMessage(
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
- new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)));
+ SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
members.remove(node.id());
}
@@ -138,7 +153,7 @@
broadcast(new ClusterMessage(
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
- new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode)));
+ SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
}
}
@@ -147,7 +162,7 @@
@Override
public void handle(ClusterMessage message) {
- ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload();
+ ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
ControllerNode node = event.node();
if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
log.info("Node {} sent a hearbeat", node.id());
@@ -172,7 +187,8 @@
@Override
public void handle(Message message) {
- handler.handle((ClusterMessage) message.payload());
+ ClusterMessage clusterMessage = SERIALIZER.decode(message.payload()); // undoes SERIALIZER.encode(message) from the send path
+ handler.handle(clusterMessage);
}
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index ab9ae3c..2f1e504 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -13,6 +13,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultDevice;
@@ -37,7 +38,11 @@
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
import org.onlab.onos.store.common.impl.Timestamped;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
@@ -104,6 +109,24 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ private static final KryoSerializer SERIALIZER = new KryoSerializer() { // encodes/decodes this store's peer events
+ protected void setupKryoPool() { // NOTE(review): looks like an override; confirm and add @Override
+ serializerPool = KryoPool.newBuilder()
+ .register(KryoPoolUtil.API)
+ .register(InternalDeviceEvent.class) // sent with subject DEVICE_UPDATE
+ .register(InternalPortEvent.class) // sent with subject PORT_UPDATE
+ .register(InternalPortStatusEvent.class) // sent with subject PORT_STATUS_UPDATE
+ .register(Timestamped.class) // wrapper returned by InternalDeviceEvent.deviceDescription()
+ .register(MastershipBasedTimestamp.class) // presumably the timestamp held by Timestamped; TODO confirm
+ .build()
+ .populate(1);
+ }
+
+ };
+
@Activate
public void activate() {
clusterCommunicator.addSubscriber(
@@ -779,17 +802,26 @@
}
private void notifyPeers(InternalDeviceEvent event) throws IOException {
- ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(), // sender is the local node id (was null before this change)
+ GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
+ SERIALIZER.encode(event)); // payload is handed over already encoded as byte[]
clusterCommunicator.broadcast(message);
}
private void notifyPeers(InternalPortEvent event) throws IOException {
- ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(), // sender is the local node id (was null before this change)
+ GossipDeviceStoreMessageSubjects.PORT_UPDATE,
+ SERIALIZER.encode(event)); // payload is handed over already encoded as byte[]
clusterCommunicator.broadcast(message);
}
private void notifyPeers(InternalPortStatusEvent event) throws IOException {
- ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(), // sender is the local node id (was null before this change)
+ GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
+ SERIALIZER.encode(event)); // payload is handed over already encoded as byte[]
clusterCommunicator.broadcast(message);
}
@@ -797,7 +829,7 @@
@Override
public void handle(ClusterMessage message) {
log.info("Received device update event from peer: {}", message.sender());
- InternalDeviceEvent event = (InternalDeviceEvent) message.payload();
+ InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
@@ -810,7 +842,7 @@
public void handle(ClusterMessage message) {
log.info("Received port update event from peer: {}", message.sender());
- InternalPortEvent event = (InternalPortEvent) message.payload();
+ InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
@@ -825,7 +857,7 @@
public void handle(ClusterMessage message) {
log.info("Received port status update event from peer: {}", message.sender());
- InternalPortStatusEvent event = (InternalPortStatusEvent) message.payload();
+ InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
index dbd88c3..f4dadad 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
@@ -17,27 +17,20 @@
}
@Override
- public void write(Kryo kryo, Output output, ClusterMessage object) {
- kryo.writeClassAndObject(output, object.sender());
- kryo.writeClassAndObject(output, object.subject());
- // TODO: write bytes serialized using ClusterMessage specified serializer
- // write serialized payload size
- //output.writeInt(...);
- // write serialized payload
- //output.writeBytes(...);
+ public void write(Kryo kryo, Output output, ClusterMessage message) { // wire order: sender, subject, length, bytes
+ kryo.writeClassAndObject(output, message.sender());
+ kryo.writeClassAndObject(output, message.subject());
+ output.writeInt(message.payload().length); // length prefix consumed by read(); NPE if payload is null
+ output.writeBytes(message.payload()); // payload bytes are written unchanged
}
@Override
public ClusterMessage read(Kryo kryo, Input input,
Class<ClusterMessage> type) {
- // TODO Auto-generated method stub
NodeId sender = (NodeId) kryo.readClassAndObject(input);
MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input);
- int size = input.readInt();
- byte[] payloadBytes = input.readBytes(size);
- // TODO: deserialize payload using ClusterMessage specified serializer
- Object payload = null;
+ int payloadSize = input.readInt(); // length prefix emitted by write()
+ byte[] payload = input.readBytes(payloadSize); // kept as raw bytes; decoding is not done here
return new ClusterMessage(sender, subject, payload);
}
-
-}
+}
\ No newline at end of file