[ONOS-6401] Implement dynamically computed timeouts for NettyMessagingManager
Change-Id: Ie2673603a2251983c9e0a166020b4feec041e84c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index f3b028c..6200947 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -18,15 +18,13 @@
import com.google.common.base.Strings;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
@@ -39,18 +37,23 @@
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.pool.AbstractChannelPoolHandler;
+import io.netty.channel.pool.AbstractChannelPoolMap;
+import io.netty.channel.pool.ChannelPool;
+import io.netty.channel.pool.ChannelPoolMap;
+import io.netty.channel.pool.SimpleChannelPool;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.commons.pool.KeyedPoolableObjectFactory;
-import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import io.netty.util.concurrent.FutureListener;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.core.HybridLogicalClockService;
@@ -67,21 +70,27 @@
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
-import java.io.IOException;
+import java.net.ConnectException;
import java.security.KeyStore;
+import java.time.Duration;
+import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
-import java.util.function.Consumer;
+import java.util.function.Function;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.security.AppGuard.checkPermission;
@@ -93,41 +102,51 @@
@Component(immediate = true)
@Service
public class NettyMessagingManager implements MessagingService {
-
- private static final int REPLY_TIME_OUT_MILLIS = 500;
+ // Request/reply timeouts are tracked per message type. Every TIMEOUT_INTERVAL milliseconds the
+ // largest reply time observed for a type is multiplied by TIMEOUT_MULTIPLIER, clamped to
+ // [MIN_TIMEOUT_MILLIS, MAX_TIMEOUT_MILLIS] and appended to a rolling window of WINDOW_SIZE
+ // samples; once the window is full the type's timeout is the window maximum, until then it is
+ // DEFAULT_TIMEOUT_MILLIS. Histories not accessed for HISTORY_EXPIRE_MILLIS are discarded.
+ private static final long DEFAULT_TIMEOUT_MILLIS = 500;
+ private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(10).toMillis();
+ private static final long MIN_TIMEOUT_MILLIS = 100;
+ private static final long MAX_TIMEOUT_MILLIS = 5000;
+ private static final long TIMEOUT_INTERVAL = 50;
+ private static final int WINDOW_SIZE = 100;
+ private static final double TIMEOUT_MULTIPLIER = 2.5;
private static final short MIN_KS_LENGTH = 6;
+ private static final byte[] EMPTY_PAYLOAD = new byte[0];
+
private final Logger log = LoggerFactory.getLogger(getClass());
- private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
+ // In-process connections used when the destination endpoint is this node. The shared server
+ // connection has no reply future, so replies sent through it are discarded.
+ private final ClientConnection localClientConnection = new LocalClientConnection();
+ private final ServerConnection localServerConnection = new LocalServerConnection(null);
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HybridLogicalClockService clockService;
- private Endpoint localEp;
+ private Endpoint localEndpoint;
private int preamble;
private final AtomicBoolean started = new AtomicBoolean(false);
- private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
+ private final Map<String, BiConsumer<InternalMessage, ServerConnection>> handlers = new ConcurrentHashMap<>();
+ // Per-channel connection state: the client side holds pending request callbacks, the server
+ // side writes replies to received requests.
+ private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
+ private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
- private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder()
- .expireAfterWrite(REPLY_TIME_OUT_MILLIS, TimeUnit.MILLISECONDS)
- .removalListener(new RemovalListener<Long, Callback>() {
- @Override
- public void onRemoval(RemovalNotification<Long, Callback> entry) {
- if (entry.wasEvicted()) {
- entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
- }
- }
- })
- .build();
- private final GenericKeyedObjectPool<Endpoint, Connection> channels
- = new GenericKeyedObjectPool<>(new OnosCommunicationChannelFactory());
+ // Timeout history per request message type.
+ private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
+ .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
+ .build();
+ // Handle to the periodic timeout sweep (timeoutAllCallbacks) running on timeoutExecutor.
+ private ScheduledFuture<?> timeoutFuture;
+
+ // One pool of client channels per remote endpoint, created on first use.
+ private final ChannelPoolMap<Endpoint, SimpleChannelPool> channels =
+ new AbstractChannelPoolMap<Endpoint, SimpleChannelPool>() {
+ @Override
+ protected SimpleChannelPool newPool(Endpoint endpoint) {
+ return new SimpleChannelPool(bootstrapClient(endpoint), new ClientChannelPoolHandler());
+ }
+ };
private EventLoopGroup serverGroup;
private EventLoopGroup clientGroup;
private Class<? extends ServerChannel> serverChannelClass;
private Class<? extends Channel> clientChannelClass;
+ private ScheduledExecutorService timeoutExecutor;
protected static final boolean TLS_DISABLED = false;
protected boolean enableNettyTls = TLS_DISABLED;
@@ -146,29 +165,28 @@
getTlsParameters();
if (started.get()) {
- log.warn("Already running at local endpoint: {}", localEp);
+ log.warn("Already running at local endpoint: {}", localEndpoint);
return;
}
this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
- this.localEp = new Endpoint(localNode.ip(), localNode.tcpPort());
- channels.setLifo(true);
- channels.setTestOnBorrow(true);
- channels.setTestOnReturn(true);
- channels.setMinEvictableIdleTimeMillis(60_000L);
- channels.setTimeBetweenEvictionRunsMillis(30_000L);
+ this.localEndpoint = new Endpoint(localNode.ip(), localNode.tcpPort());
initEventLoopGroup();
startAcceptingConnections();
+ // Every TIMEOUT_INTERVAL milliseconds, on a dedicated single thread, expire pending request
+ // callbacks and recompute the per-type timeouts.
+ timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
+ groupedThreads("NettyMessagingEvt", "timeout", log));
+ timeoutFuture = timeoutExecutor.scheduleAtFixedRate(
+ this::timeoutAllCallbacks, TIMEOUT_INTERVAL, TIMEOUT_INTERVAL, TimeUnit.MILLISECONDS);
started.set(true);
- serverGroup.scheduleWithFixedDelay(callbacks::cleanUp, 0, REPLY_TIME_OUT_MILLIS, TimeUnit.MILLISECONDS);
log.info("Started");
}
@Deactivate
public void deactivate() throws Exception {
if (started.get()) {
- channels.close();
serverGroup.shutdownGracefully();
clientGroup.shutdownGracefully();
+ // Stop the periodic timeout sweep and its thread.
+ timeoutFuture.cancel(false);
+ timeoutExecutor.shutdown();
started.set(false);
}
log.info("Stopped");
@@ -201,6 +219,7 @@
}
}
}
+
+ /**
+ * Initializes the client and server event loop groups and channel classes, preferring the
+ * native epoll transport and falling back to NIO when epoll cannot be initialized.
+ */
private void initEventLoopGroup() {
- // try Epoll first and if that does work, use nio.
+ // Try epoll first; if it cannot be initialized, fall back to nio.
try {
@@ -211,7 +230,7 @@
return;
} catch (Throwable e) {
log.debug("Failed to initialize native (epoll) transport. "
- + "Reason: {}. Proceeding with nio.", e.getMessage());
+ + "Reason: {}. Proceeding with nio.", e.getMessage());
}
clientGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioC-%d", log));
serverGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioS-%d", log));
@@ -219,44 +238,31 @@
clientChannelClass = NioSocketChannel.class;
}
+ /**
+ * Times out expired response callbacks and recomputes the per-type timeouts.
+ * <p>
+ * Runs periodically via {@code scheduleAtFixedRate}. If an execution of such a task throws,
+ * all of its later executions are suppressed, which would leave every pending request
+ * without a timeout. Failures are therefore caught and logged per item so that one bad
+ * connection or history cannot stop the sweep.
+ */
+ private void timeoutAllCallbacks() {
+ // Iterate through all connections and time out callbacks.
+ for (RemoteClientConnection connection : clientConnections.values()) {
+ try {
+ connection.timeoutCallbacks();
+ } catch (RuntimeException e) {
+ log.warn("Failed to time out callbacks for a connection", e);
+ }
+ }
+
+ // Iterate through all timeout histories and recompute the timeout.
+ for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
+ try {
+ timeoutHistory.recomputeTimeoutMillis();
+ } catch (RuntimeException e) {
+ log.warn("Failed to recompute a request timeout", e);
+ }
+ }
+ }
+
@Override
public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
checkPermission(CLUSTER_WRITE);
InternalMessage message = new InternalMessage(preamble,
- clockService.timeNow(),
- messageIdGenerator.incrementAndGet(),
- localEp,
- type,
- payload);
- return sendAsync(ep, message);
- }
-
- protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
- checkPermission(CLUSTER_WRITE);
- if (ep.equals(localEp)) {
- try {
- dispatchLocally(message);
- } catch (IOException e) {
- return Tools.exceptionalFuture(e);
- }
- return CompletableFuture.completedFuture(null);
- }
-
- CompletableFuture<Void> future = new CompletableFuture<>();
- try {
- Connection connection = null;
- try {
- connection = channels.borrowObject(ep);
- connection.send(message, future);
- } finally {
- if (connection != null) {
- channels.returnObject(ep, connection);
- }
- }
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
+ clockService.timeNow(),
+ messageIdGenerator.incrementAndGet(),
+ localEndpoint,
+ type,
+ payload);
+ // One-way send: the future completes once the message has been written to the channel,
+ // or dispatched to a local handler when ep is this node. No reply is awaited.
+ return executeOnPooledConnection(ep, c -> c.sendAsync(message), MoreExecutors.directExecutor());
}
@Override
@@ -268,21 +274,62 @@
@Override
public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
checkPermission(CLUSTER_WRITE);
- CompletableFuture<byte[]> future = new CompletableFuture<>();
- Callback callback = new Callback(future, executor);
Long messageId = messageIdGenerator.incrementAndGet();
- callbacks.put(messageId, callback);
InternalMessage message = new InternalMessage(preamble,
- clockService.timeNow(),
- messageId,
- localEp,
- type,
- payload);
+ clockService.timeNow(),
+ messageId,
+ localEndpoint,
+ type,
+ payload);
+ return executeOnPooledConnection(ep, c -> c.sendAndReceive(message), executor);
+ }
- sendAsync(ep, message).whenComplete((response, error) -> {
- if (error != null) {
- callbacks.invalidate(messageId);
- callback.completeExceptionally(error);
+ /**
+ * Executes the given callback on a pooled connection.
+ * <p>
+ * When {@code endpoint} is the local endpoint, the callback runs against an in-process
+ * connection instead of a pooled channel. Otherwise a channel is acquired from the
+ * endpoint's pool and released once the future returned by the callback completes.
+ *
+ * @param endpoint the endpoint to which to send a message
+ * @param callback the callback to execute to send the message
+ * @param executor the executor on which the returned future is completed
+ * @param <T> the send result type
+ * @return a completable future to be completed with the result of the supplied function
+ */
+ private <T> CompletableFuture<T> executeOnPooledConnection(
+ Endpoint endpoint,
+ Function<ClientConnection, CompletableFuture<T>> callback,
+ Executor executor) {
+ if (endpoint.equals(localEndpoint)) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ callback.apply(localClientConnection).whenComplete((result, error) -> {
+ if (error == null) {
+ executor.execute(() -> future.complete(result));
+ } else {
+ executor.execute(() -> future.completeExceptionally(error));
+ }
+ });
+ return future;
+ }
+
+ CompletableFuture<T> future = new CompletableFuture<>();
+ ChannelPool pool = channels.get(endpoint);
+ pool.acquire().addListener((FutureListener<Channel>) channelResult -> {
+ if (channelResult.isSuccess()) {
+ Channel channel = channelResult.getNow();
+ ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
+ callback.apply(connection).whenComplete((result, error) -> {
+ // The channel is held until the callback's future completes, which for
+ // sendAndReceive means until the reply (or a timeout) arrives.
+ // NOTE(review): concurrent requests to the same endpoint therefore each acquire their
+ // own channel, and SimpleChannelPool does not bound how many channels it creates.
+ // Return the channel to the pool; if the release fails, discard its connection state.
+ pool.release(channel).addListener(releaseResult -> {
+ if (!releaseResult.isSuccess()) {
+ clientConnections.remove(channel);
+ connection.close();
+ }
+ });
+
+ if (error == null) {
+ executor.execute(() -> future.complete(result));
+ } else {
+ executor.execute(() -> future.completeExceptionally(error));
+ }
+ });
+ } else {
+ executor.execute(() -> future.completeExceptionally(channelResult.cause()));
}
});
return future;
@@ -291,13 +338,14 @@
@Override
public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
checkPermission(CLUSTER_WRITE);
- handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
+ // One-way handler: no reply is sent, so the connection argument is unused.
+ handlers.put(type, (message, connection) -> executor.execute(() ->
+ handler.accept(message.sender(), message.payload())));
}
@Override
public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
checkPermission(CLUSTER_WRITE);
- handlers.put(type, message -> executor.execute(() -> {
+ handlers.put(type, (message, connection) -> executor.execute(() -> {
byte[] responsePayload = null;
Status status = Status.OK;
try {
@@ -306,14 +354,14 @@
log.debug("An error occurred in a message handler: {}", e);
status = Status.ERROR_HANDLER_EXCEPTION;
}
- sendReply(message, status, Optional.ofNullable(responsePayload));
+ // Reply over the connection the request arrived on.
+ connection.reply(message, status, Optional.ofNullable(responsePayload));
}));
}
@Override
public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
checkPermission(CLUSTER_WRITE);
- handlers.put(type, message -> {
+ handlers.put(type, (message, connection) -> {
handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
Status status;
if (error == null) {
@@ -322,7 +370,7 @@
log.debug("An error occurred in a message handler: {}", error);
status = Status.ERROR_HANDLER_EXCEPTION;
}
- sendReply(message, status, Optional.ofNullable(result));
+ // Reply over the connection the request arrived on.
+ connection.reply(message, status, Optional.ofNullable(result));
});
});
}
@@ -333,10 +381,26 @@
handlers.remove(type);
}
+ /**
+ * Creates the bootstrap an endpoint's channel pool uses to open client channels.
+ *
+ * @param endpoint the remote endpoint that bootstrapped channels connect to
+ * @return a client bootstrap configured for {@code endpoint}
+ */
+ private Bootstrap bootstrapClient(Endpoint endpoint) {
+ // TODO: Make this faster:
+ // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
+ return new Bootstrap()
+ .group(clientGroup)
+ .channel(clientChannelClass)
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .option(ChannelOption.WRITE_BUFFER_WATER_MARK,
+ new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024))
+ .option(ChannelOption.SO_SNDBUF, 1048576)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .remoteAddress(endpoint.host().toInetAddress(), endpoint.port());
+ }
+
+ /**
+ * Starts the server: binds to the local endpoint's port and installs the TLS or plain
+ * inbound pipeline on accepted channels.
+ *
+ * @throws InterruptedException if interrupted while waiting for the bind to complete
+ */
private void startAcceptingConnections() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
- new WriteBufferWaterMark(8 * 1024, 32 * 1024));
+ new WriteBufferWaterMark(8 * 1024, 32 * 1024));
b.option(ChannelOption.SO_RCVBUF, 1048576);
b.option(ChannelOption.TCP_NODELAY, true);
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
@@ -345,82 +409,41 @@
if (enableNettyTls) {
b.childHandler(new SslServerCommunicationChannelInitializer());
} else {
- b.childHandler(new OnosCommunicationChannelInitializer());
+ b.childHandler(new BasicChannelInitializer());
}
b.option(ChannelOption.SO_BACKLOG, 128);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
+ // NOTE(review): sync() rethrows the cause of a failed bind, so the failure branch of the
+ // listener below is presumably unreachable; confirm before relying on that warning.
- b.bind(localEp.port()).sync().addListener(future -> {
+ b.bind(localEndpoint.port()).sync().addListener(future -> {
if (future.isSuccess()) {
- log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
+ log.info("{} accepting incoming connections on port {}",
+ localEndpoint.host(), localEndpoint.port());
} else {
- log.warn("{} failed to bind to port {} due to {}", localEp.host(), localEp.port(), future.cause());
+ log.warn("{} failed to bind to port {} due to {}",
+ localEndpoint.host(), localEndpoint.port(), future.cause());
}
});
}
- private class OnosCommunicationChannelFactory
- implements KeyedPoolableObjectFactory<Endpoint, Connection> {
-
+ /**
+ * Channel pool handler that installs the messaging pipeline on every channel a pool
+ * creates: the TLS client pipeline when {@code enableNettyTls} is set, the plain one otherwise.
+ */
+ private class ClientChannelPoolHandler extends AbstractChannelPoolHandler {
@Override
- public void activateObject(Endpoint endpoint, Connection connection)
- throws Exception {
- }
-
- @Override
- public void destroyObject(Endpoint ep, Connection connection) throws Exception {
- log.debug("Closing connection {} to {}", connection, ep);
- //Is this the right way to destroy?
- connection.destroy();
- }
-
- @Override
- public Connection makeObject(Endpoint ep) throws Exception {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
- new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
- bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
- bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
- bootstrap.group(clientGroup);
- // TODO: Make this faster:
- // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
- bootstrap.channel(clientChannelClass);
- bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+ public void channelCreated(Channel channel) throws Exception {
+ // Assumes clientChannelClass always yields SocketChannel instances (NIO/epoll socket
+ // channels are configured in initEventLoopGroup) - TODO confirm if transports change.
if (enableNettyTls) {
- bootstrap.handler(new SslClientCommunicationChannelInitializer());
+ new SslClientCommunicationChannelInitializer().initChannel((SocketChannel) channel);
} else {
- bootstrap.handler(new OnosCommunicationChannelInitializer());
+ new BasicChannelInitializer().initChannel((SocketChannel) channel);
}
- // Start the client.
- CompletableFuture<Channel> retFuture = new CompletableFuture<>();
- ChannelFuture f = bootstrap.connect(ep.host().toInetAddress(), ep.port());
-
- f.addListener(future -> {
- if (future.isSuccess()) {
- retFuture.complete(f.channel());
- } else {
- retFuture.completeExceptionally(future.cause());
- }
- });
- log.debug("Established a new connection to {}", ep);
- return new Connection(retFuture);
- }
-
- @Override
- public void passivateObject(Endpoint ep, Connection connection)
- throws Exception {
- }
-
- @Override
- public boolean validateObject(Endpoint ep, Connection connection) {
- return connection.validate();
}
}
+ /**
+ * Channel initializer for server-side (accepted) channels when TLS is enabled.
+ */
private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
-
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
private final ChannelHandler encoder = new MessageEncoder(preamble);
@@ -454,8 +477,10 @@
}
}
+ /**
+ * Channel initializer for client-side (pooled) channels when TLS is enabled.
+ */
private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
-
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
private final ChannelHandler encoder = new MessageEncoder(preamble);
@@ -488,8 +513,10 @@
}
}
- private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
-
+ /**
+ * Channel initializer for plain (non-TLS) connections, used for both server-side and
+ * client-side channels when TLS is disabled.
+ */
+ private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
private final ChannelHandler encoder = new MessageEncoder(preamble);
@@ -502,16 +529,27 @@
}
}
+ /**
+ * Channel inbound handler that dispatches messages to the appropriate handler.
+ * <p>
+ * Requests go to the channel's server-side connection state; replies go to its client-side
+ * connection state, which completes the matching pending request.
+ */
@ChannelHandler.Sharable
private class InboundMessageDispatcher extends SimpleChannelInboundHandler<Object> {
- // Effectively SimpleChannelInboundHandler<InternalMessage>,
- // had to specify <Object> to avoid Class Loader not being able to find some classes.
+ // Effectively SimpleChannelInboundHandler<InternalMessage>,
+ // had to specify <Object> to avoid Class Loader not being able to find some classes.
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
InternalMessage message = (InternalMessage) rawMessage;
try {
- dispatchLocally(message);
+ // Connection state is created lazily, once per channel and direction.
+ if (message.isRequest()) {
+ RemoteServerConnection connection =
+ serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
+ connection.dispatch(message);
+ } else {
+ RemoteClientConnection connection =
+ clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
+ connection.dispatch(message);
+ }
} catch (RejectedExecutionException e) {
log.warn("Unable to dispatch message due to {}", e.getMessage());
}
@@ -520,6 +558,16 @@
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
log.error("Exception inside channel handling pipeline.", cause);
+ // Drop this channel's connection state before closing the channel itself.
+ releaseConnections(context.channel());
context.close();
}
+
+ /**
+ * Removes and closes the client-side and server-side connection state tied to a channel.
+ *
+ * @param channel the channel whose connection state is released
+ */
+ private void releaseConnections(Channel channel) {
+ RemoteClientConnection client = clientConnections.remove(channel);
+ if (client != null) {
+ client.close();
+ }
+ RemoteServerConnection server = serverConnections.remove(channel);
+ if (server != null) {
+ server.close();
+ }
+ }
@@ -537,133 +585,320 @@
}
}
- private void dispatchLocally(InternalMessage message) throws IOException {
- if (message.preamble() != preamble) {
- log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
- sendReply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
- }
- clockService.recordEventTime(message.time());
- String type = message.type();
- if (REPLY_MESSAGE_TYPE.equals(type)) {
- try {
- Callback callback =
- callbacks.getIfPresent(message.id());
- if (callback != null) {
- if (message.status() == Status.OK) {
- callback.complete(message.payload());
- } else if (message.status() == Status.ERROR_NO_HANDLER) {
- callback.completeExceptionally(new MessagingException.NoRemoteHandler());
- } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
- callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
- } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
- callback.completeExceptionally(new MessagingException.ProtocolException());
- }
- } else {
- log.debug("Received a reply for message id:[{}]. "
- + " from {}. But was unable to locate the"
- + " request handle", message.id(), message.sender());
- }
- } finally {
- callbacks.invalidate(message.id());
- }
- return;
- }
- Consumer<InternalMessage> handler = handlers.get(type);
- if (handler != null) {
- handler.accept(message);
- } else {
- log.debug("No handler for message type {} from {}", message.type(), message.sender());
- sendReply(message, Status.ERROR_NO_HANDLER, Optional.empty());
- }
- }
-
- private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
- InternalMessage response = new InternalMessage(preamble,
- clockService.timeNow(),
- message.id(),
- localEp,
- REPLY_MESSAGE_TYPE,
- responsePayload.orElse(new byte[0]),
- status);
- sendAsync(message.sender(), response).whenComplete((result, error) -> {
- if (error != null) {
- log.debug("Failed to respond", error);
- }
- });
- }
-
+ /**
+ * Wraps a {@link CompletableFuture} and tracks its type and creation time.
+ * <p>
+ * The future is completed on the calling thread; hand-off to a caller-supplied executor is
+ * done by whoever chains onto the future.
+ */
private final class Callback {
+ // Message type of the request; selects the TimeoutHistory applied to this callback.
+ private final String type;
private final CompletableFuture<byte[]> future;
- private final Executor executor;
+ // Creation time in wall-clock milliseconds, used to measure reply latency and expiry.
+ // NOTE(review): wall-clock adjustments can skew these measurements.
+ private final long time = System.currentTimeMillis();
- public Callback(CompletableFuture<byte[]> future, Executor executor) {
+ Callback(String type, CompletableFuture<byte[]> future) {
+ this.type = type;
this.future = future;
- this.executor = executor;
}
public void complete(byte[] value) {
- executor.execute(() -> future.complete(value));
+ future.complete(value);
}
public void completeExceptionally(Throwable error) {
- executor.execute(() -> future.completeExceptionally(error));
+ future.completeExceptionally(error);
}
}
- private final class Connection {
- private final CompletableFuture<Channel> internalFuture;
- public Connection(CompletableFuture<Channel> internalFuture) {
- this.internalFuture = internalFuture;
+ /**
+ * Represents the client side of a connection to a local or remote server.
+ */
+ private interface ClientConnection {
+
+ /**
+ * Sends a message to the other side of the connection.
+ *
+ * @param message the message to send
+ * @return a completable future to be completed once the message has been sent
+ */
+ CompletableFuture<Void> sendAsync(InternalMessage message);
+
+ /**
+ * Sends a message to the other side of the connection, awaiting a reply.
+ *
+ * @param message the message to send
+ * @return a completable future completed with the reply payload, or exceptionally if the
+ * send fails, the other side reports an error status, or the request times out
+ */
+ CompletableFuture<byte[]> sendAndReceive(InternalMessage message);
+
+ /**
+ * Closes the connection. The default implementation does nothing.
+ */
+ default void close() {
+ }
+ }
+
+ /**
+ * Represents the server side of a connection: the path over which replies to received
+ * requests are sent.
+ */
+ private interface ServerConnection {
+
+ /**
+ * Sends a reply to the other side of the connection.
+ *
+ * @param message the message to which to reply
+ * @param status the reply status
+ * @param payload the response payload, or empty for none
+ */
+ void reply(InternalMessage message, Status status, Optional<byte[]> payload);
+
+ /**
+ * Closes the connection. The default implementation does nothing.
+ */
+ default void close() {
+ }
+ }
+
+ /**
+ * Client connection to this node itself: messages bypass the network and are handed
+ * directly to the locally registered handlers.
+ */
+ private final class LocalClientConnection implements ClientConnection {
+ @Override
+ public CompletableFuture<Void> sendAsync(InternalMessage message) {
+ BiConsumer<InternalMessage, ServerConnection> handler = lookupHandler(message);
+ if (handler != null) {
+ // One-way message: any reply goes to a server connection that discards it.
+ handler.accept(message, localServerConnection);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
+ CompletableFuture<byte[]> replyFuture = new CompletableFuture<>();
+ BiConsumer<InternalMessage, ServerConnection> handler = lookupHandler(message);
+ if (handler == null) {
+ new LocalServerConnection(replyFuture).reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
+ } else {
+ handler.accept(message, new LocalServerConnection(replyFuture));
+ }
+ return replyFuture;
+ }
+
+ /**
+ * Looks up the handler registered for a message's type, logging when there is none.
+ *
+ * @param message the message to be handled
+ * @return the registered handler, or {@code null} if none is registered
+ */
+ private BiConsumer<InternalMessage, ServerConnection> lookupHandler(InternalMessage message) {
+ BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+ if (handler == null) {
+ log.debug("No handler for message type {} from {}", message.type(), message.sender());
+ }
+ return handler;
+ }
+ }
+
+ /**
+ * Server side of an in-process request: a reply completes the requester's future directly.
+ * An instance created with a {@code null} future (used for one-way messages) ignores replies.
+ */
+ private final class LocalServerConnection implements ServerConnection {
+ private final CompletableFuture<byte[]> replyFuture;
+
+ LocalServerConnection(CompletableFuture<byte[]> replyFuture) {
+ this.replyFuture = replyFuture;
+ }
+
+ @Override
+ public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
+ if (replyFuture == null) {
+ // Nobody is waiting for this reply.
+ return;
+ }
+ if (status == Status.OK) {
+ replyFuture.complete(payload.orElse(EMPTY_PAYLOAD));
+ } else if (status == Status.ERROR_NO_HANDLER) {
+ replyFuture.completeExceptionally(new MessagingException.NoRemoteHandler());
+ } else if (status == Status.ERROR_HANDLER_EXCEPTION) {
+ replyFuture.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+ } else if (status == Status.PROTOCOL_EXCEPTION) {
+ replyFuture.completeExceptionally(new MessagingException.ProtocolException());
+ }
+ }
+ }
+
+ /**
+ * Client side of a connection to a remote server over a Netty channel.
+ * <p>
+ * Tracks the callbacks of requests awaiting a reply on this channel, completes them when
+ * replies arrive, and times them out according to the per-type {@link TimeoutHistory}.
+ */
+ private final class RemoteClientConnection implements ClientConnection {
+ private final Channel channel;
+ // Pending request callbacks keyed by message id.
+ private final Map<Long, Callback> futures = Maps.newConcurrentMap();
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ RemoteClientConnection(Channel channel) {
+ this.channel = channel;
}
/**
- * Sends a message out on its channel and associated the message with a
- * completable future used for signaling.
- * @param message the message to be sent
- * @param future a future that is completed normally or exceptionally if
- * message sending succeeds or fails respectively
+ * Times out callbacks for this connection.
+ * <p>
+ * A callback expires once it has been pending longer than the current timeout for its
+ * message type. Its elapsed time is fed back into that type's history so the timeout
+ * can adapt.
*/
- public void send(Object message, CompletableFuture<Void> future) {
- internalFuture.whenComplete((channel, throwable) -> {
- if (throwable == null) {
- channel.writeAndFlush(message).addListener(channelFuture -> {
- if (!channelFuture.isSuccess()) {
- future.completeExceptionally(channelFuture.cause());
- } else {
- future.complete(null);
- }
- });
+ private void timeoutCallbacks() {
+ long currentTime = System.currentTimeMillis();
+
+ Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Callback callback = iterator.next().getValue();
+ TimeoutHistory timeoutHistory = timeoutHistoryFor(callback.type);
+ long elapsedTime = currentTime - callback.time;
+ if (elapsedTime > timeoutHistory.currentTimeout) {
+ iterator.remove();
+ timeoutHistory.addReplyTime(elapsedTime);
+ callback.completeExceptionally(
+ new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
+ }
+ }
+ }
+
+ /**
+ * Returns the timeout history for a message type, creating it if absent.
+ *
+ * @param type the message type
+ * @return the timeout history for {@code type}
+ */
+ private TimeoutHistory timeoutHistoryFor(String type) {
+ try {
+ return timeoutHistories.get(type, TimeoutHistory::new);
+ } catch (ExecutionException e) {
+ // The loader is a no-arg constructor that throws no checked exception.
+ throw new AssertionError(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> sendAsync(InternalMessage message) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ channel.writeAndFlush(message).addListener(channelFuture -> {
+ if (!channelFuture.isSuccess()) {
+ future.completeExceptionally(channelFuture.cause());
} else {
- future.completeExceptionally(throwable);
+ future.complete(null);
}
});
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
+ CompletableFuture<byte[]> future = new CompletableFuture<>();
+ Callback callback = new Callback(message.type(), future);
+ futures.put(message.id(), callback);
+ if (closed.get()) {
+ // close() may already have drained the callbacks, and a closed connection has been
+ // removed from clientConnections so it is no longer swept for timeouts. Fail now
+ // instead of leaving the future pending forever.
+ futures.remove(message.id());
+ callback.completeExceptionally(new ConnectException());
+ return future;
+ }
+ channel.writeAndFlush(message).addListener(channelFuture -> {
+ if (!channelFuture.isSuccess()) {
+ futures.remove(message.id());
+ callback.completeExceptionally(channelFuture.cause());
+ }
+ });
+ return future;
}
/**
- * Destroys a channel by closing its channel (if it exists) and
- * cancelling its future.
+ * Completes the pending request callback that matches a reply message.
+ * <p>
+ * Replies with a foreign preamble are dropped. The measured round-trip time is recorded
+ * in the timeout history of the request's message type.
+ *
+ * @param message the reply message to dispatch
*/
- public void destroy() {
- Channel channel = internalFuture.getNow(null);
- if (channel != null) {
- channel.close();
+ private void dispatch(InternalMessage message) {
+ if (message.preamble() != preamble) {
+ log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
+ return;
}
- internalFuture.cancel(false);
+
+ clockService.recordEventTime(message.time());
+
+ Callback callback = futures.remove(message.id());
+ if (callback != null) {
+ if (message.status() == Status.OK) {
+ callback.complete(message.payload());
+ } else if (message.status() == Status.ERROR_NO_HANDLER) {
+ callback.completeExceptionally(new MessagingException.NoRemoteHandler());
+ } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
+ callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+ } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
+ callback.completeExceptionally(new MessagingException.ProtocolException());
+ }
+
+ timeoutHistoryFor(callback.type).addReplyTime(System.currentTimeMillis() - callback.time);
+ } else {
+ log.debug("Received a reply for message id:[{}]. "
+ + " from {}. But was unable to locate the"
+ + " request handle", message.id(), message.sender());
+ }
+ }
+
+ /**
+ * Closes this connection, failing every request still awaiting a reply on it.
+ */
+ @Override
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ // Only this connection's callbacks are failed here. The manager-wide timeoutFuture is
+ // deliberately left running: it times out requests on every other connection.
+ Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Callback callback = iterator.next().getValue();
+ iterator.remove();
+ callback.completeExceptionally(new ConnectException());
+ }
+ }
+ }
+ }
+
+ /**
+ * Server side of a connection from a remote client over a Netty channel.
+ * <p>
+ * Dispatches incoming requests to the registered handlers and writes replies back on the
+ * same channel.
+ */
+ private final class RemoteServerConnection implements ServerConnection {
+ private final Channel channel;
+
+ RemoteServerConnection(Channel channel) {
+ this.channel = channel;
}
/**
- * Determines whether the connection is valid meaning it is either
- * complete with and active channel
- * or it has not yet completed.
- * @return true if the channel has an active connection or has not
- * yet completed
+ * Dispatches a request message to the handler registered for its type.
+ * <p>
+ * Requests with a foreign preamble are answered with {@code PROTOCOL_EXCEPTION}, and
+ * requests with no registered handler with {@code ERROR_NO_HANDLER}.
+ *
+ * @param message the message to dispatch
*/
- public boolean validate() {
- if (internalFuture.isCompletedExceptionally()) {
- return false;
+ private void dispatch(InternalMessage message) {
+ if (message.preamble() != preamble) {
+ log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
+ reply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
+ return;
}
- Channel channel = internalFuture.getNow(null);
- return channel == null || channel.isActive();
+
+ clockService.recordEventTime(message.time());
+
+ BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+ if (handler != null) {
+ handler.accept(message, this);
+ } else {
+ log.debug("No handler for message type {} from {}", message.type(), message.sender());
+ reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
+ }
+ }
+
+ /**
+ * Writes a reply to a request on this connection's channel.
+ * <p>
+ * A failed write is logged; the requester is not otherwise notified from this side.
+ */
+ @Override
+ public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
+ InternalMessage response = new InternalMessage(preamble,
+ clockService.timeNow(),
+ message.id(),
+ localEndpoint,
+ payload.orElse(EMPTY_PAYLOAD),
+ status);
+ channel.writeAndFlush(response).addListener(writeResult -> {
+ if (!writeResult.isSuccess()) {
+ log.debug("Failed to send reply to {}", message.sender(), writeResult.cause());
+ }
+ });
+ }
+ }
+
+ /**
+ * Per-message-type request/reply timeout tracker.
+ * <p>
+ * Reply times reported between recomputations are reduced to their maximum. Each
+ * recomputation scales that maximum by TIMEOUT_MULTIPLIER, clamps it to
+ * [MIN_TIMEOUT_MILLIS, MAX_TIMEOUT_MILLIS] and appends it to a window of the last
+ * WINDOW_SIZE samples. Once the window is full, the effective timeout is the largest sample
+ * in it; until then it stays at DEFAULT_TIMEOUT_MILLIS.
+ */
+ private static final class TimeoutHistory {
+ private final DescriptiveStatistics windowedTimeouts = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
+ private final AtomicLong maxReplyTime = new AtomicLong();
+ private volatile long currentTimeout = DEFAULT_TIMEOUT_MILLIS;
+
+ /**
+ * Reports an observed reply time, keeping only the largest one since the last recompute.
+ *
+ * @param replyTime the observed reply time in milliseconds
+ */
+ void addReplyTime(long replyTime) {
+ maxReplyTime.accumulateAndGet(replyTime, Math::max);
+ }
+
+ /**
+ * Folds the last interval's maximum reply time into the window and, once the window is
+ * full, updates the effective timeout.
+ */
+ private void recomputeTimeoutMillis() {
+ long intervalMax = maxReplyTime.getAndSet(0);
+ double candidate = intervalMax * TIMEOUT_MULTIPLIER;
+ double bounded = Math.max(MIN_TIMEOUT_MILLIS, Math.min(MAX_TIMEOUT_MILLIS, candidate));
+ windowedTimeouts.addValue(bounded);
+ boolean windowFull = windowedTimeouts.getN() == WINDOW_SIZE;
+ if (windowFull) {
+ currentTimeout = (long) windowedTimeouts.getMax();
+ }
}
}
}