Ensure messages are correctly timed out in NettyMessagingManager

Change-Id: I341394a967c3cb8daf4668e6040c25598c87a6a7
(cherry picked from commit 7bf921c16b4f83f1b3c4c0913b39d530f1669ef0)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 20c38e0..80f0a20 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -49,7 +49,6 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 import com.google.common.base.Throwables;
@@ -340,40 +339,10 @@
         return Math.abs(messageType.hashCode() % CHANNEL_POOL_SIZE);
     }
 
-    private <T> CompletableFuture<T> executeOnPooledConnection(
-            Endpoint endpoint,
-            String type,
-            Function<ClientConnection, CompletableFuture<T>> callback,
-            Executor executor) {
-        CompletableFuture<T> future = new CompletableFuture<T>();
-        executeOnPooledConnection(endpoint, type, callback, executor, future);
-        return future;
-    }
-
-    private <T> void executeOnPooledConnection(
-        Endpoint endpoint,
-        String type,
-        Function<ClientConnection, CompletableFuture<T>> callback,
-        Executor executor,
-        CompletableFuture<T> future) {
-
-        // If the endpoint is the local node, avoid the loopback interface and use the singleton local connection.
-        if (endpoint.equals(localEndpoint)) {
-            callback.apply(localClientConnection).whenComplete((result, error) -> {
-                if (error == null) {
-                    executor.execute(() -> future.complete(result));
-                } else {
-                    executor.execute(() -> future.completeExceptionally(error));
-                }
-            });
-            return;
-        }
-
-        // Get the channel pool and the offset for this message type.
+    private CompletableFuture<Channel> getChannel(Endpoint endpoint, String messageType) {
         List<CompletableFuture<Channel>> channelPool = getChannelPool(endpoint);
-        int offset = getChannelOffset(type);
+        int offset = getChannelOffset(messageType);
 
-        // If the channel future is completed exceptionally, open a new channel.
         CompletableFuture<Channel> channelFuture = channelPool.get(offset);
         if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
             synchronized (channelPool) {
@@ -385,60 +354,99 @@
             }
         }
 
-        // Create a consumer with which to complete the send operation on a given channel.
-        final Consumer<Channel> runner = channel -> {
-            ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
-            callback.apply(connection).whenComplete((result, sendError) -> {
-                if (sendError == null) {
-                    executor.execute(() -> future.complete(result));
-                } else {
-                    // If an exception other than a TimeoutException occurred, close the connection and
-                    // remove the channel from the pool.
-                    Throwable cause = Throwables.getRootCause(sendError);
-                    if (!(cause instanceof TimeoutException) && !(cause instanceof MessagingException)) {
-                        synchronized (channelPool) {
-                            channelPool.set(offset, null);
-                        }
-                        channel.close();
-                        clientConnections.remove(channel);
-                        connection.close();
-                    }
-                    executor.execute(() -> future.completeExceptionally(sendError));
-                }
-            });
-        };
-
-        // Wait for the channel future to be completed. Once it's complete, if the channel is active then
-        // attempt to send the message. Otherwise, if the channel is inactive then attempt to open a new channel.
+        CompletableFuture<Channel> future = new CompletableFuture<>();
         final CompletableFuture<Channel> finalFuture = channelFuture;
         finalFuture.whenComplete((channel, error) -> {
             if (error == null) {
                 if (!channel.isActive()) {
-                    final CompletableFuture<Channel> currentFuture;
+                    CompletableFuture<Channel> currentFuture;
                     synchronized (channelPool) {
                         currentFuture = channelPool.get(offset);
                         if (currentFuture == finalFuture) {
                             channelPool.set(offset, null);
                         }
                     }
+
+                    ClientConnection connection = clientConnections.remove(channel);
+                    if (connection != null) {
+                        connection.close();
+                    }
+
                     if (currentFuture == finalFuture) {
-                        executeOnPooledConnection(endpoint, type, callback, executor);
+                        getChannel(endpoint, messageType).whenComplete((recursiveResult, recursiveError) -> {
+                            if (recursiveError == null) {
+                                future.complete(recursiveResult);
+                            } else {
+                                future.completeExceptionally(recursiveError);
+                            }
+                        });
                     } else {
                         currentFuture.whenComplete((recursiveResult, recursiveError) -> {
                             if (recursiveError == null) {
-                                runner.accept(recursiveResult);
+                                future.complete(recursiveResult);
                             } else {
                                 future.completeExceptionally(recursiveError);
                             }
                         });
                     }
                 } else {
-                    runner.accept(channel);
+                    future.complete(channel);
                 }
             } else {
                 future.completeExceptionally(error);
             }
         });
+        return future;
+    }
+
+    private <T> CompletableFuture<T> executeOnPooledConnection(
+            Endpoint endpoint,
+            String type,
+            Function<ClientConnection, CompletableFuture<T>> callback,
+            Executor executor) {
+        CompletableFuture<T> future = new CompletableFuture<T>();
+        executeOnPooledConnection(endpoint, type, callback, executor, future);
+        return future;
+    }
+
+    private <T> void executeOnPooledConnection(
+            Endpoint endpoint,
+            String type,
+            Function<ClientConnection, CompletableFuture<T>> callback,
+            Executor executor,
+            CompletableFuture<T> future) {
+        if (endpoint.equals(localEndpoint)) {
+            callback.apply(localClientConnection).whenComplete((result, error) -> {
+                if (error == null) {
+                    executor.execute(() -> future.complete(result));
+                } else {
+                    executor.execute(() -> future.completeExceptionally(error));
+                }
+            });
+            return;
+        }
+
+        getChannel(endpoint, type).whenComplete((channel, channelError) -> {
+            if (channelError == null) {
+                ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
+                callback.apply(connection).whenComplete((result, sendError) -> {
+                    if (sendError == null) {
+                        executor.execute(() -> future.complete(result));
+                    } else {
+                        Throwable cause = Throwables.getRootCause(sendError);
+                        if (!(cause instanceof TimeoutException) && !(cause instanceof MessagingException)) {
+                            channel.close().addListener(f -> {
+                                connection.close();
+                                clientConnections.remove(channel);
+                            });
+                        }
+                        executor.execute(() -> future.completeExceptionally(sendError));
+                    }
+                });
+            } else {
+                executor.execute(() -> future.completeExceptionally(channelError));
+            }
+        });
     }
 
     @Override
@@ -826,7 +834,6 @@
         @Override
         public void close() {
             if (closed.compareAndSet(false, true)) {
-                timeoutFuture.cancel(false);
                 for (Callback callback : futures.values()) {
                     callback.completeExceptionally(new ConnectException());
                 }