ONOS-1983: Migrating all Copycat Raft-protocol-specific communication to use ONOS cluster communication primitives
Change-Id: I3f07266e50106b1adc13f722c647686c2b42ef7f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 7475819..21b0919 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -21,6 +21,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
@@ -182,11 +183,15 @@
Function<M, byte[]> encoder,
Function<byte[], R> decoder,
NodeId toNodeId) {
- ClusterMessage envelope = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- encoder.apply(message));
- return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+ try {
+ ClusterMessage envelope = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ encoder.apply(message));
+ return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
}
private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
@@ -223,7 +228,6 @@
messagingService.unregisterHandler(subject.value());
}
-
@Override
public <M, R> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
@@ -231,8 +235,26 @@
Function<R, byte[]> encoder,
Executor executor) {
messagingService.registerHandler(subject.value(),
- new InternalMessageResponder<>(decoder, encoder, handler),
- executor);
+ new InternalMessageResponder<M, R>(decoder, encoder, m -> {
+ CompletableFuture<R> responseFuture = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ responseFuture.complete(handler.apply(m));
+ } catch (Exception e) {
+ responseFuture.completeExceptionally(e);
+ }
+ });
+ return responseFuture;
+ }));
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, CompletableFuture<R>> handler,
+ Function<R, byte[]> encoder) {
+ messagingService.registerHandler(subject.value(),
+ new InternalMessageResponder<>(decoder, encoder, handler));
}
@Override
@@ -260,23 +282,22 @@
}
}
- private class InternalMessageResponder<M, R> implements Function<byte[], byte[]> {
+ private class InternalMessageResponder<M, R> implements Function<byte[], CompletableFuture<byte[]>> {
private final Function<byte[], M> decoder;
private final Function<R, byte[]> encoder;
- private final Function<M, R> handler;
+ private final Function<M, CompletableFuture<R>> handler;
public InternalMessageResponder(Function<byte[], M> decoder,
Function<R, byte[]> encoder,
- Function<M, R> handler) {
+ Function<M, CompletableFuture<R>> handler) {
this.decoder = decoder;
this.encoder = encoder;
this.handler = handler;
}
@Override
- public byte[] apply(byte[] bytes) {
- R reply = handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
- return encoder.apply(reply);
+ public CompletableFuture<byte[]> apply(byte[] bytes) {
+ return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
}
}