ClusterCommunicationManager: only serialize once for broadcast
Change-Id: Ife78af3c758c87eeb8a79cdbf51b5307b8b1ca88
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 070e1fe..1469433 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -103,9 +103,10 @@
public boolean broadcast(ClusterMessage message) {
boolean ok = true;
final ControllerNode localNode = clusterService.getLocalNode();
+ byte[] payload = SERIALIZER.encode(message);
for (ControllerNode node : clusterService.getNodes()) {
if (!node.equals(localNode)) {
- ok = unicastUnchecked(message, node.id()) && ok;
+ ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
}
}
return ok;
@@ -114,8 +115,9 @@
@Override
public boolean broadcastIncludeSelf(ClusterMessage message) {
boolean ok = true;
+ byte[] payload = SERIALIZER.encode(message);
for (ControllerNode node : clusterService.getNodes()) {
- ok = unicastUnchecked(message, node.id()) && ok;
+ ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
}
return ok;
}
@@ -124,9 +126,10 @@
public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
boolean ok = true;
final ControllerNode localNode = clusterService.getLocalNode();
+ byte[] payload = SERIALIZER.encode(message);
for (NodeId nodeId : nodes) {
if (!nodeId.equals(localNode.id())) {
- ok = unicastUnchecked(message, nodeId) && ok;
+ ok = unicastUnchecked(message.subject(), payload, nodeId) && ok;
}
}
return ok;
@@ -134,12 +137,15 @@
@Override
public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException {
+ return unicast(message.subject(), SERIALIZER.encode(message), toNodeId);
+ }
+
+ private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
- messagingService.sendAsync(nodeEp,
- message.subject().value(), SERIALIZER.encode(message));
+ messagingService.sendAsync(nodeEp, subject.value(), payload);
return true;
} catch (IOException e) {
log.trace("Failed to send cluster message to nodeId: " + toNodeId, e);
@@ -147,9 +153,10 @@
}
}
- private boolean unicastUnchecked(ClusterMessage message, NodeId toNodeId) {
+
+ private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
try {
- return unicast(message, toNodeId);
+ return unicast(subject, payload, toNodeId);
} catch (IOException e) {
return false;
}