Preserve ordering of messages in NettyMessagingManager to simplify distributed systems protocols and improve performance.

Change-Id: I8797f57c7f136dccb14c772faec515274462de51
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 193e81a..d9ed9c3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -15,58 +15,10 @@
  */
 package org.onosproject.store.cluster.messaging.impl;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.ServerChannel;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.WriteBufferWaterMark;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.pool.AbstractChannelPoolHandler;
-import io.netty.channel.pool.AbstractChannelPoolMap;
-import io.netty.channel.pool.ChannelPool;
-import io.netty.channel.pool.ChannelPoolMap;
-import io.netty.channel.pool.SimpleChannelPool;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.concurrent.FutureListener;
-import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
-import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterMetadataService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.core.HybridLogicalClockService;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingException;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.TrustManagerFactory;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -76,8 +28,10 @@
 import java.security.MessageDigest;
 import java.security.cert.Certificate;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.StringJoiner;
@@ -97,6 +51,49 @@
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterMetadataService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.core.HybridLogicalClockService;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingException;
+import org.onosproject.store.cluster.messaging.MessagingService;
+import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.security.AppGuard.checkPermission;
 import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
@@ -108,13 +105,13 @@
 @Service
 public class NettyMessagingManager implements MessagingService {
     private static final long DEFAULT_TIMEOUT_MILLIS = 500;
-    private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(10).toMillis();
+    private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1).toMillis();
     private static final long MIN_TIMEOUT_MILLIS = 100;
     private static final long MAX_TIMEOUT_MILLIS = 5000;
     private static final long TIMEOUT_INTERVAL = 50;
     private static final int WINDOW_SIZE = 100;
     private static final double TIMEOUT_MULTIPLIER = 2.5;
-    private static final short MIN_KS_LENGTH = 6;
+    private static final int CHANNEL_POOL_SIZE = 8;
 
     private static final byte[] EMPTY_PAYLOAD = new byte[0];
 
@@ -140,18 +137,9 @@
     private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
 
-    private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
-            .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
-            .build();
     private ScheduledFuture<?> timeoutFuture;
 
-    private final ChannelPoolMap<Endpoint, SimpleChannelPool> channels =
-            new AbstractChannelPoolMap<Endpoint, SimpleChannelPool>() {
-                @Override
-                protected SimpleChannelPool newPool(Endpoint endpoint) {
-                    return new SimpleChannelPool(bootstrapClient(endpoint), new ClientChannelPoolHandler());
-                }
-            };
+    private final Map<Endpoint, List<CompletableFuture<Channel>>> channels = Maps.newConcurrentMap();
 
     private EventLoopGroup serverGroup;
     private EventLoopGroup clientGroup;
@@ -303,11 +291,6 @@
         for (RemoteClientConnection connection : clientConnections.values()) {
             connection.timeoutCallbacks();
         }
-
-        // Iterate through all timeout histories and recompute the timeout.
-        for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
-            timeoutHistory.recomputeTimeoutMillis();
-        }
     }
 
     @Override
@@ -319,7 +302,7 @@
                 localEndpoint,
                 type,
                 payload);
-        return executeOnPooledConnection(ep, c -> c.sendAsync(message), MoreExecutors.directExecutor());
+        return executeOnPooledConnection(ep, type, c -> c.sendAsync(message), MoreExecutors.directExecutor());
     }
 
     @Override
@@ -338,23 +321,91 @@
                 localEndpoint,
                 type,
                 payload);
-        return executeOnPooledConnection(ep, c -> c.sendAndReceive(message), executor);
+        return executeOnPooledConnection(ep, type, c -> c.sendAndReceive(message), executor);
     }
 
-    /**
-     * Executes the given callback on a pooled connection.
-     *
-     * @param endpoint the endpoint to which to send a message
-     * @param callback the callback to execute to send the message
-     * @param <T> the send result type
-     * @return a completable future to be completed with the result of the supplied function
-     */
+    private List<CompletableFuture<Channel>> getChannelPool(Endpoint endpoint) {
+        return channels.computeIfAbsent(endpoint, e -> {
+            List<CompletableFuture<Channel>> defaultList = new ArrayList<>(CHANNEL_POOL_SIZE);
+            for (int i = 0; i < CHANNEL_POOL_SIZE; i++) {
+                defaultList.add(null);
+            }
+            return Lists.newCopyOnWriteArrayList(defaultList);
+        });
+    }
+
+    private int getChannelOffset(String messageType) {
+        return Math.abs(messageType.hashCode() % CHANNEL_POOL_SIZE);
+    }
+
+    private CompletableFuture<Channel> getChannel(Endpoint endpoint, String messageType) {
+        List<CompletableFuture<Channel>> channelPool = getChannelPool(endpoint);
+        int offset = getChannelOffset(messageType);
+
+        CompletableFuture<Channel> channelFuture = channelPool.get(offset);
+        if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
+            synchronized (channelPool) {
+                channelFuture = channelPool.get(offset);
+                if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
+                    channelFuture = openChannel(endpoint);
+                    channelPool.set(offset, channelFuture);
+                }
+            }
+        }
+
+        CompletableFuture<Channel> future = new CompletableFuture<>();
+        final CompletableFuture<Channel> finalFuture = channelFuture;
+        finalFuture.whenComplete((channel, error) -> {
+            if (error == null) {
+                if (!channel.isActive()) {
+                    synchronized (channelPool) {
+                        CompletableFuture<Channel> currentFuture = channelPool.get(offset);
+                        if (currentFuture == finalFuture) {
+                            channelPool.set(offset, null);
+                            getChannel(endpoint, messageType).whenComplete((recursiveResult, recursiveError) -> {
+                                if (recursiveError == null) {
+                                    future.complete(recursiveResult);
+                                } else {
+                                    future.completeExceptionally(recursiveError);
+                                }
+                            });
+                        } else {
+                            currentFuture.whenComplete((recursiveResult, recursiveError) -> {
+                                if (recursiveError == null) {
+                                    future.complete(recursiveResult);
+                                } else {
+                                    future.completeExceptionally(recursiveError);
+                                }
+                            });
+                        }
+                    }
+                } else {
+                    future.complete(channel);
+                }
+            } else {
+                future.completeExceptionally(error);
+            }
+        });
+        return future;
+    }
+
     private <T> CompletableFuture<T> executeOnPooledConnection(
             Endpoint endpoint,
+            String type,
             Function<ClientConnection, CompletableFuture<T>> callback,
             Executor executor) {
+        CompletableFuture<T> future = new CompletableFuture<T>();
+        executeOnPooledConnection(endpoint, type, callback, executor, future);
+        return future;
+    }
+
+    private <T> void executeOnPooledConnection(
+            Endpoint endpoint,
+            String type,
+            Function<ClientConnection, CompletableFuture<T>> callback,
+            Executor executor,
+            CompletableFuture<T> future) {
         if (endpoint.equals(localEndpoint)) {
-            CompletableFuture<T> future = new CompletableFuture<>();
             callback.apply(localClientConnection).whenComplete((result, error) -> {
                if (error == null) {
                    executor.execute(() -> future.complete(result));
@@ -362,34 +413,23 @@
                    executor.execute(() -> future.completeExceptionally(error));
                }
             });
-            return future;
+            return;
         }
 
-        CompletableFuture<T> future = new CompletableFuture<>();
-        ChannelPool pool = channels.get(endpoint);
-        pool.acquire().addListener((FutureListener<Channel>) channelResult -> {
-            if (channelResult.isSuccess()) {
-                Channel channel = channelResult.getNow();
+        getChannel(endpoint, type).whenComplete((channel, channelError) -> {
+            if (channelError == null) {
                 ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
-                callback.apply(connection).whenComplete((result, error) -> {
-                    pool.release(channel).addListener(releaseResult -> {
-                        if (!releaseResult.isSuccess()) {
-                            clientConnections.remove(channel);
-                            connection.close();
-                        }
-                    });
-
-                    if (error == null) {
+                callback.apply(connection).whenComplete((result, sendError) -> {
+                    if (sendError == null) {
                         executor.execute(() -> future.complete(result));
                     } else {
-                        executor.execute(() -> future.completeExceptionally(error));
+                        executor.execute(() -> future.completeExceptionally(sendError));
                     }
                 });
             } else {
-                executor.execute(() -> future.completeExceptionally(channelResult.cause()));
+                executor.execute(() -> future.completeExceptionally(channelError));
             }
         });
-        return future;
     }
 
     @Override
@@ -451,6 +491,11 @@
         bootstrap.channel(clientChannelClass);
         bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
         bootstrap.remoteAddress(endpoint.host().toInetAddress(), endpoint.port());
+        if (enableNettyTls) {
+            bootstrap.handler(new SslClientCommunicationChannelInitializer());
+        } else {
+            bootstrap.handler(new BasicChannelInitializer());
+        }
         return bootstrap;
     }
 
@@ -483,18 +528,20 @@
         });
     }
 
-    /**
-     * Channel pool handler.
-     */
-    private class ClientChannelPoolHandler extends AbstractChannelPoolHandler {
-        @Override
-        public void channelCreated(Channel channel) throws Exception {
-            if (enableNettyTls) {
-                new SslClientCommunicationChannelInitializer().initChannel((SocketChannel) channel);
+    private CompletableFuture<Channel> openChannel(Endpoint ep) {
+        Bootstrap bootstrap = bootstrapClient(ep);
+        CompletableFuture<Channel> retFuture = new CompletableFuture<>();
+        ChannelFuture f = bootstrap.connect();
+
+        f.addListener(future -> {
+            if (future.isSuccess()) {
+                retFuture.complete(f.channel());
             } else {
-                new BasicChannelInitializer().initChannel((SocketChannel) channel);
+                retFuture.completeExceptionally(future.cause());
             }
-        }
+        });
+        log.debug("Established a new connection to {}", ep);
+        return retFuture;
     }
 
     /**
@@ -755,6 +802,9 @@
         private final Channel channel;
         private final Map<Long, Callback> futures = Maps.newConcurrentMap();
         private final AtomicBoolean closed = new AtomicBoolean(false);
+        private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
+                .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
+                .build();
 
         RemoteClientConnection(Channel channel) {
             this.channel = channel;
@@ -786,6 +836,11 @@
                     throw new AssertionError();
                 }
             }
+
+            // Iterate through all timeout histories and recompute the timeout.
+            for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
+                timeoutHistory.recomputeTimeoutMillis();
+            }
         }
 
         @Override