Time out local messages in NettyMessagingManager to avoid hanging when receivers are blocked on external calls.
Change-Id: Ic104a21317f4223921f1acba231e3f97039c2f2e
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index b7e121b..0359bae 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -116,8 +116,8 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private final ClientConnection localClientConnection = new LocalClientConnection();
- private final ServerConnection localServerConnection = new LocalServerConnection(null);
+ private final LocalClientConnection localClientConnection = new LocalClientConnection();
+ private final LocalServerConnection localServerConnection = new LocalServerConnection(null);
//TODO CONFIG_DIR is duplicated from ConfigFileBasedClusterMetadataProvider
private static final String CONFIG_DIR = "../config";
@@ -287,6 +287,7 @@
*/
private void timeoutAllCallbacks() {
// Iterate through all connections and time out callbacks.
+ localClientConnection.timeoutCallbacks();
for (RemoteClientConnection connection : clientConnections.values()) {
connection.timeoutCallbacks();
}
@@ -406,11 +407,11 @@
CompletableFuture<T> future) {
if (endpoint.equals(localEndpoint)) {
callback.apply(localClientConnection).whenComplete((result, error) -> {
- if (error == null) {
- executor.execute(() -> future.complete(result));
- } else {
- executor.execute(() -> future.completeExceptionally(error));
- }
+ if (error == null) {
+ executor.execute(() -> future.complete(result));
+ } else {
+ executor.execute(() -> future.completeExceptionally(error));
+ }
});
return;
}
@@ -656,7 +657,6 @@
*
* @param msg inbound message
* @return true if {@code msg} is {@link InternalMessage} instance.
- *
* @see SimpleChannelInboundHandler#acceptInboundMessage(Object)
*/
@Override
@@ -724,7 +724,7 @@
* Sends a reply to the other side of the connection.
*
* @param message the message to which to reply
- * @param status the reply status
+ * @param status the reply status
* @param payload the response payload
*/
void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload);
@@ -737,9 +737,123 @@
}
/**
+     * Base client connection that tracks pending request callbacks and times them out.
+     *
+     * <p>Reply times are recorded per callback type in a {@link TimeoutHistory}, which
+     * provides the timeout applied to pending callbacks of that type.
+     */
+    private abstract class AbstractClientConnection implements ClientConnection {
+        // Pending request callbacks keyed by message id.
+        private final Map<Long, Callback> futures = Maps.newConcurrentMap();
+        private final AtomicBoolean closed = new AtomicBoolean(false);
+        // Per-type reply-time histories; entries expire HISTORY_EXPIRE_MILLIS after last access.
+        private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
+                .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
+                .build();
+
+        /**
+         * Times out callbacks for this connection.
+         *
+         * <p>Each pending callback older than the current timeout for its type is removed and
+         * completed exceptionally with a {@link TimeoutException}; afterwards the timeout of
+         * every history is recomputed.
+         */
+        protected void timeoutCallbacks() {
+            // Store the current time.
+            long currentTime = System.currentTimeMillis();
+
+            // Iterate through future callbacks and time out callbacks that have been alive
+            // longer than the current timeout according to the message type.
+            Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Callback callback = iterator.next().getValue();
+                TimeoutHistory timeoutHistory = historyFor(callback.type);
+                long elapsedTime = currentTime - callback.time;
+                if (elapsedTime > timeoutHistory.currentTimeout) {
+                    iterator.remove();
+                    // The elapsed time of a timed-out request is recorded in the history too.
+                    timeoutHistory.addReplyTime(elapsedTime);
+                    callback.completeExceptionally(
+                            new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
+                }
+            }
+
+            // Iterate through all timeout histories and recompute the timeout.
+            for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
+                timeoutHistory.recomputeTimeoutMillis();
+            }
+        }
+
+        /**
+         * Registers a pending callback for a request that expects a reply.
+         *
+         * @param id the message identifier the callback is stored under
+         * @param subject the message subject passed to the new callback
+         * @param future the future passed to the new callback
+         */
+        protected void registerCallback(long id, String subject, CompletableFuture<byte[]> future) {
+            futures.put(id, new Callback(subject, future));
+        }
+
+        /**
+         * Removes the pending callback for a message and records its reply time.
+         *
+         * @param id the message identifier
+         * @return the removed callback, or {@code null} if none was pending for {@code id}
+         */
+        protected Callback completeCallback(long id) {
+            Callback callback = futures.remove(id);
+            if (callback != null) {
+                historyFor(callback.type).addReplyTime(System.currentTimeMillis() - callback.time);
+            }
+            return callback;
+        }
+
+        /**
+         * Removes the pending callback for a message without recording a reply time.
+         *
+         * @param id the message identifier
+         * @return the removed callback, or {@code null} if none was pending for {@code id}
+         */
+        protected Callback failCallback(long id) {
+            return futures.remove(id);
+        }
+
+        /**
+         * Closes this connection; only the first call has any effect.
+         */
+        @Override
+        public void close() {
+            if (closed.compareAndSet(false, true)) {
+                // NOTE(review): timeoutFuture is declared outside this class; confirm that
+                // cancelling it when a single connection closes is intended.
+                timeoutFuture.cancel(false);
+                for (Callback callback : futures.values()) {
+                    callback.completeExceptionally(new ConnectException());
+                }
+            }
+        }
+
+        /**
+         * Returns the timeout history for a callback type, creating it if absent.
+         *
+         * @param type the callback type
+         * @return the history for {@code type}
+         */
+        private TimeoutHistory historyFor(String type) {
+            try {
+                return timeoutHistories.get(type, TimeoutHistory::new);
+            } catch (ExecutionException e) {
+                // Not expected from a constructor reference; keep the cause for diagnosis.
+                throw new AssertionError(e);
+            }
+        }
+    }
+
+ /**
* Local connection implementation.
*/
- private final class LocalClientConnection implements ClientConnection {
+ private final class LocalClientConnection extends AbstractClientConnection {
@Override
public CompletableFuture<Void> sendAsync(InternalRequest message) {
BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
@@ -754,6 +829,8 @@
@Override
public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
CompletableFuture<byte[]> future = new CompletableFuture<>();
+ future.whenComplete((r, e) -> completeCallback(message.id()));
+ registerCallback(message.id(), message.subject(), future);
BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
if (handler != null) {
handler.accept(message, new LocalServerConnection(future));
@@ -795,51 +872,13 @@
/**
* Remote connection implementation.
*/
- private final class RemoteClientConnection implements ClientConnection {
+ private final class RemoteClientConnection extends AbstractClientConnection {
private final Channel channel;
- private final Map<Long, Callback> futures = Maps.newConcurrentMap();
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
- .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
- .build();
RemoteClientConnection(Channel channel) {
this.channel = channel;
}
- /**
- * Times out callbacks for this connection.
- */
- private void timeoutCallbacks() {
- // Store the current time.
- long currentTime = System.currentTimeMillis();
-
- // Iterate through future callbacks and time out callbacks that have been alive
- // longer than the current timeout according to the message type.
- Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
- while (iterator.hasNext()) {
- Callback callback = iterator.next().getValue();
- try {
- TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
- long currentTimeout = timeoutHistory.currentTimeout;
- if (currentTime - callback.time > currentTimeout) {
- iterator.remove();
- long elapsedTime = currentTime - callback.time;
- timeoutHistory.addReplyTime(elapsedTime);
- callback.completeExceptionally(
- new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
- }
- } catch (ExecutionException e) {
- throw new AssertionError();
- }
- }
-
- // Iterate through all timeout histories and recompute the timeout.
- for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
- timeoutHistory.recomputeTimeoutMillis();
- }
- }
-
@Override
public CompletableFuture<Void> sendAsync(InternalRequest message) {
CompletableFuture<Void> future = new CompletableFuture<>();
@@ -856,12 +895,13 @@
@Override
public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
CompletableFuture<byte[]> future = new CompletableFuture<>();
- Callback callback = new Callback(message.subject(), future);
- futures.put(message.id(), callback);
+ registerCallback(message.id(), message.subject(), future);
channel.writeAndFlush(message).addListener(channelFuture -> {
if (!channelFuture.isSuccess()) {
- futures.remove(message.id());
- callback.completeExceptionally(channelFuture.cause());
+ Callback callback = failCallback(message.id());
+ if (callback != null) {
+ callback.completeExceptionally(channelFuture.cause());
+ }
}
});
return future;
@@ -880,7 +920,7 @@
clockService.recordEventTime(message.time());
- Callback callback = futures.remove(message.id());
+ Callback callback = completeCallback(message.id());
if (callback != null) {
if (message.status() == InternalReply.Status.OK) {
callback.complete(message.payload());
@@ -891,29 +931,12 @@
} else if (message.status() == InternalReply.Status.PROTOCOL_EXCEPTION) {
callback.completeExceptionally(new MessagingException.ProtocolException());
}
-
- try {
- TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
- timeoutHistory.addReplyTime(System.currentTimeMillis() - callback.time);
- } catch (ExecutionException e) {
- throw new AssertionError();
- }
} else {
log.debug("Received a reply for message id:[{}] "
+ "but was unable to locate the"
+ " request handle", message.id());
}
}
-
- @Override
- public void close() {
- if (closed.compareAndSet(false, true)) {
- timeoutFuture.cancel(false);
- for (Callback callback : futures.values()) {
- callback.completeExceptionally(new ConnectException());
- }
- }
- }
}
/**