[ONOS-5992] Ensure futures for failed messages are completed using the supplied Executor in NettyMessagingManager.

Change-Id: Iafbd829c649f5be94a146470df3a13b74eaa1064
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index e93e63b..d49964e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -268,8 +268,8 @@
     @Override
     public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
         checkPermission(CLUSTER_WRITE);
-        CompletableFuture<byte[]> response = new CompletableFuture<>();
-        Callback callback = new Callback(response, executor);
+        CompletableFuture<byte[]> future = new CompletableFuture<>();
+        Callback callback = new Callback(future, executor);
         Long messageId = messageIdGenerator.incrementAndGet();
         callbacks.put(messageId, callback);
         InternalMessage message = new InternalMessage(preamble,
@@ -278,11 +278,14 @@
                                                       localEp,
                                                       type,
                                                       payload);
-        return sendAsync(ep, message).whenComplete((r, e) -> {
-            if (e != null) {
+
+        sendAsync(ep, message).whenComplete((response, error) -> {
+            if (error != null) {
                 callbacks.invalidate(messageId);
+                callback.completeExceptionally(error);
             }
-        }).thenComposeAsync(v -> response, executor);
+        });
+        return future;
     }
 
     @Override