Preserve ordering of messages in NettyMessagingManager to simplify distributed systems protocols and improve performance.
Change-Id: I8797f57c7f136dccb14c772faec515274462de51
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 193e81a..d9ed9c3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -15,58 +15,10 @@
*/
package org.onosproject.store.cluster.messaging.impl;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.ServerChannel;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.WriteBufferWaterMark;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.pool.AbstractChannelPoolHandler;
-import io.netty.channel.pool.AbstractChannelPoolMap;
-import io.netty.channel.pool.ChannelPool;
-import io.netty.channel.pool.ChannelPoolMap;
-import io.netty.channel.pool.SimpleChannelPool;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.concurrent.FutureListener;
-import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
-import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterMetadataService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.core.HybridLogicalClockService;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingException;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -76,8 +28,10 @@
import java.security.MessageDigest;
import java.security.cert.Certificate;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;
@@ -97,6 +51,49 @@
import java.util.function.BiFunction;
import java.util.function.Function;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterMetadataService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.core.HybridLogicalClockService;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingException;
+import org.onosproject.store.cluster.messaging.MessagingService;
+import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
@@ -108,13 +105,13 @@
@Service
public class NettyMessagingManager implements MessagingService {
private static final long DEFAULT_TIMEOUT_MILLIS = 500;
- private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(10).toMillis();
+ private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1).toMillis();
private static final long MIN_TIMEOUT_MILLIS = 100;
private static final long MAX_TIMEOUT_MILLIS = 5000;
private static final long TIMEOUT_INTERVAL = 50;
private static final int WINDOW_SIZE = 100;
private static final double TIMEOUT_MULTIPLIER = 2.5;
- private static final short MIN_KS_LENGTH = 6;
+ private static final int CHANNEL_POOL_SIZE = 8;
private static final byte[] EMPTY_PAYLOAD = new byte[0];
@@ -140,18 +137,9 @@
private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
- private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
- .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
- .build();
private ScheduledFuture<?> timeoutFuture;
- private final ChannelPoolMap<Endpoint, SimpleChannelPool> channels =
- new AbstractChannelPoolMap<Endpoint, SimpleChannelPool>() {
- @Override
- protected SimpleChannelPool newPool(Endpoint endpoint) {
- return new SimpleChannelPool(bootstrapClient(endpoint), new ClientChannelPoolHandler());
- }
- };
+ private final Map<Endpoint, List<CompletableFuture<Channel>>> channels = Maps.newConcurrentMap();
private EventLoopGroup serverGroup;
private EventLoopGroup clientGroup;
@@ -303,11 +291,6 @@
for (RemoteClientConnection connection : clientConnections.values()) {
connection.timeoutCallbacks();
}
-
- // Iterate through all timeout histories and recompute the timeout.
- for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
- timeoutHistory.recomputeTimeoutMillis();
- }
}
@Override
@@ -319,7 +302,7 @@
localEndpoint,
type,
payload);
- return executeOnPooledConnection(ep, c -> c.sendAsync(message), MoreExecutors.directExecutor());
+ return executeOnPooledConnection(ep, type, c -> c.sendAsync(message), MoreExecutors.directExecutor());
}
@Override
@@ -338,23 +321,91 @@
localEndpoint,
type,
payload);
- return executeOnPooledConnection(ep, c -> c.sendAndReceive(message), executor);
+ return executeOnPooledConnection(ep, type, c -> c.sendAndReceive(message), executor);
}
- /**
- * Executes the given callback on a pooled connection.
- *
- * @param endpoint the endpoint to which to send a message
- * @param callback the callback to execute to send the message
- * @param <T> the send result type
- * @return a completable future to be completed with the result of the supplied function
- */
+    /**
+     * Returns the list of channel slots for the given endpoint.
+     * <p>
+     * On first use for an endpoint, a copy-on-write list holding {@code CHANNEL_POOL_SIZE}
+     * {@code null} entries is created and registered in {@code channels}; later calls return
+     * that same list.
+     *
+     * @param endpoint the remote endpoint
+     * @return the channel future slots for the endpoint
+     */
+    private List<CompletableFuture<Channel>> getChannelPool(Endpoint endpoint) {
+        return channels.computeIfAbsent(endpoint, key -> {
+            // Pre-fill every slot with null so slots can be addressed by index from the start.
+            List<CompletableFuture<Channel>> emptySlots = new ArrayList<>(CHANNEL_POOL_SIZE);
+            while (emptySlots.size() < CHANNEL_POOL_SIZE) {
+                emptySlots.add(null);
+            }
+            return Lists.newCopyOnWriteArrayList(emptySlots);
+        });
+    }
+
+    /**
+     * Maps a message type to a slot index in the channel pool.
+     * <p>
+     * The index is the magnitude of the type's hash code remainder modulo
+     * {@code CHANNEL_POOL_SIZE}, so a given message type always maps to the same slot.
+     *
+     * @param messageType the message type
+     * @return the slot index for the message type
+     */
+    private int getChannelOffset(String messageType) {
+        int remainder = messageType.hashCode() % CHANNEL_POOL_SIZE;
+        // Java's % keeps the sign of the dividend; flip negative remainders.
+        return remainder < 0 ? -remainder : remainder;
+    }
+
+ private CompletableFuture<Channel> getChannel(Endpoint endpoint, String messageType) {
+ List<CompletableFuture<Channel>> channelPool = getChannelPool(endpoint);
+ int offset = getChannelOffset(messageType);
+
+ CompletableFuture<Channel> channelFuture = channelPool.get(offset);
+ if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
+ synchronized (channelPool) {
+ channelFuture = channelPool.get(offset);
+ if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
+ channelFuture = openChannel(endpoint);
+ channelPool.set(offset, channelFuture);
+ }
+ }
+ }
+
+ CompletableFuture<Channel> future = new CompletableFuture<>();
+ final CompletableFuture<Channel> finalFuture = channelFuture;
+ finalFuture.whenComplete((channel, error) -> {
+ if (error == null) {
+ if (!channel.isActive()) {
+ synchronized (channelPool) {
+ CompletableFuture<Channel> currentFuture = channelPool.get(offset);
+ if (currentFuture == finalFuture) {
+ channelPool.set(offset, null);
+ getChannel(endpoint, messageType).whenComplete((recursiveResult, recursiveError) -> {
+ if (recursiveError == null) {
+ future.complete(recursiveResult);
+ } else {
+ future.completeExceptionally(recursiveError);
+ }
+ });
+ } else {
+ currentFuture.whenComplete((recursiveResult, recursiveError) -> {
+ if (recursiveError == null) {
+ future.complete(recursiveResult);
+ } else {
+ future.completeExceptionally(recursiveError);
+ }
+ });
+ }
+ }
+ } else {
+ future.complete(channel);
+ }
+ } else {
+ future.completeExceptionally(error);
+ }
+ });
+ return future;
+ }
+
+    /**
+     * Executes the given callback on a pooled connection.
+     * <p>
+     * Creates the result future and hands it to the five-argument overload, which completes it.
+     *
+     * @param endpoint the endpoint to which to send a message
+     * @param type the message type, passed through to the five-argument overload
+     * @param callback the callback to execute to send the message
+     * @param executor the executor passed through to the five-argument overload
+     * @param <T> the send result type
+     * @return a completable future to be completed with the result of the supplied function
+     */
     private <T> CompletableFuture<T> executeOnPooledConnection(
             Endpoint endpoint,
+            String type,
             Function<ClientConnection, CompletableFuture<T>> callback,
             Executor executor) {
+        CompletableFuture<T> result = new CompletableFuture<>();
+        executeOnPooledConnection(endpoint, type, callback, executor, result);
+        return result;
+    }
+
+ private <T> void executeOnPooledConnection(
+ Endpoint endpoint,
+ String type,
+ Function<ClientConnection, CompletableFuture<T>> callback,
+ Executor executor,
+ CompletableFuture<T> future) {
if (endpoint.equals(localEndpoint)) {
- CompletableFuture<T> future = new CompletableFuture<>();
callback.apply(localClientConnection).whenComplete((result, error) -> {
if (error == null) {
executor.execute(() -> future.complete(result));
@@ -362,34 +413,23 @@
executor.execute(() -> future.completeExceptionally(error));
}
});
- return future;
+ return;
}
- CompletableFuture<T> future = new CompletableFuture<>();
- ChannelPool pool = channels.get(endpoint);
- pool.acquire().addListener((FutureListener<Channel>) channelResult -> {
- if (channelResult.isSuccess()) {
- Channel channel = channelResult.getNow();
+ getChannel(endpoint, type).whenComplete((channel, channelError) -> {
+ if (channelError == null) {
ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
- callback.apply(connection).whenComplete((result, error) -> {
- pool.release(channel).addListener(releaseResult -> {
- if (!releaseResult.isSuccess()) {
- clientConnections.remove(channel);
- connection.close();
- }
- });
-
- if (error == null) {
+ callback.apply(connection).whenComplete((result, sendError) -> {
+ if (sendError == null) {
executor.execute(() -> future.complete(result));
} else {
- executor.execute(() -> future.completeExceptionally(error));
+ executor.execute(() -> future.completeExceptionally(sendError));
}
});
} else {
- executor.execute(() -> future.completeExceptionally(channelResult.cause()));
+ executor.execute(() -> future.completeExceptionally(channelError));
}
});
- return future;
}
@Override
@@ -451,6 +491,11 @@
bootstrap.channel(clientChannelClass);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.remoteAddress(endpoint.host().toInetAddress(), endpoint.port());
+ if (enableNettyTls) {
+ bootstrap.handler(new SslClientCommunicationChannelInitializer());
+ } else {
+ bootstrap.handler(new BasicChannelInitializer());
+ }
return bootstrap;
}
@@ -483,18 +528,20 @@
});
}
- /**
- * Channel pool handler.
- */
- private class ClientChannelPoolHandler extends AbstractChannelPoolHandler {
- @Override
- public void channelCreated(Channel channel) throws Exception {
- if (enableNettyTls) {
- new SslClientCommunicationChannelInitializer().initChannel((SocketChannel) channel);
+    /**
+     * Opens a new channel to the given endpoint.
+     * <p>
+     * The connect is asynchronous: the returned future is completed from the connect
+     * listener, with the channel on success or with the connect failure cause otherwise.
+     *
+     * @param ep the endpoint to connect to
+     * @return a future completed with the connected channel
+     */
+    private CompletableFuture<Channel> openChannel(Endpoint ep) {
+        Bootstrap bootstrap = bootstrapClient(ep);
+        CompletableFuture<Channel> retFuture = new CompletableFuture<>();
+        ChannelFuture f = bootstrap.connect();
+
+        f.addListener(future -> {
+            if (future.isSuccess()) {
+                // Log only once the connect has actually succeeded.
+                log.debug("Established a new connection to {}", ep);
+                retFuture.complete(f.channel());
             } else {
-                new BasicChannelInitializer().initChannel((SocketChannel) channel);
+                retFuture.completeExceptionally(future.cause());
             }
-        }
+        });
+        return retFuture;
     }
/**
@@ -755,6 +802,9 @@
private final Channel channel;
private final Map<Long, Callback> futures = Maps.newConcurrentMap();
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
+ .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
+ .build();
RemoteClientConnection(Channel channel) {
this.channel = channel;
@@ -786,6 +836,11 @@
throw new AssertionError();
}
}
+
+ // Iterate through all timeout histories and recompute the timeout.
+ for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
+ timeoutHistory.recomputeTimeoutMillis();
+ }
}
@Override