Revert "[ONOS-7373] Ensure Netty channels are closed on send exceptions"

This reverts commit cd25bd73b75812c371437eb5135da34c0d3fb4dd.

Change-Id: I178d270ace87b61cac1e98cd401d3a2b496d0d51
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 669e824..0359bae 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -49,10 +49,8 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
-import java.util.function.Consumer;
 import java.util.function.Function;
 
-import com.google.common.base.Throwables;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
@@ -340,40 +338,10 @@
         return Math.abs(messageType.hashCode() % CHANNEL_POOL_SIZE);
     }
 
-    private <T> CompletableFuture<T> executeOnPooledConnection(
-            Endpoint endpoint,
-            String type,
-            Function<ClientConnection, CompletableFuture<T>> callback,
-            Executor executor) {
-        CompletableFuture<T> future = new CompletableFuture<T>();
-        executeOnPooledConnection(endpoint, type, callback, executor, future);
-        return future;
-    }
-
-    private <T> void executeOnPooledConnection(
-        Endpoint endpoint,
-        String type,
-        Function<ClientConnection, CompletableFuture<T>> callback,
-        Executor executor,
-        CompletableFuture<T> future) {
-
-        // If the endpoint is the local node, avoid the loopback interface and use the singleton local connection.
-        if (endpoint.equals(localEndpoint)) {
-            callback.apply(localClientConnection).whenComplete((result, error) -> {
-                if (error == null) {
-                    executor.execute(() -> future.complete(result));
-                } else {
-                    executor.execute(() -> future.completeExceptionally(error));
-                }
-            });
-            return;
-        }
-
-        // Get the channel pool and the offset for this message type.
+    private CompletableFuture<Channel> getChannel(Endpoint endpoint, String messageType) {
         List<CompletableFuture<Channel>> channelPool = getChannelPool(endpoint);
-        int offset = getChannelOffset(type);
+        int offset = getChannelOffset(messageType);
 
-        // If the channel future is completed exceptionally, open a new channel.
         CompletableFuture<Channel> channelFuture = channelPool.get(offset);
         if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
             synchronized (channelPool) {
@@ -385,29 +353,7 @@
             }
         }
 
-        // Create a consumer with which to complete the send operation on a given channel.
-        final Consumer<Channel> runner = channel -> {
-            ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
-            callback.apply(connection).whenComplete((result, sendError) -> {
-                if (sendError == null) {
-                    executor.execute(() -> future.complete(result));
-                } else {
-                    // If an exception other than a TimeoutException occurred, close the connection and
-                    // remove the channel from the pool.
-                    if (!(Throwables.getRootCause(sendError) instanceof TimeoutException)) {
-                        synchronized (channelPool) {
-                            channelPool.set(offset, null);
-                        }
-                        connection.close();
-                        channel.close();
-                    }
-                    executor.execute(() -> future.completeExceptionally(sendError));
-                }
-            });
-        };
-
-        // Wait for the channel future to be completed. Once it's complete, if the channel is active then
-        // attempt to send the message. Otherwise, if the channel is inactive then attempt to open a new channel.
+        CompletableFuture<Channel> future = new CompletableFuture<>();
         final CompletableFuture<Channel> finalFuture = channelFuture;
         finalFuture.whenComplete((channel, error) -> {
             if (error == null) {
@@ -416,11 +362,17 @@
                         CompletableFuture<Channel> currentFuture = channelPool.get(offset);
                         if (currentFuture == finalFuture) {
                             channelPool.set(offset, null);
-                            executeOnPooledConnection(endpoint, type, callback, executor);
+                            getChannel(endpoint, messageType).whenComplete((recursiveResult, recursiveError) -> {
+                                if (recursiveError == null) {
+                                    future.complete(recursiveResult);
+                                } else {
+                                    future.completeExceptionally(recursiveError);
+                                }
+                            });
                         } else {
                             currentFuture.whenComplete((recursiveResult, recursiveError) -> {
                                 if (recursiveError == null) {
-                                    runner.accept(recursiveResult);
+                                    future.complete(recursiveResult);
                                 } else {
                                     future.completeExceptionally(recursiveError);
                                 }
@@ -428,12 +380,56 @@
                         }
                     }
                 } else {
-                    runner.accept(channel);
+                    future.complete(channel);
                 }
             } else {
                 future.completeExceptionally(error);
             }
         });
+        return future;
+    }
+
+    private <T> CompletableFuture<T> executeOnPooledConnection(
+            Endpoint endpoint,
+            String type,
+            Function<ClientConnection, CompletableFuture<T>> callback,
+            Executor executor) {
+        CompletableFuture<T> future = new CompletableFuture<T>();
+        executeOnPooledConnection(endpoint, type, callback, executor, future);
+        return future;
+    }
+
+    private <T> void executeOnPooledConnection(
+            Endpoint endpoint,
+            String type,
+            Function<ClientConnection, CompletableFuture<T>> callback,
+            Executor executor,
+            CompletableFuture<T> future) {
+        if (endpoint.equals(localEndpoint)) {
+            callback.apply(localClientConnection).whenComplete((result, error) -> {
+                if (error == null) {
+                    executor.execute(() -> future.complete(result));
+                } else {
+                    executor.execute(() -> future.completeExceptionally(error));
+                }
+            });
+            return;
+        }
+
+        getChannel(endpoint, type).whenComplete((channel, channelError) -> {
+            if (channelError == null) {
+                ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
+                callback.apply(connection).whenComplete((result, sendError) -> {
+                    if (sendError == null) {
+                        executor.execute(() -> future.complete(result));
+                    } else {
+                        executor.execute(() -> future.completeExceptionally(sendError));
+                    }
+                });
+            } else {
+                executor.execute(() -> future.completeExceptionally(channelError));
+            }
+        });
     }
 
     @Override