[ONOS-5992] Ensure futures for failed messages are completed using the supplied Executor in NettyMessagingManager.
Change-Id: Iafbd829c649f5be94a146470df3a13b74eaa1064
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index e93e63b..d49964e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -268,8 +268,8 @@
@Override
public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
checkPermission(CLUSTER_WRITE);
- CompletableFuture<byte[]> response = new CompletableFuture<>();
- Callback callback = new Callback(response, executor);
+ CompletableFuture<byte[]> future = new CompletableFuture<>();
+ Callback callback = new Callback(future, executor);
Long messageId = messageIdGenerator.incrementAndGet();
callbacks.put(messageId, callback);
InternalMessage message = new InternalMessage(preamble,
@@ -278,11 +278,14 @@
localEp,
type,
payload);
- return sendAsync(ep, message).whenComplete((r, e) -> {
- if (e != null) {
+
+ sendAsync(ep, message).whenComplete((response, error) -> {
+ if (error != null) {
callbacks.invalidate(messageId);
+ callback.completeExceptionally(error);
}
- }).thenComposeAsync(v -> response, executor);
+ });
+ return future;
}
@Override