Removed deprecated ClusterCommunicationService APIs
MessagingService::sendAsync now returns a CompletableFuture<Void> in place of boolean
Change-Id: I98134c4c0ea65b9c7e9ba705eebd1669067324ef
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 21b0919..8a237ef 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -35,10 +35,6 @@
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-
-import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -62,8 +58,11 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MessagingService messagingService;
+ private NodeId localNodeId;
+
@Activate
public void activate() {
+ localNodeId = clusterService.getLocalNode().id();
log.info("Started");
}
@@ -73,60 +72,6 @@
}
@Override
- public boolean broadcast(ClusterMessage message) {
- boolean ok = true;
- final ControllerNode localNode = clusterService.getLocalNode();
- byte[] payload = message.getBytes();
- for (ControllerNode node : clusterService.getNodes()) {
- if (!node.equals(localNode)) {
- ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
- }
- }
- return ok;
- }
-
- @Override
- public boolean broadcastIncludeSelf(ClusterMessage message) {
- boolean ok = true;
- byte[] payload = message.getBytes();
- for (ControllerNode node : clusterService.getNodes()) {
- ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
- }
- return ok;
- }
-
- @Override
- public boolean multicast(ClusterMessage message, Iterable<NodeId> nodes) {
- boolean ok = true;
- final ControllerNode localNode = clusterService.getLocalNode();
- byte[] payload = message.getBytes();
- for (NodeId nodeId : nodes) {
- if (!nodeId.equals(localNode.id())) {
- ok = unicastUnchecked(message.subject(), payload, nodeId) && ok;
- }
- }
- return ok;
- }
-
- @Override
- public boolean unicast(ClusterMessage message, NodeId toNodeId) {
- return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
- }
-
- @Override
- public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
- SettableFuture<byte[]> response = SettableFuture.create();
- sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((r, e) -> {
- if (e == null) {
- response.set(r);
- } else {
- response.setException(e);
- }
- });
- return response;
- }
-
- @Override
public <M> void broadcast(M message,
MessageSubject subject,
Function<M, byte[]> encoder) {
@@ -154,15 +99,19 @@
}
@Override
- public <M> boolean unicast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- NodeId toNodeId) {
- byte[] payload = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- encoder.apply(message)).getBytes();
- return unicastUnchecked(subject, payload, toNodeId);
+ public <M> CompletableFuture<Void> unicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ NodeId toNodeId) {
+ try {
+ byte[] payload = new ClusterMessage(
+ localNodeId,
+ subject,
+ encoder.apply(message)).getBytes();
+ return doUnicast(subject, payload, toNodeId);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
}
@Override
@@ -171,10 +120,10 @@
Function<M, byte[]> encoder,
Set<NodeId> nodes) {
byte[] payload = new ClusterMessage(
- clusterService.getLocalNode().id(),
+ localNodeId,
subject,
encoder.apply(message)).getBytes();
- nodes.forEach(nodeId -> unicastUnchecked(subject, payload, nodeId));
+ nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
}
@Override
@@ -194,17 +143,11 @@
}
}
- private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+ private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
- try {
- messagingService.sendAsync(nodeEp, subject.value(), payload);
- return true;
- } catch (IOException e) {
- log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
- return false;
- }
+ return messagingService.sendAsync(nodeEp, subject.value(), payload);
}
private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {