blob: 62009473ac42bf04cf3345d47c55032af4da258f [file] [log] [blame]
Thomas Vachuska58de4162015-09-10 16:15:33 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Thomas Vachuska58de4162015-09-10 16:15:33 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070016package org.onosproject.store.cluster.messaging.impl;
17
JunHuy Lam39eb4292015-06-26 17:24:23 +090018import com.google.common.base.Strings;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070021import com.google.common.collect.Maps;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080022import com.google.common.util.concurrent.MoreExecutors;
Madan Jampania9e70a62016-03-02 16:28:18 -080023
Aaron Kruglikov1b727382016-02-09 16:17:47 -080024import io.netty.bootstrap.Bootstrap;
25import io.netty.bootstrap.ServerBootstrap;
26import io.netty.buffer.PooledByteBufAllocator;
27import io.netty.channel.Channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080028import io.netty.channel.ChannelHandler;
29import io.netty.channel.ChannelHandlerContext;
30import io.netty.channel.ChannelInitializer;
31import io.netty.channel.ChannelOption;
32import io.netty.channel.EventLoopGroup;
33import io.netty.channel.ServerChannel;
34import io.netty.channel.SimpleChannelInboundHandler;
Jon Hall9a44d6a2017-03-02 18:14:37 -080035import io.netty.channel.WriteBufferWaterMark;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080036import io.netty.channel.epoll.EpollEventLoopGroup;
37import io.netty.channel.epoll.EpollServerSocketChannel;
38import io.netty.channel.epoll.EpollSocketChannel;
39import io.netty.channel.nio.NioEventLoopGroup;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070040import io.netty.channel.pool.AbstractChannelPoolHandler;
41import io.netty.channel.pool.AbstractChannelPoolMap;
42import io.netty.channel.pool.ChannelPool;
43import io.netty.channel.pool.ChannelPoolMap;
44import io.netty.channel.pool.SimpleChannelPool;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080045import io.netty.channel.socket.SocketChannel;
46import io.netty.channel.socket.nio.NioServerSocketChannel;
47import io.netty.channel.socket.nio.NioSocketChannel;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070048import io.netty.util.concurrent.FutureListener;
49import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
50import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070051import org.apache.felix.scr.annotations.Activate;
52import org.apache.felix.scr.annotations.Component;
53import org.apache.felix.scr.annotations.Deactivate;
54import org.apache.felix.scr.annotations.Reference;
55import org.apache.felix.scr.annotations.ReferenceCardinality;
56import org.apache.felix.scr.annotations.Service;
Madan Jampaniec1df022015-10-13 21:23:03 -070057import org.onosproject.cluster.ClusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070058import org.onosproject.cluster.ControllerNode;
Madan Jampani05833872016-07-12 23:01:39 -070059import org.onosproject.core.HybridLogicalClockService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070060import org.onosproject.store.cluster.messaging.Endpoint;
Madan Jampania9e70a62016-03-02 16:28:18 -080061import org.onosproject.store.cluster.messaging.MessagingException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080062import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampania9e70a62016-03-02 16:28:18 -080063import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070064import org.slf4j.Logger;
65import org.slf4j.LoggerFactory;
66
Aaron Kruglikov1b727382016-02-09 16:17:47 -080067import javax.net.ssl.KeyManagerFactory;
68import javax.net.ssl.SSLContext;
69import javax.net.ssl.SSLEngine;
70import javax.net.ssl.TrustManagerFactory;
Madan Jampania9e70a62016-03-02 16:28:18 -080071
Aaron Kruglikov1b727382016-02-09 16:17:47 -080072import java.io.FileInputStream;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070073import java.net.ConnectException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080074import java.security.KeyStore;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070075import java.time.Duration;
76import java.util.Iterator;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080077import java.util.Map;
Madan Jampania9e70a62016-03-02 16:28:18 -080078import java.util.Optional;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080079import java.util.concurrent.CompletableFuture;
80import java.util.concurrent.ConcurrentHashMap;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070081import java.util.concurrent.ExecutionException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080082import java.util.concurrent.Executor;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070083import java.util.concurrent.Executors;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080084import java.util.concurrent.RejectedExecutionException;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070085import java.util.concurrent.ScheduledExecutorService;
86import java.util.concurrent.ScheduledFuture;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080087import java.util.concurrent.TimeUnit;
88import java.util.concurrent.TimeoutException;
89import java.util.concurrent.atomic.AtomicBoolean;
90import java.util.concurrent.atomic.AtomicLong;
91import java.util.function.BiConsumer;
92import java.util.function.BiFunction;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070093import java.util.function.Function;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080094
Yuta HIGUCHI90a16892016-07-20 20:36:08 -070095import static org.onlab.util.Tools.groupedThreads;
Heedo Kang4a47a302016-02-29 17:40:23 +090096import static org.onosproject.security.AppGuard.checkPermission;
97import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
98
Madan Jampaniafeebbd2015-05-19 15:26:01 -070099/**
100 * Netty based MessagingService.
101 */
Sho SHIMIZU5c396e32016-08-12 15:19:12 -0700102@Component(immediate = true)
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700103@Service
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800104public class NettyMessagingManager implements MessagingService {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700105 private static final long DEFAULT_TIMEOUT_MILLIS = 500;
106 private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(10).toMillis();
107 private static final long MIN_TIMEOUT_MILLIS = 100;
108 private static final long MAX_TIMEOUT_MILLIS = 5000;
109 private static final long TIMEOUT_INTERVAL = 50;
110 private static final int WINDOW_SIZE = 100;
111 private static final double TIMEOUT_MULTIPLIER = 2.5;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800112 private static final short MIN_KS_LENGTH = 6;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700113
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700114 private static final byte[] EMPTY_PAYLOAD = new byte[0];
115
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700116 private final Logger log = LoggerFactory.getLogger(getClass());
117
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700118 private final ClientConnection localClientConnection = new LocalClientConnection();
119 private final ServerConnection localServerConnection = new LocalServerConnection(null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800120
Madan Jampani05833872016-07-12 23:01:39 -0700121 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
122 protected HybridLogicalClockService clockService;
123
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700124 private Endpoint localEndpoint;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800125 private int preamble;
126 private final AtomicBoolean started = new AtomicBoolean(false);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700127 private final Map<String, BiConsumer<InternalMessage, ServerConnection>> handlers = new ConcurrentHashMap<>();
128 private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
129 private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800130 private final AtomicLong messageIdGenerator = new AtomicLong(0);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800131
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700132 private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
133 .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
134 .build();
135 private ScheduledFuture<?> timeoutFuture;
136
137 private final ChannelPoolMap<Endpoint, SimpleChannelPool> channels =
138 new AbstractChannelPoolMap<Endpoint, SimpleChannelPool>() {
139 @Override
140 protected SimpleChannelPool newPool(Endpoint endpoint) {
141 return new SimpleChannelPool(bootstrapClient(endpoint), new ClientChannelPoolHandler());
142 }
143 };
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800144
145 private EventLoopGroup serverGroup;
146 private EventLoopGroup clientGroup;
147 private Class<? extends ServerChannel> serverChannelClass;
148 private Class<? extends Channel> clientChannelClass;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700149 private ScheduledExecutorService timeoutExecutor;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800150
151 protected static final boolean TLS_DISABLED = false;
152 protected boolean enableNettyTls = TLS_DISABLED;
153
154 protected String ksLocation;
155 protected String tsLocation;
156 protected char[] ksPwd;
157 protected char[] tsPwd;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900158
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700159 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampaniec1df022015-10-13 21:23:03 -0700160 protected ClusterMetadataService clusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700161
162 @Activate
163 public void activate() throws Exception {
Madan Jampaniec1df022015-10-13 21:23:03 -0700164 ControllerNode localNode = clusterMetadataService.getLocalNode();
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800165 getTlsParameters();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800166
167 if (started.get()) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700168 log.warn("Already running at local endpoint: {}", localEndpoint);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800169 return;
170 }
171 this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700172 this.localEndpoint = new Endpoint(localNode.ip(), localNode.tcpPort());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800173 initEventLoopGroup();
174 startAcceptingConnections();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700175 timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
176 groupedThreads("NettyMessagingEvt", "timeout", log));
177 timeoutFuture = timeoutExecutor.scheduleAtFixedRate(
178 this::timeoutAllCallbacks, TIMEOUT_INTERVAL, TIMEOUT_INTERVAL, TimeUnit.MILLISECONDS);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800179 started.set(true);
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700180 log.info("Started");
181 }
182
183 @Deactivate
184 public void deactivate() throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800185 if (started.get()) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800186 serverGroup.shutdownGracefully();
187 clientGroup.shutdownGracefully();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700188 timeoutFuture.cancel(false);
189 timeoutExecutor.shutdown();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800190 started.set(false);
191 }
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700192 log.info("Stopped");
193 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900194
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800195 private void getTlsParameters() {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900196 String tempString = System.getProperty("enableNettyTLS");
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800197 enableNettyTls = Strings.isNullOrEmpty(tempString) ? TLS_DISABLED : Boolean.parseBoolean(tempString);
198 log.info("enableNettyTLS = {}", enableNettyTls);
199 if (enableNettyTls) {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900200 ksLocation = System.getProperty("javax.net.ssl.keyStore");
201 if (Strings.isNullOrEmpty(ksLocation)) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800202 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900203 return;
204 }
205 tsLocation = System.getProperty("javax.net.ssl.trustStore");
206 if (Strings.isNullOrEmpty(tsLocation)) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800207 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900208 return;
209 }
210 ksPwd = System.getProperty("javax.net.ssl.keyStorePassword").toCharArray();
211 if (MIN_KS_LENGTH > ksPwd.length) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800212 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900213 return;
214 }
215 tsPwd = System.getProperty("javax.net.ssl.trustStorePassword").toCharArray();
216 if (MIN_KS_LENGTH > tsPwd.length) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800217 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900218 return;
219 }
220 }
221 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700222
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800223 private void initEventLoopGroup() {
224 // try Epoll first and if that does work, use nio.
225 try {
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700226 clientGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollC-%d", log));
227 serverGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800228 serverChannelClass = EpollServerSocketChannel.class;
229 clientChannelClass = EpollSocketChannel.class;
230 return;
231 } catch (Throwable e) {
232 log.debug("Failed to initialize native (epoll) transport. "
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700233 + "Reason: {}. Proceeding with nio.", e.getMessage());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800234 }
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700235 clientGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioC-%d", log));
236 serverGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800237 serverChannelClass = NioServerSocketChannel.class;
238 clientChannelClass = NioSocketChannel.class;
239 }
240
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700241 /**
242 * Times out response callbacks.
243 */
244 private void timeoutAllCallbacks() {
245 // Iterate through all connections and time out callbacks.
246 for (RemoteClientConnection connection : clientConnections.values()) {
247 connection.timeoutCallbacks();
248 }
249
250 // Iterate through all timeout histories and recompute the timeout.
251 for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
252 timeoutHistory.recomputeTimeoutMillis();
253 }
254 }
255
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800256 @Override
257 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900258 checkPermission(CLUSTER_WRITE);
Madan Jampanib825aeb2016-04-01 15:18:25 -0700259 InternalMessage message = new InternalMessage(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700260 clockService.timeNow(),
261 messageIdGenerator.incrementAndGet(),
262 localEndpoint,
263 type,
264 payload);
265 return executeOnPooledConnection(ep, c -> c.sendAsync(message), MoreExecutors.directExecutor());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800266 }
267
268 @Override
269 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900270 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800271 return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
272 }
273
274 @Override
275 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900276 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800277 Long messageId = messageIdGenerator.incrementAndGet();
Madan Jampani05833872016-07-12 23:01:39 -0700278 InternalMessage message = new InternalMessage(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700279 clockService.timeNow(),
280 messageId,
281 localEndpoint,
282 type,
283 payload);
284 return executeOnPooledConnection(ep, c -> c.sendAndReceive(message), executor);
285 }
Jordan Halterman041534b2017-03-21 10:37:33 -0700286
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700287 /**
288 * Executes the given callback on a pooled connection.
289 *
290 * @param endpoint the endpoint to which to send a message
291 * @param callback the callback to execute to send the message
292 * @param <T> the send result type
293 * @return a completable future to be completed with the result of the supplied function
294 */
295 private <T> CompletableFuture<T> executeOnPooledConnection(
296 Endpoint endpoint,
297 Function<ClientConnection, CompletableFuture<T>> callback,
298 Executor executor) {
299 if (endpoint.equals(localEndpoint)) {
300 CompletableFuture<T> future = new CompletableFuture<>();
301 callback.apply(localClientConnection).whenComplete((result, error) -> {
302 if (error == null) {
303 executor.execute(() -> future.complete(result));
304 } else {
305 executor.execute(() -> future.completeExceptionally(error));
306 }
307 });
308 return future;
309 }
310
311 CompletableFuture<T> future = new CompletableFuture<>();
312 ChannelPool pool = channels.get(endpoint);
313 pool.acquire().addListener((FutureListener<Channel>) channelResult -> {
314 if (channelResult.isSuccess()) {
315 Channel channel = channelResult.getNow();
316 ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
317 callback.apply(connection).whenComplete((result, error) -> {
318 pool.release(channel).addListener(releaseResult -> {
319 if (!releaseResult.isSuccess()) {
320 clientConnections.remove(channel);
321 connection.close();
322 }
323 });
324
325 if (error == null) {
326 executor.execute(() -> future.complete(result));
327 } else {
328 executor.execute(() -> future.completeExceptionally(error));
329 }
330 });
331 } else {
332 executor.execute(() -> future.completeExceptionally(channelResult.cause()));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800333 }
Jordan Halterman041534b2017-03-21 10:37:33 -0700334 });
335 return future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800336 }
337
338 @Override
339 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900340 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700341 handlers.put(type, (message, connection) -> executor.execute(() ->
342 handler.accept(message.sender(), message.payload())));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800343 }
344
345 @Override
346 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900347 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700348 handlers.put(type, (message, connection) -> executor.execute(() -> {
Madan Jampania9e70a62016-03-02 16:28:18 -0800349 byte[] responsePayload = null;
350 Status status = Status.OK;
351 try {
352 responsePayload = handler.apply(message.sender(), message.payload());
353 } catch (Exception e) {
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700354 log.debug("An error occurred in a message handler: {}", e);
Madan Jampania9e70a62016-03-02 16:28:18 -0800355 status = Status.ERROR_HANDLER_EXCEPTION;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800356 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700357 connection.reply(message, status, Optional.ofNullable(responsePayload));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800358 }));
359 }
360
361 @Override
362 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900363 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700364 handlers.put(type, (message, connection) -> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800365 handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700366 Status status;
367 if (error == null) {
368 status = Status.OK;
369 } else {
370 log.debug("An error occurred in a message handler: {}", error);
371 status = Status.ERROR_HANDLER_EXCEPTION;
372 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700373 connection.reply(message, status, Optional.ofNullable(result));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800374 });
375 });
376 }
377
378 @Override
379 public void unregisterHandler(String type) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900380 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800381 handlers.remove(type);
382 }
383
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700384 private Bootstrap bootstrapClient(Endpoint endpoint) {
385 Bootstrap bootstrap = new Bootstrap();
386 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
387 bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
388 new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
389 bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
390 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
391 bootstrap.group(clientGroup);
392 // TODO: Make this faster:
393 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
394 bootstrap.channel(clientChannelClass);
395 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
396 bootstrap.remoteAddress(endpoint.host().toInetAddress(), endpoint.port());
397 return bootstrap;
398 }
399
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800400 private void startAcceptingConnections() throws InterruptedException {
401 ServerBootstrap b = new ServerBootstrap();
Jon Hall9a44d6a2017-03-02 18:14:37 -0800402 b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700403 new WriteBufferWaterMark(8 * 1024, 32 * 1024));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800404 b.option(ChannelOption.SO_RCVBUF, 1048576);
405 b.option(ChannelOption.TCP_NODELAY, true);
Yuta HIGUCHIb47c9532016-08-22 09:41:23 -0700406 b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800407 b.group(serverGroup, clientGroup);
408 b.channel(serverChannelClass);
409 if (enableNettyTls) {
410 b.childHandler(new SslServerCommunicationChannelInitializer());
411 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700412 b.childHandler(new BasicChannelInitializer());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800413 }
414 b.option(ChannelOption.SO_BACKLOG, 128);
415 b.childOption(ChannelOption.SO_KEEPALIVE, true);
416
417 // Bind and start to accept incoming connections.
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700418 b.bind(localEndpoint.port()).sync().addListener(future -> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800419 if (future.isSuccess()) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700420 log.info("{} accepting incoming connections on port {}",
421 localEndpoint.host(), localEndpoint.port());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800422 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700423 log.warn("{} failed to bind to port {} due to {}",
424 localEndpoint.host(), localEndpoint.port(), future.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800425 }
426 });
427 }
428
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700429 /**
430 * Channel pool handler.
431 */
432 private class ClientChannelPoolHandler extends AbstractChannelPoolHandler {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800433 @Override
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700434 public void channelCreated(Channel channel) throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800435 if (enableNettyTls) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700436 new SslClientCommunicationChannelInitializer().initChannel((SocketChannel) channel);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800437 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700438 new BasicChannelInitializer().initChannel((SocketChannel) channel);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800439 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800440 }
441 }
442
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700443 /**
444 * Channel initializer for TLS servers.
445 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800446 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800447 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
448 private final ChannelHandler encoder = new MessageEncoder(preamble);
449
450 @Override
451 protected void initChannel(SocketChannel channel) throws Exception {
452 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
453 KeyStore ts = KeyStore.getInstance("JKS");
454 ts.load(new FileInputStream(tsLocation), tsPwd);
455 tmFactory.init(ts);
456
457 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
458 KeyStore ks = KeyStore.getInstance("JKS");
459 ks.load(new FileInputStream(ksLocation), ksPwd);
460 kmf.init(ks, ksPwd);
461
462 SSLContext serverContext = SSLContext.getInstance("TLS");
463 serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
464
465 SSLEngine serverSslEngine = serverContext.createSSLEngine();
466
467 serverSslEngine.setNeedClientAuth(true);
468 serverSslEngine.setUseClientMode(false);
469 serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
470 serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
471 serverSslEngine.setEnableSessionCreation(true);
472
473 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
474 .addLast("encoder", encoder)
Madan Jampanib825aeb2016-04-01 15:18:25 -0700475 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800476 .addLast("handler", dispatcher);
477 }
478 }
479
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700480 /**
481 * Channel initializer for TLS clients.
482 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800483 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800484 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
485 private final ChannelHandler encoder = new MessageEncoder(preamble);
486
487 @Override
488 protected void initChannel(SocketChannel channel) throws Exception {
489 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
490 KeyStore ts = KeyStore.getInstance("JKS");
491 ts.load(new FileInputStream(tsLocation), tsPwd);
492 tmFactory.init(ts);
493
494 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
495 KeyStore ks = KeyStore.getInstance("JKS");
496 ks.load(new FileInputStream(ksLocation), ksPwd);
497 kmf.init(ks, ksPwd);
498
499 SSLContext clientContext = SSLContext.getInstance("TLS");
500 clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
501
502 SSLEngine clientSslEngine = clientContext.createSSLEngine();
503
504 clientSslEngine.setUseClientMode(true);
505 clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
506 clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
507 clientSslEngine.setEnableSessionCreation(true);
508
509 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
510 .addLast("encoder", encoder)
Madan Jampanib825aeb2016-04-01 15:18:25 -0700511 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800512 .addLast("handler", dispatcher);
513 }
514 }
515
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700516 /**
517 * Channel initializer for basic connections.
518 */
519 private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800520 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
521 private final ChannelHandler encoder = new MessageEncoder(preamble);
522
523 @Override
524 protected void initChannel(SocketChannel channel) throws Exception {
525 channel.pipeline()
526 .addLast("encoder", encoder)
Madan Jampanib825aeb2016-04-01 15:18:25 -0700527 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800528 .addLast("handler", dispatcher);
529 }
530 }
531
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700532 /**
533 * Channel inbound handler that dispatches messages to the appropriate handler.
534 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800535 @ChannelHandler.Sharable
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700536 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<Object> {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700537 // Effectively SimpleChannelInboundHandler<InternalMessage>,
538 // had to specify <Object> to avoid Class Loader not being able to find some classes.
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800539
540 @Override
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700541 protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
542 InternalMessage message = (InternalMessage) rawMessage;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800543 try {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700544 if (message.isRequest()) {
545 RemoteServerConnection connection =
546 serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
547 connection.dispatch(message);
548 } else {
549 RemoteClientConnection connection =
550 clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
551 connection.dispatch(message);
552 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800553 } catch (RejectedExecutionException e) {
554 log.warn("Unable to dispatch message due to {}", e.getMessage());
555 }
556 }
557
558 @Override
559 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
560 log.error("Exception inside channel handling pipeline.", cause);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700561
562 RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
563 if (clientConnection != null) {
564 clientConnection.close();
565 }
566
567 RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
568 if (serverConnection != null) {
569 serverConnection.close();
570 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800571 context.close();
572 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700573
574 /**
575 * Returns true if the given message should be handled.
576 *
577 * @param msg inbound message
578 * @return true if {@code msg} is {@link InternalMessage} instance.
579 *
580 * @see SimpleChannelInboundHandler#acceptInboundMessage(Object)
581 */
582 @Override
583 public final boolean acceptInboundMessage(Object msg) {
584 return msg instanceof InternalMessage;
585 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800586 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700587
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700588 /**
589 * Wraps a {@link CompletableFuture} and tracks its type and creation time.
590 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800591 private final class Callback {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700592 private final String type;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800593 private final CompletableFuture<byte[]> future;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700594 private final long time = System.currentTimeMillis();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800595
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700596 Callback(String type, CompletableFuture<byte[]> future) {
597 this.type = type;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800598 this.future = future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800599 }
600
601 public void complete(byte[] value) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700602 future.complete(value);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800603 }
604
605 public void completeExceptionally(Throwable error) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700606 future.completeExceptionally(error);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800607 }
608 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800609
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700610 /**
611 * Represents the client side of a connection to a local or remote server.
612 */
613 private interface ClientConnection {
614
615 /**
616 * Sends a message to the other side of the connection.
617 *
618 * @param message the message to send
619 * @return a completable future to be completed once the message has been sent
620 */
621 CompletableFuture<Void> sendAsync(InternalMessage message);
622
623 /**
624 * Sends a message to the other side of the connection, awaiting a reply.
625 *
626 * @param message the message to send
627 * @return a completable future to be completed once a reply is received or the request times out
628 */
629 CompletableFuture<byte[]> sendAndReceive(InternalMessage message);
630
631 /**
632 * Closes the connection.
633 */
634 default void close() {
635 }
636 }
637
638 /**
639 * Represents the server side of a connection.
640 */
641 private interface ServerConnection {
642
643 /**
644 * Sends a reply to the other side of the connection.
645 *
646 * @param message the message to which to reply
647 * @param status the reply status
648 * @param payload the response payload
649 */
650 void reply(InternalMessage message, Status status, Optional<byte[]> payload);
651
652 /**
653 * Closes the connection.
654 */
655 default void close() {
656 }
657 }
658
659 /**
660 * Local connection implementation.
661 */
662 private final class LocalClientConnection implements ClientConnection {
663 @Override
664 public CompletableFuture<Void> sendAsync(InternalMessage message) {
665 BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
666 if (handler != null) {
667 handler.accept(message, localServerConnection);
668 } else {
669 log.debug("No handler for message type {} from {}", message.type(), message.sender());
670 }
671 return CompletableFuture.completedFuture(null);
672 }
673
674 @Override
675 public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
676 CompletableFuture<byte[]> future = new CompletableFuture<>();
677 BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
678 if (handler != null) {
679 handler.accept(message, new LocalServerConnection(future));
680 } else {
681 log.debug("No handler for message type {} from {}", message.type(), message.sender());
682 new LocalServerConnection(future).reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
683 }
684 return future;
685 }
686 }
687
688 /**
689 * Local server connection.
690 */
691 private final class LocalServerConnection implements ServerConnection {
692 private final CompletableFuture<byte[]> future;
693
694 LocalServerConnection(CompletableFuture<byte[]> future) {
695 this.future = future;
696 }
697
698 @Override
699 public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
700 if (future != null) {
701 if (status == Status.OK) {
702 future.complete(payload.orElse(EMPTY_PAYLOAD));
703 } else if (status == Status.ERROR_NO_HANDLER) {
704 future.completeExceptionally(new MessagingException.NoRemoteHandler());
705 } else if (status == Status.ERROR_HANDLER_EXCEPTION) {
706 future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
707 } else if (status == Status.PROTOCOL_EXCEPTION) {
708 future.completeExceptionally(new MessagingException.ProtocolException());
709 }
710 }
711 }
712 }
713
714 /**
715 * Remote connection implementation.
716 */
717 private final class RemoteClientConnection implements ClientConnection {
718 private final Channel channel;
719 private final Map<Long, Callback> futures = Maps.newConcurrentMap();
720 private final AtomicBoolean closed = new AtomicBoolean(false);
721
722 RemoteClientConnection(Channel channel) {
723 this.channel = channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800724 }
725
726 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700727 * Times out callbacks for this connection.
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800728 */
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700729 private void timeoutCallbacks() {
730 // Store the current time.
731 long currentTime = System.currentTimeMillis();
732
733 // Iterate through future callbacks and time out callbacks that have been alive
734 // longer than the current timeout according to the message type.
735 Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
736 while (iterator.hasNext()) {
737 Callback callback = iterator.next().getValue();
738 try {
739 TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
740 long currentTimeout = timeoutHistory.currentTimeout;
741 if (currentTime - callback.time > currentTimeout) {
742 iterator.remove();
743 long elapsedTime = currentTime - callback.time;
744 timeoutHistory.addReplyTime(elapsedTime);
745 callback.completeExceptionally(
746 new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
747 }
748 } catch (ExecutionException e) {
749 throw new AssertionError();
750 }
751 }
752 }
753
754 @Override
755 public CompletableFuture<Void> sendAsync(InternalMessage message) {
756 CompletableFuture<Void> future = new CompletableFuture<>();
757 channel.writeAndFlush(message).addListener(channelFuture -> {
758 if (!channelFuture.isSuccess()) {
759 future.completeExceptionally(channelFuture.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800760 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700761 future.complete(null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800762 }
763 });
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700764 return future;
765 }
766
767 @Override
768 public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
769 CompletableFuture<byte[]> future = new CompletableFuture<>();
770 Callback callback = new Callback(message.type(), future);
771 futures.put(message.id(), callback);
772 channel.writeAndFlush(message).addListener(channelFuture -> {
773 if (!channelFuture.isSuccess()) {
774 futures.remove(message.id());
775 callback.completeExceptionally(channelFuture.cause());
776 }
777 });
778 return future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800779 }
780
781 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700782 * Dispatches a message to a local handler.
783 *
784 * @param message the message to dispatch
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800785 */
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700786 private void dispatch(InternalMessage message) {
787 if (message.preamble() != preamble) {
788 log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
789 return;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800790 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700791
792 clockService.recordEventTime(message.time());
793
794 Callback callback = futures.remove(message.id());
795 if (callback != null) {
796 if (message.status() == Status.OK) {
797 callback.complete(message.payload());
798 } else if (message.status() == Status.ERROR_NO_HANDLER) {
799 callback.completeExceptionally(new MessagingException.NoRemoteHandler());
800 } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
801 callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
802 } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
803 callback.completeExceptionally(new MessagingException.ProtocolException());
804 }
805
806 try {
807 TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
808 timeoutHistory.addReplyTime(System.currentTimeMillis() - callback.time);
809 } catch (ExecutionException e) {
810 throw new AssertionError();
811 }
812 } else {
813 log.debug("Received a reply for message id:[{}]. "
814 + " from {}. But was unable to locate the"
815 + " request handle", message.id(), message.sender());
816 }
817 }
818
819 @Override
820 public void close() {
821 if (closed.compareAndSet(false, true)) {
822 timeoutFuture.cancel(false);
823 for (Callback callback : futures.values()) {
824 callback.completeExceptionally(new ConnectException());
825 }
826 }
827 }
828 }
829
830 /**
831 * Remote server connection.
832 */
833 private final class RemoteServerConnection implements ServerConnection {
834 private final Channel channel;
835
836 RemoteServerConnection(Channel channel) {
837 this.channel = channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800838 }
839
840 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700841 * Dispatches a message to a local handler.
842 *
843 * @param message the message to dispatch
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800844 */
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700845 private void dispatch(InternalMessage message) {
846 if (message.preamble() != preamble) {
847 log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
848 reply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
849 return;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800850 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700851
852 clockService.recordEventTime(message.time());
853
854 BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
855 if (handler != null) {
856 handler.accept(message, this);
857 } else {
858 log.debug("No handler for message type {} from {}", message.type(), message.sender());
859 reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
860 }
861 }
862
863 @Override
864 public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
865 InternalMessage response = new InternalMessage(preamble,
866 clockService.timeNow(),
867 message.id(),
868 localEndpoint,
869 payload.orElse(EMPTY_PAYLOAD),
870 status);
871 channel.writeAndFlush(response);
872 }
873 }
874
875 /**
876 * Request-reply timeout history tracker.
877 */
878 private static final class TimeoutHistory {
879 private final DescriptiveStatistics timeoutHistory = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
880 private final AtomicLong maxReplyTime = new AtomicLong();
881 private volatile long currentTimeout = DEFAULT_TIMEOUT_MILLIS;
882
883 /**
884 * Adds a reply time to the history.
885 *
886 * @param replyTime the reply time to add to the history
887 */
888 void addReplyTime(long replyTime) {
889 maxReplyTime.getAndAccumulate(replyTime, Math::max);
890 }
891
892 /**
893 * Computes the current timeout.
894 */
895 private void recomputeTimeoutMillis() {
896 double nextTimeout = maxReplyTime.getAndSet(0) * TIMEOUT_MULTIPLIER;
897 timeoutHistory.addValue(
898 Math.min(Math.max(nextTimeout, MIN_TIMEOUT_MILLIS), MAX_TIMEOUT_MILLIS));
899 if (timeoutHistory.getN() == WINDOW_SIZE) {
900 this.currentTimeout = (long) timeoutHistory.getMax();
901 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800902 }
903 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900904}