Ensure messages are correctly timed out in NettyMessagingManager
Change-Id: I341394a967c3cb8daf4668e6040c25598c87a6a7
(cherry picked from commit 7bf921c16b4f83f1b3c4c0913b39d530f1669ef0)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 20c38e0..80f0a20 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -49,7 +49,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
-import java.util.function.Consumer;
import java.util.function.Function;
import com.google.common.base.Throwables;
@@ -340,40 +339,10 @@
return Math.abs(messageType.hashCode() % CHANNEL_POOL_SIZE);
}
- private <T> CompletableFuture<T> executeOnPooledConnection(
- Endpoint endpoint,
- String type,
- Function<ClientConnection, CompletableFuture<T>> callback,
- Executor executor) {
- CompletableFuture<T> future = new CompletableFuture<T>();
- executeOnPooledConnection(endpoint, type, callback, executor, future);
- return future;
- }
-
- private <T> void executeOnPooledConnection(
- Endpoint endpoint,
- String type,
- Function<ClientConnection, CompletableFuture<T>> callback,
- Executor executor,
- CompletableFuture<T> future) {
-
- // If the endpoint is the local node, avoid the loopback interface and use the singleton local connection.
- if (endpoint.equals(localEndpoint)) {
- callback.apply(localClientConnection).whenComplete((result, error) -> {
- if (error == null) {
- executor.execute(() -> future.complete(result));
- } else {
- executor.execute(() -> future.completeExceptionally(error));
- }
- });
- return;
- }
-
- // Get the channel pool and the offset for this message type.
+ private CompletableFuture<Channel> getChannel(Endpoint endpoint, String messageType) {
List<CompletableFuture<Channel>> channelPool = getChannelPool(endpoint);
- int offset = getChannelOffset(type);
+ int offset = getChannelOffset(messageType);
- // If the channel future is completed exceptionally, open a new channel.
CompletableFuture<Channel> channelFuture = channelPool.get(offset);
if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
synchronized (channelPool) {
@@ -385,60 +354,99 @@
}
}
- // Create a consumer with which to complete the send operation on a given channel.
- final Consumer<Channel> runner = channel -> {
- ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
- callback.apply(connection).whenComplete((result, sendError) -> {
- if (sendError == null) {
- executor.execute(() -> future.complete(result));
- } else {
- // If an exception other than a TimeoutException occurred, close the connection and
- // remove the channel from the pool.
- Throwable cause = Throwables.getRootCause(sendError);
- if (!(cause instanceof TimeoutException) && !(cause instanceof MessagingException)) {
- synchronized (channelPool) {
- channelPool.set(offset, null);
- }
- channel.close();
- clientConnections.remove(channel);
- connection.close();
- }
- executor.execute(() -> future.completeExceptionally(sendError));
- }
- });
- };
-
- // Wait for the channel future to be completed. Once it's complete, if the channel is active then
- // attempt to send the message. Otherwise, if the channel is inactive then attempt to open a new channel.
+ CompletableFuture<Channel> future = new CompletableFuture<>();
final CompletableFuture<Channel> finalFuture = channelFuture;
finalFuture.whenComplete((channel, error) -> {
if (error == null) {
if (!channel.isActive()) {
- final CompletableFuture<Channel> currentFuture;
+ CompletableFuture<Channel> currentFuture;
synchronized (channelPool) {
currentFuture = channelPool.get(offset);
if (currentFuture == finalFuture) {
channelPool.set(offset, null);
}
}
+
+ ClientConnection connection = clientConnections.remove(channel);
+ if (connection != null) {
+ connection.close();
+ }
+
if (currentFuture == finalFuture) {
- executeOnPooledConnection(endpoint, type, callback, executor);
+ getChannel(endpoint, messageType).whenComplete((recursiveResult, recursiveError) -> {
+ if (recursiveError == null) {
+ future.complete(recursiveResult);
+ } else {
+ future.completeExceptionally(recursiveError);
+ }
+ });
} else {
currentFuture.whenComplete((recursiveResult, recursiveError) -> {
if (recursiveError == null) {
- runner.accept(recursiveResult);
+ future.complete(recursiveResult);
} else {
future.completeExceptionally(recursiveError);
}
});
}
} else {
- runner.accept(channel);
+ future.complete(channel);
}
} else {
future.completeExceptionally(error);
}
});
+ return future;
+ }
+
+ private <T> CompletableFuture<T> executeOnPooledConnection(
+ Endpoint endpoint,
+ String type,
+ Function<ClientConnection, CompletableFuture<T>> callback,
+ Executor executor) {
+ CompletableFuture<T> future = new CompletableFuture<T>();
+ executeOnPooledConnection(endpoint, type, callback, executor, future);
+ return future;
+ }
+
+ private <T> void executeOnPooledConnection(
+ Endpoint endpoint,
+ String type,
+ Function<ClientConnection, CompletableFuture<T>> callback,
+ Executor executor,
+ CompletableFuture<T> future) {
+ if (endpoint.equals(localEndpoint)) {
+ callback.apply(localClientConnection).whenComplete((result, error) -> {
+ if (error == null) {
+ executor.execute(() -> future.complete(result));
+ } else {
+ executor.execute(() -> future.completeExceptionally(error));
+ }
+ });
+ return;
+ }
+
+ getChannel(endpoint, type).whenComplete((channel, channelError) -> {
+ if (channelError == null) {
+ ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
+ callback.apply(connection).whenComplete((result, sendError) -> {
+ if (sendError == null) {
+ executor.execute(() -> future.complete(result));
+ } else {
+ Throwable cause = Throwables.getRootCause(sendError);
+ if (!(cause instanceof TimeoutException) && !(cause instanceof MessagingException)) {
+ channel.close().addListener(f -> {
+ connection.close();
+ clientConnections.remove(channel);
+ });
+ }
+ executor.execute(() -> future.completeExceptionally(sendError));
+ }
+ });
+ } else {
+ executor.execute(() -> future.completeExceptionally(channelError));
+ }
+ });
}
@Override
@@ -826,7 +834,6 @@
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
- timeoutFuture.cancel(false);
for (Callback callback : futures.values()) {
callback.completeExceptionally(new ConnectException());
}