Revert "[ONOS-7373] Ensure Netty channels are closed on send exceptions"
This reverts commit cd25bd73b75812c371437eb5135da34c0d3fb4dd.
Change-Id: I178d270ace87b61cac1e98cd401d3a2b496d0d51
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 669e824..0359bae 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -49,10 +49,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
-import java.util.function.Consumer;
import java.util.function.Function;
-import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
@@ -340,40 +338,10 @@
return Math.abs(messageType.hashCode() % CHANNEL_POOL_SIZE);
}
- private <T> CompletableFuture<T> executeOnPooledConnection(
- Endpoint endpoint,
- String type,
- Function<ClientConnection, CompletableFuture<T>> callback,
- Executor executor) {
- CompletableFuture<T> future = new CompletableFuture<T>();
- executeOnPooledConnection(endpoint, type, callback, executor, future);
- return future;
- }
-
- private <T> void executeOnPooledConnection(
- Endpoint endpoint,
- String type,
- Function<ClientConnection, CompletableFuture<T>> callback,
- Executor executor,
- CompletableFuture<T> future) {
-
- // If the endpoint is the local node, avoid the loopback interface and use the singleton local connection.
- if (endpoint.equals(localEndpoint)) {
- callback.apply(localClientConnection).whenComplete((result, error) -> {
- if (error == null) {
- executor.execute(() -> future.complete(result));
- } else {
- executor.execute(() -> future.completeExceptionally(error));
- }
- });
- return;
- }
-
- // Get the channel pool and the offset for this message type.
+ private CompletableFuture<Channel> getChannel(Endpoint endpoint, String messageType) {
List<CompletableFuture<Channel>> channelPool = getChannelPool(endpoint);
- int offset = getChannelOffset(type);
+ int offset = getChannelOffset(messageType);
- // If the channel future is completed exceptionally, open a new channel.
CompletableFuture<Channel> channelFuture = channelPool.get(offset);
if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
synchronized (channelPool) {
@@ -385,29 +353,7 @@
}
}
- // Create a consumer with which to complete the send operation on a given channel.
- final Consumer<Channel> runner = channel -> {
- ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
- callback.apply(connection).whenComplete((result, sendError) -> {
- if (sendError == null) {
- executor.execute(() -> future.complete(result));
- } else {
- // If an exception other than a TimeoutException occurred, close the connection and
- // remove the channel from the pool.
- if (!(Throwables.getRootCause(sendError) instanceof TimeoutException)) {
- synchronized (channelPool) {
- channelPool.set(offset, null);
- }
- connection.close();
- channel.close();
- }
- executor.execute(() -> future.completeExceptionally(sendError));
- }
- });
- };
-
- // Wait for the channel future to be completed. Once it's complete, if the channel is active then
- // attempt to send the message. Otherwise, if the channel is inactive then attempt to open a new channel.
+ CompletableFuture<Channel> future = new CompletableFuture<>();
final CompletableFuture<Channel> finalFuture = channelFuture;
finalFuture.whenComplete((channel, error) -> {
if (error == null) {
@@ -416,11 +362,17 @@
CompletableFuture<Channel> currentFuture = channelPool.get(offset);
if (currentFuture == finalFuture) {
channelPool.set(offset, null);
- executeOnPooledConnection(endpoint, type, callback, executor);
+ getChannel(endpoint, messageType).whenComplete((recursiveResult, recursiveError) -> {
+ if (recursiveError == null) {
+ future.complete(recursiveResult);
+ } else {
+ future.completeExceptionally(recursiveError);
+ }
+ });
} else {
currentFuture.whenComplete((recursiveResult, recursiveError) -> {
if (recursiveError == null) {
- runner.accept(recursiveResult);
+ future.complete(recursiveResult);
} else {
future.completeExceptionally(recursiveError);
}
@@ -428,12 +380,56 @@
}
}
} else {
- runner.accept(channel);
+ future.complete(channel);
}
} else {
future.completeExceptionally(error);
}
});
+ return future;
+ }
+
+ private <T> CompletableFuture<T> executeOnPooledConnection(
+ Endpoint endpoint,
+ String type,
+ Function<ClientConnection, CompletableFuture<T>> callback,
+ Executor executor) {
+ CompletableFuture<T> future = new CompletableFuture<T>();
+ executeOnPooledConnection(endpoint, type, callback, executor, future);
+ return future;
+ }
+
+ private <T> void executeOnPooledConnection(
+ Endpoint endpoint,
+ String type,
+ Function<ClientConnection, CompletableFuture<T>> callback,
+ Executor executor,
+ CompletableFuture<T> future) {
+ if (endpoint.equals(localEndpoint)) {
+ callback.apply(localClientConnection).whenComplete((result, error) -> {
+ if (error == null) {
+ executor.execute(() -> future.complete(result));
+ } else {
+ executor.execute(() -> future.completeExceptionally(error));
+ }
+ });
+ return;
+ }
+
+ getChannel(endpoint, type).whenComplete((channel, channelError) -> {
+ if (channelError == null) {
+ ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
+ callback.apply(connection).whenComplete((result, sendError) -> {
+ if (sendError == null) {
+ executor.execute(() -> future.complete(result));
+ } else {
+ executor.execute(() -> future.completeExceptionally(sendError));
+ }
+ });
+ } else {
+ executor.execute(() -> future.completeExceptionally(channelError));
+ }
+ });
}
@Override