[ONOS-6401] Implement dynamically computed timeouts for NettyMessagingManager
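
Replace the fixed 500 ms reply timeout with per-message-type timeouts derived
from observed reply times. Requests and replies now share a single wire format
distinguished by a nullable status (encoded as a -1 byte), the message type
length field shrinks from int to short, and client connections are pooled with
Netty's SimpleChannelPool instead of commons-pool.

For each message type, the largest reply time observed in each 50 ms interval
is multiplied by TIMEOUT_MULTIPLIER (2.5), clamped to MIN_TIMEOUT_MILLIS /
MAX_TIMEOUT_MILLIS (100 ms / 5 s) and added to a 100-sample sliding window.
Once the window fills, the timeout for that type becomes the window maximum;
until then the 500 ms default applies. Histories expire after 10 minutes of
inactivity. In outline (condensed from TimeoutHistory below, names shortened
for readability):

    double next = maxReplyTimeSinceLastRun * TIMEOUT_MULTIPLIER;
    window.addValue(Math.min(Math.max(next, MIN_TIMEOUT_MILLIS), MAX_TIMEOUT_MILLIS));
    if (window.getN() == WINDOW_SIZE) {
        currentTimeout = (long) window.getMax();
    }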

Change-Id: Ie2673603a2251983c9e0a166020b4feec041e84c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
index 8b96c2a..0ab7c31 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
@@ -105,10 +105,19 @@
                            Endpoint sender,
                            String type,
                            byte[] payload) {
-        this(preamble, time, id, sender, type, payload, Status.OK);
+        this(preamble, time, id, sender, type, payload, null);
     }
 
     public InternalMessage(int preamble,
+            HybridLogicalTime time,
+            long id,
+            Endpoint sender,
+            byte[] payload,
+            Status status) {
+        this(preamble, time, id, sender, "", payload, status);
+    }
+
+    InternalMessage(int preamble,
                            HybridLogicalTime time,
                            long id,
                            Endpoint sender,
@@ -124,6 +133,14 @@
         this.status = status;
     }
 
+    /**
+     * Returns whether this message is a request, i.e. carries no status.
+     *
+     * @return true if this message is a request
+     */
+    public boolean isRequest() {
+        return status == null;
+    }
+
+    /**
+     * Returns whether this message is a reply, i.e. carries a status.
+     *
+     * @return true if this message is a reply
+     */
+    public boolean isReply() {
+        return status != null;
+    }
+
     public HybridLogicalTime time() {
         return time;
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
index 4c88d4b..367193a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
@@ -88,7 +88,7 @@
             senderPort = buffer.readInt();
             checkpoint(DecoderState.READ_MESSAGE_TYPE_LENGTH);
         case READ_MESSAGE_TYPE_LENGTH:
-            messageTypeLength = buffer.readInt();
+            messageTypeLength = buffer.readShort();
             checkpoint(DecoderState.READ_MESSAGE_TYPE);
         case READ_MESSAGE_TYPE:
             byte[] messageTypeBytes = new byte[messageTypeLength];
@@ -96,7 +96,12 @@
             messageType = new String(messageTypeBytes, Charsets.UTF_8);
             checkpoint(DecoderState.READ_MESSAGE_STATUS);
         case READ_MESSAGE_STATUS:
-            status = Status.forId(buffer.readInt());
+            int statusId = buffer.readByte();
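+            // A status of -1 marks a request; replies carry a valid status id.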
+            if (statusId == -1) {
+                status = null;
+            } else {
+                status = Status.forId(statusId);
+            }
             checkpoint(DecoderState.READ_CONTENT_LENGTH);
         case READ_CONTENT_LENGTH:
             contentLength = buffer.readInt();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
index 1fcc5cc..6ec50a3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
@@ -79,13 +79,18 @@
         byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8);
 
         // write length of message type
-        out.writeInt(messageTypeBytes.length);
+        out.writeShort(messageTypeBytes.length);
 
         // write message type bytes
         out.writeBytes(messageTypeBytes);
 
         // write message status value
-        out.writeInt(message.status().id());
+        InternalMessage.Status status = message.status();
+        if (status == null) {
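+            // Requests carry no status; encode them with a -1 sentinel byte.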
+            out.writeByte(-1);
+        } else {
+            out.writeByte(status.id());
+        }
 
         byte[] payload = message.payload();
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index f3b028c..6200947 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -18,15 +18,13 @@
 import com.google.common.base.Strings;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.MoreExecutors;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
@@ -39,18 +37,23 @@
 import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.pool.AbstractChannelPoolHandler;
+import io.netty.channel.pool.AbstractChannelPoolMap;
+import io.netty.channel.pool.ChannelPool;
+import io.netty.channel.pool.ChannelPoolMap;
+import io.netty.channel.pool.SimpleChannelPool;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.commons.pool.KeyedPoolableObjectFactory;
-import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import io.netty.util.concurrent.FutureListener;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterMetadataService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.core.HybridLogicalClockService;
@@ -67,21 +70,27 @@
 import javax.net.ssl.TrustManagerFactory;
 
 import java.io.FileInputStream;
-import java.io.IOException;
+import java.net.ConnectException;
 import java.security.KeyStore;
+import java.time.Duration;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
-import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.security.AppGuard.checkPermission;
@@ -93,41 +102,51 @@
 @Component(immediate = true)
 @Service
 public class NettyMessagingManager implements MessagingService {
-
-    private static final int REPLY_TIME_OUT_MILLIS = 500;
+    private static final long DEFAULT_TIMEOUT_MILLIS = 500;
+    private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(10).toMillis();
+    private static final long MIN_TIMEOUT_MILLIS = 100;
+    private static final long MAX_TIMEOUT_MILLIS = 5000;
+    private static final long TIMEOUT_INTERVAL = 50;
+    private static final int WINDOW_SIZE = 100;
+    private static final double TIMEOUT_MULTIPLIER = 2.5;
     private static final short MIN_KS_LENGTH = 6;
 
+    private static final byte[] EMPTY_PAYLOAD = new byte[0];
+
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
+    private final ClientConnection localClientConnection = new LocalClientConnection();
+    private final ServerConnection localServerConnection = new LocalServerConnection(null);
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected HybridLogicalClockService clockService;
 
-    private Endpoint localEp;
+    private Endpoint localEndpoint;
     private int preamble;
     private final AtomicBoolean started = new AtomicBoolean(false);
-    private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
+    private final Map<String, BiConsumer<InternalMessage, ServerConnection>> handlers = new ConcurrentHashMap<>();
+    private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
+    private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
-    private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder()
-            .expireAfterWrite(REPLY_TIME_OUT_MILLIS, TimeUnit.MILLISECONDS)
-            .removalListener(new RemovalListener<Long, Callback>() {
-                @Override
-                public void onRemoval(RemovalNotification<Long, Callback> entry) {
-                    if (entry.wasEvicted()) {
-                        entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
-                    }
-                }
-            })
-            .build();
 
-    private final GenericKeyedObjectPool<Endpoint, Connection> channels
-            = new GenericKeyedObjectPool<>(new OnosCommunicationChannelFactory());
+    private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
+            .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
+            .build();
+    private ScheduledFuture<?> timeoutFuture;
+
+    private final ChannelPoolMap<Endpoint, SimpleChannelPool> channels =
+            new AbstractChannelPoolMap<Endpoint, SimpleChannelPool>() {
+                @Override
+                protected SimpleChannelPool newPool(Endpoint endpoint) {
+                    return new SimpleChannelPool(bootstrapClient(endpoint), new ClientChannelPoolHandler());
+                }
+            };
 
     private EventLoopGroup serverGroup;
     private EventLoopGroup clientGroup;
     private Class<? extends ServerChannel> serverChannelClass;
     private Class<? extends Channel> clientChannelClass;
+    private ScheduledExecutorService timeoutExecutor;
 
     protected static final boolean TLS_DISABLED = false;
     protected boolean enableNettyTls = TLS_DISABLED;
@@ -146,29 +165,28 @@
         getTlsParameters();
 
         if (started.get()) {
-            log.warn("Already running at local endpoint: {}", localEp);
+            log.warn("Already running at local endpoint: {}", localEndpoint);
             return;
         }
         this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
-        this.localEp = new Endpoint(localNode.ip(), localNode.tcpPort());
-        channels.setLifo(true);
-        channels.setTestOnBorrow(true);
-        channels.setTestOnReturn(true);
-        channels.setMinEvictableIdleTimeMillis(60_000L);
-        channels.setTimeBetweenEvictionRunsMillis(30_000L);
+        this.localEndpoint = new Endpoint(localNode.ip(), localNode.tcpPort());
         initEventLoopGroup();
         startAcceptingConnections();
+        timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("NettyMessagingEvt", "timeout", log));
+        timeoutFuture = timeoutExecutor.scheduleAtFixedRate(
+                this::timeoutAllCallbacks, TIMEOUT_INTERVAL, TIMEOUT_INTERVAL, TimeUnit.MILLISECONDS);
         started.set(true);
-        serverGroup.scheduleWithFixedDelay(callbacks::cleanUp, 0, REPLY_TIME_OUT_MILLIS, TimeUnit.MILLISECONDS);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() throws Exception {
         if (started.get()) {
-            channels.close();
             serverGroup.shutdownGracefully();
             clientGroup.shutdownGracefully();
+            timeoutFuture.cancel(false);
+            timeoutExecutor.shutdown();
             started.set(false);
         }
         log.info("Stopped");
@@ -201,6 +219,7 @@
             }
         }
     }
+
     private void initEventLoopGroup() {
         // try Epoll first and if that does work, use nio.
         try {
@@ -211,7 +230,7 @@
             return;
         } catch (Throwable e) {
             log.debug("Failed to initialize native (epoll) transport. "
-                              + "Reason: {}. Proceeding with nio.", e.getMessage());
+                    + "Reason: {}. Proceeding with nio.", e.getMessage());
         }
         clientGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioC-%d", log));
         serverGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioS-%d", log));
@@ -219,44 +238,31 @@
         clientChannelClass = NioSocketChannel.class;
     }
 
+    /**
+     * Times out response callbacks.
+     */
+    private void timeoutAllCallbacks() {
+        // Iterate through all connections and time out callbacks.
+        for (RemoteClientConnection connection : clientConnections.values()) {
+            connection.timeoutCallbacks();
+        }
+
+        // Iterate through all timeout histories and recompute the timeout.
+        for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
+            timeoutHistory.recomputeTimeoutMillis();
+        }
+    }
+
     @Override
     public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
         checkPermission(CLUSTER_WRITE);
         InternalMessage message = new InternalMessage(preamble,
-                                                      clockService.timeNow(),
-                                                      messageIdGenerator.incrementAndGet(),
-                                                      localEp,
-                                                      type,
-                                                      payload);
-        return sendAsync(ep, message);
-    }
-
-    protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
-        checkPermission(CLUSTER_WRITE);
-        if (ep.equals(localEp)) {
-            try {
-                dispatchLocally(message);
-            } catch (IOException e) {
-                return Tools.exceptionalFuture(e);
-            }
-            return CompletableFuture.completedFuture(null);
-        }
-
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        try {
-            Connection connection = null;
-            try {
-                connection = channels.borrowObject(ep);
-                connection.send(message, future);
-            } finally {
-                if (connection != null) {
-                    channels.returnObject(ep, connection);
-                }
-            }
-        } catch (Exception e) {
-            future.completeExceptionally(e);
-        }
-        return future;
+                clockService.timeNow(),
+                messageIdGenerator.incrementAndGet(),
+                localEndpoint,
+                type,
+                payload);
+        return executeOnPooledConnection(ep, c -> c.sendAsync(message), MoreExecutors.directExecutor());
     }
 
     @Override
@@ -268,21 +274,62 @@
     @Override
     public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
         checkPermission(CLUSTER_WRITE);
-        CompletableFuture<byte[]> future = new CompletableFuture<>();
-        Callback callback = new Callback(future, executor);
         Long messageId = messageIdGenerator.incrementAndGet();
-        callbacks.put(messageId, callback);
         InternalMessage message = new InternalMessage(preamble,
-                                                      clockService.timeNow(),
-                                                      messageId,
-                                                      localEp,
-                                                      type,
-                                                      payload);
+                clockService.timeNow(),
+                messageId,
+                localEndpoint,
+                type,
+                payload);
+        return executeOnPooledConnection(ep, c -> c.sendAndReceive(message), executor);
+    }
 
-        sendAsync(ep, message).whenComplete((response, error) -> {
-            if (error != null) {
-                callbacks.invalidate(messageId);
-                callback.completeExceptionally(error);
+    /**
+     * Executes the given callback on a pooled connection.
+     *
+     * @param endpoint the endpoint to which to send a message
+     * @param callback the callback to execute to send the message
+     * @param executor the executor on which to complete the returned future
+     * @param <T> the send result type
+     * @return a completable future to be completed with the result of the supplied function
+     */
+    private <T> CompletableFuture<T> executeOnPooledConnection(
+            Endpoint endpoint,
+            Function<ClientConnection, CompletableFuture<T>> callback,
+            Executor executor) {
+        if (endpoint.equals(localEndpoint)) {
+            CompletableFuture<T> future = new CompletableFuture<>();
+            callback.apply(localClientConnection).whenComplete((result, error) -> {
+                if (error == null) {
+                    executor.execute(() -> future.complete(result));
+                } else {
+                    executor.execute(() -> future.completeExceptionally(error));
+                }
+            });
+            return future;
+        }
+
+        CompletableFuture<T> future = new CompletableFuture<>();
+        ChannelPool pool = channels.get(endpoint);
+        pool.acquire().addListener((FutureListener<Channel>) channelResult -> {
+            if (channelResult.isSuccess()) {
+                Channel channel = channelResult.getNow();
+                ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
+                callback.apply(connection).whenComplete((result, error) -> {
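+                    // Return the channel to the pool; if the release fails, drop and close the cached connection.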
+                    pool.release(channel).addListener(releaseResult -> {
+                        if (!releaseResult.isSuccess()) {
+                            clientConnections.remove(channel);
+                            connection.close();
+                        }
+                    });
+
+                    if (error == null) {
+                        executor.execute(() -> future.complete(result));
+                    } else {
+                        executor.execute(() -> future.completeExceptionally(error));
+                    }
+                });
+            } else {
+                executor.execute(() -> future.completeExceptionally(channelResult.cause()));
             }
         });
         return future;
@@ -291,13 +338,14 @@
     @Override
     public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
         checkPermission(CLUSTER_WRITE);
-        handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
+        handlers.put(type, (message, connection) -> executor.execute(() ->
+                handler.accept(message.sender(), message.payload())));
     }
 
     @Override
     public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
         checkPermission(CLUSTER_WRITE);
-        handlers.put(type, message -> executor.execute(() -> {
+        handlers.put(type, (message, connection) -> executor.execute(() -> {
             byte[] responsePayload = null;
             Status status = Status.OK;
             try {
@@ -306,14 +354,14 @@
                 log.debug("An error occurred in a message handler: {}", e);
                 status = Status.ERROR_HANDLER_EXCEPTION;
             }
-            sendReply(message, status, Optional.ofNullable(responsePayload));
+            connection.reply(message, status, Optional.ofNullable(responsePayload));
         }));
     }
 
     @Override
     public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
         checkPermission(CLUSTER_WRITE);
-        handlers.put(type, message -> {
+        handlers.put(type, (message, connection) -> {
             handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
                 Status status;
                 if (error == null) {
@@ -322,7 +370,7 @@
                     log.debug("An error occurred in a message handler: {}", error);
                     status = Status.ERROR_HANDLER_EXCEPTION;
                 }
-                sendReply(message, status, Optional.ofNullable(result));
+                connection.reply(message, status, Optional.ofNullable(result));
             });
         });
     }
@@ -333,10 +381,26 @@
         handlers.remove(type);
     }
 
+    private Bootstrap bootstrapClient(Endpoint endpoint) {
+        Bootstrap bootstrap = new Bootstrap();
+        bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
+                new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
+        bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
+        bootstrap.group(clientGroup);
+        // TODO: Make this faster:
+        // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
+        bootstrap.channel(clientChannelClass);
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+        bootstrap.remoteAddress(endpoint.host().toInetAddress(), endpoint.port());
+        return bootstrap;
+    }
+
     private void startAcceptingConnections() throws InterruptedException {
         ServerBootstrap b = new ServerBootstrap();
         b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
-                      new WriteBufferWaterMark(8 * 1024, 32 * 1024));
+                new WriteBufferWaterMark(8 * 1024, 32 * 1024));
         b.option(ChannelOption.SO_RCVBUF, 1048576);
         b.option(ChannelOption.TCP_NODELAY, true);
         b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
@@ -345,82 +409,41 @@
         if (enableNettyTls) {
             b.childHandler(new SslServerCommunicationChannelInitializer());
         } else {
-            b.childHandler(new OnosCommunicationChannelInitializer());
+            b.childHandler(new BasicChannelInitializer());
         }
         b.option(ChannelOption.SO_BACKLOG, 128);
         b.childOption(ChannelOption.SO_KEEPALIVE, true);
 
         // Bind and start to accept incoming connections.
-        b.bind(localEp.port()).sync().addListener(future -> {
+        b.bind(localEndpoint.port()).sync().addListener(future -> {
             if (future.isSuccess()) {
-                log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
+                log.info("{} accepting incoming connections on port {}",
+                        localEndpoint.host(), localEndpoint.port());
             } else {
-                log.warn("{} failed to bind to port {} due to {}", localEp.host(), localEp.port(), future.cause());
+                log.warn("{} failed to bind to port {} due to {}",
+                        localEndpoint.host(), localEndpoint.port(), future.cause());
             }
         });
     }
 
-    private class OnosCommunicationChannelFactory
-            implements KeyedPoolableObjectFactory<Endpoint, Connection> {
-
+    /**
+     * Channel pool handler.
+     */
+    private class ClientChannelPoolHandler extends AbstractChannelPoolHandler {
         @Override
-        public void activateObject(Endpoint endpoint, Connection connection)
-                throws Exception {
-        }
-
-        @Override
-        public void destroyObject(Endpoint ep, Connection connection) throws Exception {
-            log.debug("Closing connection {} to {}", connection, ep);
-            //Is this the right way to destroy?
-            connection.destroy();
-        }
-
-        @Override
-        public Connection makeObject(Endpoint ep) throws Exception {
-            Bootstrap bootstrap = new Bootstrap();
-            bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-            bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
-                          new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
-            bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
-            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
-            bootstrap.group(clientGroup);
-            // TODO: Make this faster:
-            // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
-            bootstrap.channel(clientChannelClass);
-            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+        public void channelCreated(Channel channel) throws Exception {
             if (enableNettyTls) {
-                bootstrap.handler(new SslClientCommunicationChannelInitializer());
+                new SslClientCommunicationChannelInitializer().initChannel((SocketChannel) channel);
             } else {
-                bootstrap.handler(new OnosCommunicationChannelInitializer());
+                new BasicChannelInitializer().initChannel((SocketChannel) channel);
             }
-            // Start the client.
-            CompletableFuture<Channel> retFuture = new CompletableFuture<>();
-            ChannelFuture f = bootstrap.connect(ep.host().toInetAddress(), ep.port());
-
-            f.addListener(future -> {
-                if (future.isSuccess()) {
-                    retFuture.complete(f.channel());
-                } else {
-                    retFuture.completeExceptionally(future.cause());
-                }
-            });
-            log.debug("Established a new connection to {}", ep);
-            return new Connection(retFuture);
-        }
-
-        @Override
-        public void passivateObject(Endpoint ep, Connection connection)
-                throws Exception {
-        }
-
-        @Override
-        public boolean validateObject(Endpoint ep, Connection connection) {
-            return connection.validate();
         }
     }
 
+    /**
+     * Channel initializer for TLS servers.
+     */
     private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
-
         private final ChannelHandler dispatcher = new InboundMessageDispatcher();
         private final ChannelHandler encoder = new MessageEncoder(preamble);
 
@@ -454,8 +477,10 @@
         }
     }
 
+    /**
+     * Channel initializer for TLS clients.
+     */
     private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
-
         private final ChannelHandler dispatcher = new InboundMessageDispatcher();
         private final ChannelHandler encoder = new MessageEncoder(preamble);
 
@@ -488,8 +513,10 @@
         }
     }
 
-    private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
-
+    /**
+     * Channel initializer for basic connections.
+     */
+    private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
         private final ChannelHandler dispatcher = new InboundMessageDispatcher();
         private final ChannelHandler encoder = new MessageEncoder(preamble);
 
@@ -502,16 +529,27 @@
         }
     }
 
+    /**
+     * Channel inbound handler that dispatches messages to the appropriate handler.
+     */
     @ChannelHandler.Sharable
     private class InboundMessageDispatcher extends SimpleChannelInboundHandler<Object> {
-     // Effectively SimpleChannelInboundHandler<InternalMessage>,
-     // had to specify <Object> to avoid Class Loader not being able to find some classes.
+        // Effectively SimpleChannelInboundHandler<InternalMessage>,
+        // had to specify <Object> to work around the class loader failing to find some classes.
 
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
             InternalMessage message = (InternalMessage) rawMessage;
             try {
-                dispatchLocally(message);
+                if (message.isRequest()) {
+                    RemoteServerConnection connection =
+                            serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
+                    connection.dispatch(message);
+                } else {
+                    RemoteClientConnection connection =
+                            clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
+                    connection.dispatch(message);
+                }
             } catch (RejectedExecutionException e) {
                 log.warn("Unable to dispatch message due to {}", e.getMessage());
             }
@@ -520,6 +558,16 @@
         @Override
         public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
             log.error("Exception inside channel handling pipeline.", cause);
+
+            RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
+            if (clientConnection != null) {
+                clientConnection.close();
+            }
+
+            RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
+            if (serverConnection != null) {
+                serverConnection.close();
+            }
             context.close();
         }
 
@@ -537,133 +585,320 @@
         }
     }
 
-    private void dispatchLocally(InternalMessage message) throws IOException {
-        if (message.preamble() != preamble) {
-            log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
-            sendReply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
-        }
-        clockService.recordEventTime(message.time());
-        String type = message.type();
-        if (REPLY_MESSAGE_TYPE.equals(type)) {
-            try {
-                Callback callback =
-                        callbacks.getIfPresent(message.id());
-                if (callback != null) {
-                    if (message.status() == Status.OK) {
-                        callback.complete(message.payload());
-                    } else if (message.status() == Status.ERROR_NO_HANDLER) {
-                        callback.completeExceptionally(new MessagingException.NoRemoteHandler());
-                    } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
-                        callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
-                    } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
-                        callback.completeExceptionally(new MessagingException.ProtocolException());
-                    }
-                } else {
-                    log.debug("Received a reply for message id:[{}]. "
-                                     + " from {}. But was unable to locate the"
-                                     + " request handle", message.id(), message.sender());
-                }
-            } finally {
-                callbacks.invalidate(message.id());
-            }
-            return;
-        }
-        Consumer<InternalMessage> handler = handlers.get(type);
-        if (handler != null) {
-            handler.accept(message);
-        } else {
-            log.debug("No handler for message type {} from {}", message.type(), message.sender());
-            sendReply(message, Status.ERROR_NO_HANDLER, Optional.empty());
-        }
-    }
-
-    private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
-        InternalMessage response = new InternalMessage(preamble,
-                clockService.timeNow(),
-                message.id(),
-                localEp,
-                REPLY_MESSAGE_TYPE,
-                responsePayload.orElse(new byte[0]),
-                status);
-        sendAsync(message.sender(), response).whenComplete((result, error) -> {
-            if (error != null) {
-                log.debug("Failed to respond", error);
-            }
-        });
-    }
-
+    /**
+     * Wraps a {@link CompletableFuture} and tracks its type and creation time.
+     */
     private final class Callback {
+        private final String type;
         private final CompletableFuture<byte[]> future;
-        private final Executor executor;
+        private final long time = System.currentTimeMillis();
 
-        public Callback(CompletableFuture<byte[]> future, Executor executor) {
+        Callback(String type, CompletableFuture<byte[]> future) {
+            this.type = type;
             this.future = future;
-            this.executor = executor;
         }
 
         public void complete(byte[] value) {
-            executor.execute(() -> future.complete(value));
+            future.complete(value);
         }
 
         public void completeExceptionally(Throwable error) {
-            executor.execute(() -> future.completeExceptionally(error));
+            future.completeExceptionally(error);
         }
     }
-    private final class Connection {
-        private final CompletableFuture<Channel> internalFuture;
 
-        public Connection(CompletableFuture<Channel> internalFuture) {
-            this.internalFuture = internalFuture;
+    /**
+     * Represents the client side of a connection to a local or remote server.
+     */
+    private interface ClientConnection {
+
+        /**
+         * Sends a message to the other side of the connection.
+         *
+         * @param message the message to send
+         * @return a completable future to be completed once the message has been sent
+         */
+        CompletableFuture<Void> sendAsync(InternalMessage message);
+
+        /**
+         * Sends a message to the other side of the connection, awaiting a reply.
+         *
+         * @param message the message to send
+         * @return a completable future to be completed once a reply is received or the request times out
+         */
+        CompletableFuture<byte[]> sendAndReceive(InternalMessage message);
+
+        /**
+         * Closes the connection.
+         */
+        default void close() {
+        }
+    }
+
+    /**
+     * Represents the server side of a connection.
+     */
+    private interface ServerConnection {
+
+        /**
+         * Sends a reply to the other side of the connection.
+         *
+         * @param message the message to which to reply
+         * @param status the reply status
+         * @param payload the response payload
+         */
+        void reply(InternalMessage message, Status status, Optional<byte[]> payload);
+
+        /**
+         * Closes the connection.
+         */
+        default void close() {
+        }
+    }
+
+    /**
+     * Local client connection.
+     */
+    private final class LocalClientConnection implements ClientConnection {
+        @Override
+        public CompletableFuture<Void> sendAsync(InternalMessage message) {
+            BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+            if (handler != null) {
+                handler.accept(message, localServerConnection);
+            } else {
+                log.debug("No handler for message type {} from {}", message.type(), message.sender());
+            }
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
+            CompletableFuture<byte[]> future = new CompletableFuture<>();
+            BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+            if (handler != null) {
+                handler.accept(message, new LocalServerConnection(future));
+            } else {
+                log.debug("No handler for message type {} from {}", message.type(), message.sender());
+                new LocalServerConnection(future).reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
+            }
+            return future;
+        }
+    }
+
+    /**
+     * Local server connection.
+     */
+    private final class LocalServerConnection implements ServerConnection {
+        private final CompletableFuture<byte[]> future;
+
+        LocalServerConnection(CompletableFuture<byte[]> future) {
+            this.future = future;
+        }
+
+        @Override
+        public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
+            if (future != null) {
+                if (status == Status.OK) {
+                    future.complete(payload.orElse(EMPTY_PAYLOAD));
+                } else if (status == Status.ERROR_NO_HANDLER) {
+                    future.completeExceptionally(new MessagingException.NoRemoteHandler());
+                } else if (status == Status.ERROR_HANDLER_EXCEPTION) {
+                    future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+                } else if (status == Status.PROTOCOL_EXCEPTION) {
+                    future.completeExceptionally(new MessagingException.ProtocolException());
+                }
+            }
+        }
+    }
+
+    /**
+     * Remote client connection.
+     */
+    private final class RemoteClientConnection implements ClientConnection {
+        private final Channel channel;
+        private final Map<Long, Callback> futures = Maps.newConcurrentMap();
+        private final AtomicBoolean closed = new AtomicBoolean(false);
+
+        RemoteClientConnection(Channel channel) {
+            this.channel = channel;
         }
 
         /**
-         * Sends a message out on its channel and associated the message with a
-         * completable future used for signaling.
-         * @param message the message to be sent
-         * @param future a future that is completed normally or exceptionally if
-         *               message sending succeeds or fails respectively
+         * Times out callbacks for this connection.
          */
-        public void send(Object message, CompletableFuture<Void> future) {
-            internalFuture.whenComplete((channel, throwable) -> {
-                if (throwable == null) {
-                    channel.writeAndFlush(message).addListener(channelFuture -> {
-                        if (!channelFuture.isSuccess()) {
-                            future.completeExceptionally(channelFuture.cause());
-                        } else {
-                            future.complete(null);
-                        }
-                    });
+        private void timeoutCallbacks() {
+            // Store the current time.
+            long currentTime = System.currentTimeMillis();
+
+            // Iterate through future callbacks and time out callbacks that have been alive
+            // longer than the current timeout according to the message type.
+            Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Callback callback = iterator.next().getValue();
+                try {
+                    TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
+                    long currentTimeout = timeoutHistory.currentTimeout;
+                    if (currentTime - callback.time > currentTimeout) {
+                        iterator.remove();
+                        long elapsedTime = currentTime - callback.time;
+                        timeoutHistory.addReplyTime(elapsedTime);
+                        callback.completeExceptionally(
+                                new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
+                    }
+                } catch (ExecutionException e) {
+                    throw new AssertionError();
+                }
+            }
+        }
+
+        @Override
+        public CompletableFuture<Void> sendAsync(InternalMessage message) {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            channel.writeAndFlush(message).addListener(channelFuture -> {
+                if (!channelFuture.isSuccess()) {
+                    future.completeExceptionally(channelFuture.cause());
                 } else {
-                    future.completeExceptionally(throwable);
+                    future.complete(null);
                 }
             });
+            return future;
+        }
+
+        @Override
+        public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
+            CompletableFuture<byte[]> future = new CompletableFuture<>();
+            Callback callback = new Callback(message.type(), future);
+            futures.put(message.id(), callback);
+            channel.writeAndFlush(message).addListener(channelFuture -> {
+                if (!channelFuture.isSuccess()) {
+                    futures.remove(message.id());
+                    callback.completeExceptionally(channelFuture.cause());
+                }
+            });
+            return future;
         }
 
         /**
-         * Destroys a channel by closing its channel (if it exists) and
-         * cancelling its future.
+         * Dispatches a message to a local handler.
+         *
+         * @param message the message to dispatch
          */
-        public void destroy() {
-            Channel channel = internalFuture.getNow(null);
-            if (channel != null) {
-                channel.close();
+        private void dispatch(InternalMessage message) {
+            if (message.preamble() != preamble) {
+                log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
+                return;
             }
-            internalFuture.cancel(false);
+
+            clockService.recordEventTime(message.time());
+
+            Callback callback = futures.remove(message.id());
+            if (callback != null) {
+                if (message.status() == Status.OK) {
+                    callback.complete(message.payload());
+                } else if (message.status() == Status.ERROR_NO_HANDLER) {
+                    callback.completeExceptionally(new MessagingException.NoRemoteHandler());
+                } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
+                    callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+                } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
+                    callback.completeExceptionally(new MessagingException.ProtocolException());
+                }
+
+                try {
+                    TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
+                    timeoutHistory.addReplyTime(System.currentTimeMillis() - callback.time);
+                } catch (ExecutionException e) {
+                    throw new AssertionError();
+                }
+            } else {
+                log.debug("Received a reply for message id:[{}]"
+                        + " from {}, but was unable to locate the"
+                        + " request handle", message.id(), message.sender());
+            }
+        }
+
+        @Override
+        public void close() {
+            if (closed.compareAndSet(false, true)) {
+                timeoutFuture.cancel(false);
+                for (Callback callback : futures.values()) {
+                    callback.completeExceptionally(new ConnectException());
+                }
+            }
+        }
+    }
+
+    /**
+     * Remote server connection.
+     */
+    private final class RemoteServerConnection implements ServerConnection {
+        private final Channel channel;
+
+        RemoteServerConnection(Channel channel) {
+            this.channel = channel;
         }
 
         /**
-         * Determines whether the connection is valid meaning it is either
-         * complete with and active channel
-         * or it has not yet completed.
-         * @return true if the channel has an active connection or has not
-         * yet completed
+         * Dispatches a message to a local handler.
+         *
+         * @param message the message to dispatch
          */
-        public boolean validate() {
-            if (internalFuture.isCompletedExceptionally()) {
-                return false;
+        private void dispatch(InternalMessage message) {
+            if (message.preamble() != preamble) {
+                log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
+                reply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
+                return;
             }
-            Channel channel = internalFuture.getNow(null);
-            return channel == null || channel.isActive();
+
+            clockService.recordEventTime(message.time());
+
+            BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+            if (handler != null) {
+                handler.accept(message, this);
+            } else {
+                log.debug("No handler for message type {} from {}", message.type(), message.sender());
+                reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
+            }
+        }
+
+        @Override
+        public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
+            InternalMessage response = new InternalMessage(preamble,
+                    clockService.timeNow(),
+                    message.id(),
+                    localEndpoint,
+                    payload.orElse(EMPTY_PAYLOAD),
+                    status);
+            channel.writeAndFlush(response);
+        }
+    }
+
+    /**
+     * Request-reply timeout history tracker.
+     */
+    private static final class TimeoutHistory {
+        private final DescriptiveStatistics timeoutHistory = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
+        private final AtomicLong maxReplyTime = new AtomicLong();
+        private volatile long currentTimeout = DEFAULT_TIMEOUT_MILLIS;
+
+        /**
+         * Adds a reply time to the history.
+         *
+         * @param replyTime the reply time to add to the history
+         */
+        void addReplyTime(long replyTime) {
+            maxReplyTime.getAndAccumulate(replyTime, Math::max);
+        }
+
+        /**
+         * Computes the current timeout.
+         */
+        private void recomputeTimeoutMillis() {
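+            // Scale the largest reply time seen since the last run and clamp it to the allowed timeout range.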
+            double nextTimeout = maxReplyTime.getAndSet(0) * TIMEOUT_MULTIPLIER;
+            timeoutHistory.addValue(
+                    Math.min(Math.max(nextTimeout, MIN_TIMEOUT_MILLIS), MAX_TIMEOUT_MILLIS));
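+            // Once the sliding window is full, adopt its maximum as the timeout for this message type.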
+            if (timeoutHistory.getN() == WINDOW_SIZE) {
+                this.currentTimeout = (long) timeoutHistory.getMax();
+            }
         }
     }
 }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
index 9c59d65..2129c4b 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
@@ -37,9 +37,11 @@
 import java.util.Arrays;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -157,6 +159,20 @@
         assertEquals(ep1, sender.get());
     }
 
+    @Test
+    public void testSendTimeout() {
+        String subject = nextSubject();
+        BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = (ep, payload) -> new CompletableFuture<>();
+        netty2.registerHandler(subject, handler);
+
+        try {
+            netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
+            fail();
+        } catch (CompletionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+    }
+
     /*
      * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
      * and response completion occurs on the expected thread.