blob: 193e81aa8b3928a537678e0d6b39f2c879c98c5f [file] [log] [blame]
Thomas Vachuska58de4162015-09-10 16:15:33 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Thomas Vachuska58de4162015-09-10 16:15:33 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070016package org.onosproject.store.cluster.messaging.impl;
17
Aaron Kruglikov1b727382016-02-09 16:17:47 -080018import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070020import com.google.common.collect.Maps;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080021import com.google.common.util.concurrent.MoreExecutors;
22import io.netty.bootstrap.Bootstrap;
23import io.netty.bootstrap.ServerBootstrap;
24import io.netty.buffer.PooledByteBufAllocator;
25import io.netty.channel.Channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080026import io.netty.channel.ChannelHandler;
27import io.netty.channel.ChannelHandlerContext;
28import io.netty.channel.ChannelInitializer;
29import io.netty.channel.ChannelOption;
30import io.netty.channel.EventLoopGroup;
31import io.netty.channel.ServerChannel;
32import io.netty.channel.SimpleChannelInboundHandler;
Jon Hall9a44d6a2017-03-02 18:14:37 -080033import io.netty.channel.WriteBufferWaterMark;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080034import io.netty.channel.epoll.EpollEventLoopGroup;
35import io.netty.channel.epoll.EpollServerSocketChannel;
36import io.netty.channel.epoll.EpollSocketChannel;
37import io.netty.channel.nio.NioEventLoopGroup;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070038import io.netty.channel.pool.AbstractChannelPoolHandler;
39import io.netty.channel.pool.AbstractChannelPoolMap;
40import io.netty.channel.pool.ChannelPool;
41import io.netty.channel.pool.ChannelPoolMap;
42import io.netty.channel.pool.SimpleChannelPool;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080043import io.netty.channel.socket.SocketChannel;
44import io.netty.channel.socket.nio.NioServerSocketChannel;
45import io.netty.channel.socket.nio.NioSocketChannel;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070046import io.netty.util.concurrent.FutureListener;
47import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
48import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070049import org.apache.felix.scr.annotations.Activate;
50import org.apache.felix.scr.annotations.Component;
51import org.apache.felix.scr.annotations.Deactivate;
52import org.apache.felix.scr.annotations.Reference;
53import org.apache.felix.scr.annotations.ReferenceCardinality;
54import org.apache.felix.scr.annotations.Service;
Madan Jampaniec1df022015-10-13 21:23:03 -070055import org.onosproject.cluster.ClusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070056import org.onosproject.cluster.ControllerNode;
Madan Jampani05833872016-07-12 23:01:39 -070057import org.onosproject.core.HybridLogicalClockService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070058import org.onosproject.store.cluster.messaging.Endpoint;
Madan Jampania9e70a62016-03-02 16:28:18 -080059import org.onosproject.store.cluster.messaging.MessagingException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080060import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampania9e70a62016-03-02 16:28:18 -080061import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070062import org.slf4j.Logger;
63import org.slf4j.LoggerFactory;
64
Aaron Kruglikov1b727382016-02-09 16:17:47 -080065import javax.net.ssl.KeyManagerFactory;
66import javax.net.ssl.SSLContext;
67import javax.net.ssl.SSLEngine;
68import javax.net.ssl.TrustManagerFactory;
Madan Jampania9e70a62016-03-02 16:28:18 -080069
Brian O'Connor740e98c2017-06-29 17:07:17 -070070import java.io.File;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080071import java.io.FileInputStream;
Brian O'Connor740e98c2017-06-29 17:07:17 -070072import java.io.FileNotFoundException;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070073import java.net.ConnectException;
Brian O'Connor740e98c2017-06-29 17:07:17 -070074import java.security.Key;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080075import java.security.KeyStore;
Brian O'Connor740e98c2017-06-29 17:07:17 -070076import java.security.MessageDigest;
77import java.security.cert.Certificate;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070078import java.time.Duration;
Brian O'Connor740e98c2017-06-29 17:07:17 -070079import java.util.Enumeration;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070080import java.util.Iterator;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080081import java.util.Map;
Madan Jampania9e70a62016-03-02 16:28:18 -080082import java.util.Optional;
Brian O'Connor740e98c2017-06-29 17:07:17 -070083import java.util.StringJoiner;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080084import java.util.concurrent.CompletableFuture;
85import java.util.concurrent.ConcurrentHashMap;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070086import java.util.concurrent.ExecutionException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080087import java.util.concurrent.Executor;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070088import java.util.concurrent.Executors;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080089import java.util.concurrent.RejectedExecutionException;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070090import java.util.concurrent.ScheduledExecutorService;
91import java.util.concurrent.ScheduledFuture;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080092import java.util.concurrent.TimeUnit;
93import java.util.concurrent.TimeoutException;
94import java.util.concurrent.atomic.AtomicBoolean;
95import java.util.concurrent.atomic.AtomicLong;
96import java.util.function.BiConsumer;
97import java.util.function.BiFunction;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070098import java.util.function.Function;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080099
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700100import static org.onlab.util.Tools.groupedThreads;
Heedo Kang4a47a302016-02-29 17:40:23 +0900101import static org.onosproject.security.AppGuard.checkPermission;
102import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
103
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700104/**
105 * Netty based MessagingService.
106 */
Sho SHIMIZU5c396e32016-08-12 15:19:12 -0700107@Component(immediate = true)
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700108@Service
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800109public class NettyMessagingManager implements MessagingService {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700110 private static final long DEFAULT_TIMEOUT_MILLIS = 500;
111 private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(10).toMillis();
112 private static final long MIN_TIMEOUT_MILLIS = 100;
113 private static final long MAX_TIMEOUT_MILLIS = 5000;
114 private static final long TIMEOUT_INTERVAL = 50;
115 private static final int WINDOW_SIZE = 100;
116 private static final double TIMEOUT_MULTIPLIER = 2.5;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800117 private static final short MIN_KS_LENGTH = 6;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700118
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700119 private static final byte[] EMPTY_PAYLOAD = new byte[0];
120
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700121 private final Logger log = LoggerFactory.getLogger(getClass());
122
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700123 private final ClientConnection localClientConnection = new LocalClientConnection();
124 private final ServerConnection localServerConnection = new LocalServerConnection(null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800125
Brian O'Connor740e98c2017-06-29 17:07:17 -0700126 //TODO CONFIG_DIR is duplicated from ConfigFileBasedClusterMetadataProvider
127 private static final String CONFIG_DIR = "../config";
128 private static final String KS_FILE_NAME = "onos.jks";
129 private static final File DEFAULT_KS_FILE = new File(CONFIG_DIR, KS_FILE_NAME);
130 private static final String DEFAULT_KS_PASSWORD = "changeit";
131
Madan Jampani05833872016-07-12 23:01:39 -0700132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
133 protected HybridLogicalClockService clockService;
134
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700135 private Endpoint localEndpoint;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800136 private int preamble;
137 private final AtomicBoolean started = new AtomicBoolean(false);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700138 private final Map<String, BiConsumer<InternalMessage, ServerConnection>> handlers = new ConcurrentHashMap<>();
139 private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
140 private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800141 private final AtomicLong messageIdGenerator = new AtomicLong(0);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800142
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700143 private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
144 .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
145 .build();
146 private ScheduledFuture<?> timeoutFuture;
147
148 private final ChannelPoolMap<Endpoint, SimpleChannelPool> channels =
149 new AbstractChannelPoolMap<Endpoint, SimpleChannelPool>() {
150 @Override
151 protected SimpleChannelPool newPool(Endpoint endpoint) {
152 return new SimpleChannelPool(bootstrapClient(endpoint), new ClientChannelPoolHandler());
153 }
154 };
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800155
156 private EventLoopGroup serverGroup;
157 private EventLoopGroup clientGroup;
158 private Class<? extends ServerChannel> serverChannelClass;
159 private Class<? extends Channel> clientChannelClass;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700160 private ScheduledExecutorService timeoutExecutor;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800161
Brian O'Connor740e98c2017-06-29 17:07:17 -0700162 protected static final boolean TLS_ENABLED = true;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800163 protected static final boolean TLS_DISABLED = false;
Brian O'Connor740e98c2017-06-29 17:07:17 -0700164 protected boolean enableNettyTls = TLS_ENABLED;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800165
Brian O'Connor740e98c2017-06-29 17:07:17 -0700166 protected TrustManagerFactory trustManager;
167 protected KeyManagerFactory keyManager;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900168
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700169 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampaniec1df022015-10-13 21:23:03 -0700170 protected ClusterMetadataService clusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700171
172 @Activate
173 public void activate() throws Exception {
Madan Jampaniec1df022015-10-13 21:23:03 -0700174 ControllerNode localNode = clusterMetadataService.getLocalNode();
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800175 getTlsParameters();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800176
177 if (started.get()) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700178 log.warn("Already running at local endpoint: {}", localEndpoint);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800179 return;
180 }
181 this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700182 this.localEndpoint = new Endpoint(localNode.ip(), localNode.tcpPort());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800183 initEventLoopGroup();
184 startAcceptingConnections();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700185 timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
186 groupedThreads("NettyMessagingEvt", "timeout", log));
187 timeoutFuture = timeoutExecutor.scheduleAtFixedRate(
188 this::timeoutAllCallbacks, TIMEOUT_INTERVAL, TIMEOUT_INTERVAL, TimeUnit.MILLISECONDS);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800189 started.set(true);
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700190 log.info("Started");
191 }
192
193 @Deactivate
194 public void deactivate() throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800195 if (started.get()) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800196 serverGroup.shutdownGracefully();
197 clientGroup.shutdownGracefully();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700198 timeoutFuture.cancel(false);
199 timeoutExecutor.shutdown();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800200 started.set(false);
201 }
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700202 log.info("Stopped");
203 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900204
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800205 private void getTlsParameters() {
Brian O'Connor740e98c2017-06-29 17:07:17 -0700206 // default is TLS enabled unless key stores cannot be loaded
207 enableNettyTls = Boolean.parseBoolean(System.getProperty("enableNettyTLS", Boolean.toString(TLS_ENABLED)));
208
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800209 if (enableNettyTls) {
Brian O'Connor740e98c2017-06-29 17:07:17 -0700210 enableNettyTls = loadKeyStores();
211 }
212 }
213
214 private boolean loadKeyStores() {
215 // Maintain a local copy of the trust and key managers in case anything goes wrong
216 TrustManagerFactory tmf;
217 KeyManagerFactory kmf;
218 try {
219 String ksLocation = System.getProperty("javax.net.ssl.keyStore", DEFAULT_KS_FILE.toString());
220 String tsLocation = System.getProperty("javax.net.ssl.trustStore", DEFAULT_KS_FILE.toString());
221 char[] ksPwd = System.getProperty("javax.net.ssl.keyStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
222 char[] tsPwd = System.getProperty("javax.net.ssl.trustStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
223
224 tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
225 KeyStore ts = KeyStore.getInstance("JKS");
226 ts.load(new FileInputStream(tsLocation), tsPwd);
227 tmf.init(ts);
228
229 kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
230 KeyStore ks = KeyStore.getInstance("JKS");
231 ks.load(new FileInputStream(ksLocation), ksPwd);
232 kmf.init(ks, ksPwd);
233 if (log.isInfoEnabled()) {
234 logKeyStore(ks, ksLocation, ksPwd);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900235 }
Brian O'Connor740e98c2017-06-29 17:07:17 -0700236 } catch (FileNotFoundException e) {
237 log.warn("Disabling TLS for intra-cluster messaging; Could not load cluster key store: {}", e.getMessage());
238 return TLS_DISABLED;
239 } catch (Exception e) {
240 //TODO we might want to catch exceptions more specifically
241 log.error("Error loading key store; disabling TLS for intra-cluster messaging", e);
242 return TLS_DISABLED;
243 }
244 this.trustManager = tmf;
245 this.keyManager = kmf;
246 return TLS_ENABLED;
247 }
248
249 private void logKeyStore(KeyStore ks, String ksLocation, char[] ksPwd) {
250 if (log.isInfoEnabled()) {
251 log.info("Loaded cluster key store from: {}", ksLocation);
252 try {
253 for (Enumeration<String> e = ks.aliases(); e.hasMoreElements();) {
254 String alias = e.nextElement();
255 Key key = ks.getKey(alias, ksPwd);
256 Certificate[] certs = ks.getCertificateChain(alias);
257 log.debug("{} -> {}", alias, certs);
258 final byte[] encodedKey;
259 if (certs != null && certs.length > 0) {
260 encodedKey = certs[0].getEncoded();
261 } else {
262 log.info("Could not find cert chain for {}, using fingerprint of key instead...", alias);
263 encodedKey = key.getEncoded();
264 }
265 // Compute the certificate's fingerprint (use the key if certificate cannot be found)
266 MessageDigest digest = MessageDigest.getInstance("SHA1");
267 digest.update(encodedKey);
268 StringJoiner fingerprint = new StringJoiner(":");
269 for (byte b : digest.digest()) {
270 fingerprint.add(String.format("%02X", b));
271 }
272 log.info("{} -> {}", alias, fingerprint);
273 }
274 } catch (Exception e) {
275 log.warn("Unable to print contents of key store: {}", ksLocation, e);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900276 }
277 }
278 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700279
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800280 private void initEventLoopGroup() {
281 // try Epoll first and if that does work, use nio.
282 try {
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700283 clientGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollC-%d", log));
284 serverGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800285 serverChannelClass = EpollServerSocketChannel.class;
286 clientChannelClass = EpollSocketChannel.class;
287 return;
288 } catch (Throwable e) {
289 log.debug("Failed to initialize native (epoll) transport. "
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700290 + "Reason: {}. Proceeding with nio.", e.getMessage());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800291 }
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700292 clientGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioC-%d", log));
293 serverGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800294 serverChannelClass = NioServerSocketChannel.class;
295 clientChannelClass = NioSocketChannel.class;
296 }
297
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700298 /**
299 * Times out response callbacks.
300 */
301 private void timeoutAllCallbacks() {
302 // Iterate through all connections and time out callbacks.
303 for (RemoteClientConnection connection : clientConnections.values()) {
304 connection.timeoutCallbacks();
305 }
306
307 // Iterate through all timeout histories and recompute the timeout.
308 for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
309 timeoutHistory.recomputeTimeoutMillis();
310 }
311 }
312
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800313 @Override
314 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900315 checkPermission(CLUSTER_WRITE);
Madan Jampanib825aeb2016-04-01 15:18:25 -0700316 InternalMessage message = new InternalMessage(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700317 clockService.timeNow(),
318 messageIdGenerator.incrementAndGet(),
319 localEndpoint,
320 type,
321 payload);
322 return executeOnPooledConnection(ep, c -> c.sendAsync(message), MoreExecutors.directExecutor());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800323 }
324
325 @Override
326 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900327 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800328 return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
329 }
330
331 @Override
332 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900333 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800334 Long messageId = messageIdGenerator.incrementAndGet();
Madan Jampani05833872016-07-12 23:01:39 -0700335 InternalMessage message = new InternalMessage(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700336 clockService.timeNow(),
337 messageId,
338 localEndpoint,
339 type,
340 payload);
341 return executeOnPooledConnection(ep, c -> c.sendAndReceive(message), executor);
342 }
Jordan Halterman041534b2017-03-21 10:37:33 -0700343
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700344 /**
345 * Executes the given callback on a pooled connection.
346 *
347 * @param endpoint the endpoint to which to send a message
348 * @param callback the callback to execute to send the message
349 * @param <T> the send result type
350 * @return a completable future to be completed with the result of the supplied function
351 */
352 private <T> CompletableFuture<T> executeOnPooledConnection(
353 Endpoint endpoint,
354 Function<ClientConnection, CompletableFuture<T>> callback,
355 Executor executor) {
356 if (endpoint.equals(localEndpoint)) {
357 CompletableFuture<T> future = new CompletableFuture<>();
358 callback.apply(localClientConnection).whenComplete((result, error) -> {
359 if (error == null) {
360 executor.execute(() -> future.complete(result));
361 } else {
362 executor.execute(() -> future.completeExceptionally(error));
363 }
364 });
365 return future;
366 }
367
368 CompletableFuture<T> future = new CompletableFuture<>();
369 ChannelPool pool = channels.get(endpoint);
370 pool.acquire().addListener((FutureListener<Channel>) channelResult -> {
371 if (channelResult.isSuccess()) {
372 Channel channel = channelResult.getNow();
373 ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
374 callback.apply(connection).whenComplete((result, error) -> {
375 pool.release(channel).addListener(releaseResult -> {
376 if (!releaseResult.isSuccess()) {
377 clientConnections.remove(channel);
378 connection.close();
379 }
380 });
381
382 if (error == null) {
383 executor.execute(() -> future.complete(result));
384 } else {
385 executor.execute(() -> future.completeExceptionally(error));
386 }
387 });
388 } else {
389 executor.execute(() -> future.completeExceptionally(channelResult.cause()));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800390 }
Jordan Halterman041534b2017-03-21 10:37:33 -0700391 });
392 return future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800393 }
394
395 @Override
396 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900397 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700398 handlers.put(type, (message, connection) -> executor.execute(() ->
399 handler.accept(message.sender(), message.payload())));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800400 }
401
402 @Override
403 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900404 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700405 handlers.put(type, (message, connection) -> executor.execute(() -> {
Madan Jampania9e70a62016-03-02 16:28:18 -0800406 byte[] responsePayload = null;
407 Status status = Status.OK;
408 try {
409 responsePayload = handler.apply(message.sender(), message.payload());
410 } catch (Exception e) {
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700411 log.debug("An error occurred in a message handler: {}", e);
Madan Jampania9e70a62016-03-02 16:28:18 -0800412 status = Status.ERROR_HANDLER_EXCEPTION;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800413 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700414 connection.reply(message, status, Optional.ofNullable(responsePayload));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800415 }));
416 }
417
418 @Override
419 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900420 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700421 handlers.put(type, (message, connection) -> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800422 handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700423 Status status;
424 if (error == null) {
425 status = Status.OK;
426 } else {
427 log.debug("An error occurred in a message handler: {}", error);
428 status = Status.ERROR_HANDLER_EXCEPTION;
429 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700430 connection.reply(message, status, Optional.ofNullable(result));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800431 });
432 });
433 }
434
435 @Override
436 public void unregisterHandler(String type) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900437 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800438 handlers.remove(type);
439 }
440
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700441 private Bootstrap bootstrapClient(Endpoint endpoint) {
442 Bootstrap bootstrap = new Bootstrap();
443 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
444 bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
445 new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
446 bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
447 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
448 bootstrap.group(clientGroup);
449 // TODO: Make this faster:
450 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
451 bootstrap.channel(clientChannelClass);
452 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
453 bootstrap.remoteAddress(endpoint.host().toInetAddress(), endpoint.port());
454 return bootstrap;
455 }
456
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800457 private void startAcceptingConnections() throws InterruptedException {
458 ServerBootstrap b = new ServerBootstrap();
Jon Hall9a44d6a2017-03-02 18:14:37 -0800459 b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700460 new WriteBufferWaterMark(8 * 1024, 32 * 1024));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800461 b.option(ChannelOption.SO_RCVBUF, 1048576);
462 b.option(ChannelOption.TCP_NODELAY, true);
Yuta HIGUCHIb47c9532016-08-22 09:41:23 -0700463 b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800464 b.group(serverGroup, clientGroup);
465 b.channel(serverChannelClass);
466 if (enableNettyTls) {
467 b.childHandler(new SslServerCommunicationChannelInitializer());
468 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700469 b.childHandler(new BasicChannelInitializer());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800470 }
471 b.option(ChannelOption.SO_BACKLOG, 128);
472 b.childOption(ChannelOption.SO_KEEPALIVE, true);
473
474 // Bind and start to accept incoming connections.
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700475 b.bind(localEndpoint.port()).sync().addListener(future -> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800476 if (future.isSuccess()) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700477 log.info("{} accepting incoming connections on port {}",
478 localEndpoint.host(), localEndpoint.port());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800479 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700480 log.warn("{} failed to bind to port {} due to {}",
481 localEndpoint.host(), localEndpoint.port(), future.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800482 }
483 });
484 }
485
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700486 /**
487 * Channel pool handler.
488 */
489 private class ClientChannelPoolHandler extends AbstractChannelPoolHandler {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800490 @Override
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700491 public void channelCreated(Channel channel) throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800492 if (enableNettyTls) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700493 new SslClientCommunicationChannelInitializer().initChannel((SocketChannel) channel);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800494 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700495 new BasicChannelInitializer().initChannel((SocketChannel) channel);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800496 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800497 }
498 }
499
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700500 /**
501 * Channel initializer for TLS servers.
502 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800503 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800504 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
505 private final ChannelHandler encoder = new MessageEncoder(preamble);
506
507 @Override
508 protected void initChannel(SocketChannel channel) throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800509 SSLContext serverContext = SSLContext.getInstance("TLS");
Brian O'Connor740e98c2017-06-29 17:07:17 -0700510 serverContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800511
512 SSLEngine serverSslEngine = serverContext.createSSLEngine();
513
514 serverSslEngine.setNeedClientAuth(true);
515 serverSslEngine.setUseClientMode(false);
516 serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
517 serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
518 serverSslEngine.setEnableSessionCreation(true);
519
520 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
521 .addLast("encoder", encoder)
Madan Jampanib825aeb2016-04-01 15:18:25 -0700522 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800523 .addLast("handler", dispatcher);
524 }
525 }
526
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700527 /**
528 * Channel initializer for TLS clients.
529 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800530 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800531 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
532 private final ChannelHandler encoder = new MessageEncoder(preamble);
533
534 @Override
535 protected void initChannel(SocketChannel channel) throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800536 SSLContext clientContext = SSLContext.getInstance("TLS");
Brian O'Connor740e98c2017-06-29 17:07:17 -0700537 clientContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800538
539 SSLEngine clientSslEngine = clientContext.createSSLEngine();
540
541 clientSslEngine.setUseClientMode(true);
542 clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
543 clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
544 clientSslEngine.setEnableSessionCreation(true);
545
546 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
547 .addLast("encoder", encoder)
Madan Jampanib825aeb2016-04-01 15:18:25 -0700548 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800549 .addLast("handler", dispatcher);
550 }
551 }
552
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700553 /**
554 * Channel initializer for basic connections.
555 */
556 private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800557 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
558 private final ChannelHandler encoder = new MessageEncoder(preamble);
559
560 @Override
561 protected void initChannel(SocketChannel channel) throws Exception {
562 channel.pipeline()
563 .addLast("encoder", encoder)
Madan Jampanib825aeb2016-04-01 15:18:25 -0700564 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800565 .addLast("handler", dispatcher);
566 }
567 }
568
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700569 /**
570 * Channel inbound handler that dispatches messages to the appropriate handler.
571 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800572 @ChannelHandler.Sharable
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700573 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<Object> {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700574 // Effectively SimpleChannelInboundHandler<InternalMessage>,
575 // had to specify <Object> to avoid Class Loader not being able to find some classes.
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800576
577 @Override
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700578 protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
579 InternalMessage message = (InternalMessage) rawMessage;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800580 try {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700581 if (message.isRequest()) {
582 RemoteServerConnection connection =
583 serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
584 connection.dispatch(message);
585 } else {
586 RemoteClientConnection connection =
587 clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
588 connection.dispatch(message);
589 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800590 } catch (RejectedExecutionException e) {
591 log.warn("Unable to dispatch message due to {}", e.getMessage());
592 }
593 }
594
595 @Override
596 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
597 log.error("Exception inside channel handling pipeline.", cause);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700598
599 RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
600 if (clientConnection != null) {
601 clientConnection.close();
602 }
603
604 RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
605 if (serverConnection != null) {
606 serverConnection.close();
607 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800608 context.close();
609 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700610
611 /**
612 * Returns true if the given message should be handled.
613 *
614 * @param msg inbound message
615 * @return true if {@code msg} is {@link InternalMessage} instance.
616 *
617 * @see SimpleChannelInboundHandler#acceptInboundMessage(Object)
618 */
619 @Override
620 public final boolean acceptInboundMessage(Object msg) {
621 return msg instanceof InternalMessage;
622 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800623 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700624
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700625 /**
626 * Wraps a {@link CompletableFuture} and tracks its type and creation time.
627 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800628 private final class Callback {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700629 private final String type;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800630 private final CompletableFuture<byte[]> future;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700631 private final long time = System.currentTimeMillis();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800632
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700633 Callback(String type, CompletableFuture<byte[]> future) {
634 this.type = type;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800635 this.future = future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800636 }
637
638 public void complete(byte[] value) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700639 future.complete(value);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800640 }
641
642 public void completeExceptionally(Throwable error) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700643 future.completeExceptionally(error);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800644 }
645 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800646
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700647 /**
648 * Represents the client side of a connection to a local or remote server.
649 */
650 private interface ClientConnection {
651
652 /**
653 * Sends a message to the other side of the connection.
654 *
655 * @param message the message to send
656 * @return a completable future to be completed once the message has been sent
657 */
658 CompletableFuture<Void> sendAsync(InternalMessage message);
659
660 /**
661 * Sends a message to the other side of the connection, awaiting a reply.
662 *
663 * @param message the message to send
664 * @return a completable future to be completed once a reply is received or the request times out
665 */
666 CompletableFuture<byte[]> sendAndReceive(InternalMessage message);
667
668 /**
669 * Closes the connection.
670 */
671 default void close() {
672 }
673 }
674
675 /**
676 * Represents the server side of a connection.
677 */
678 private interface ServerConnection {
679
680 /**
681 * Sends a reply to the other side of the connection.
682 *
683 * @param message the message to which to reply
684 * @param status the reply status
685 * @param payload the response payload
686 */
687 void reply(InternalMessage message, Status status, Optional<byte[]> payload);
688
689 /**
690 * Closes the connection.
691 */
692 default void close() {
693 }
694 }
695
696 /**
697 * Local connection implementation.
698 */
699 private final class LocalClientConnection implements ClientConnection {
700 @Override
701 public CompletableFuture<Void> sendAsync(InternalMessage message) {
702 BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
703 if (handler != null) {
704 handler.accept(message, localServerConnection);
705 } else {
706 log.debug("No handler for message type {} from {}", message.type(), message.sender());
707 }
708 return CompletableFuture.completedFuture(null);
709 }
710
711 @Override
712 public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
713 CompletableFuture<byte[]> future = new CompletableFuture<>();
714 BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
715 if (handler != null) {
716 handler.accept(message, new LocalServerConnection(future));
717 } else {
718 log.debug("No handler for message type {} from {}", message.type(), message.sender());
719 new LocalServerConnection(future).reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
720 }
721 return future;
722 }
723 }
724
725 /**
726 * Local server connection.
727 */
728 private final class LocalServerConnection implements ServerConnection {
729 private final CompletableFuture<byte[]> future;
730
731 LocalServerConnection(CompletableFuture<byte[]> future) {
732 this.future = future;
733 }
734
735 @Override
736 public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
737 if (future != null) {
738 if (status == Status.OK) {
739 future.complete(payload.orElse(EMPTY_PAYLOAD));
740 } else if (status == Status.ERROR_NO_HANDLER) {
741 future.completeExceptionally(new MessagingException.NoRemoteHandler());
742 } else if (status == Status.ERROR_HANDLER_EXCEPTION) {
743 future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
744 } else if (status == Status.PROTOCOL_EXCEPTION) {
745 future.completeExceptionally(new MessagingException.ProtocolException());
746 }
747 }
748 }
749 }
750
751 /**
752 * Remote connection implementation.
753 */
754 private final class RemoteClientConnection implements ClientConnection {
755 private final Channel channel;
756 private final Map<Long, Callback> futures = Maps.newConcurrentMap();
757 private final AtomicBoolean closed = new AtomicBoolean(false);
758
759 RemoteClientConnection(Channel channel) {
760 this.channel = channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800761 }
762
763 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700764 * Times out callbacks for this connection.
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800765 */
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700766 private void timeoutCallbacks() {
767 // Store the current time.
768 long currentTime = System.currentTimeMillis();
769
770 // Iterate through future callbacks and time out callbacks that have been alive
771 // longer than the current timeout according to the message type.
772 Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
773 while (iterator.hasNext()) {
774 Callback callback = iterator.next().getValue();
775 try {
776 TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
777 long currentTimeout = timeoutHistory.currentTimeout;
778 if (currentTime - callback.time > currentTimeout) {
779 iterator.remove();
780 long elapsedTime = currentTime - callback.time;
781 timeoutHistory.addReplyTime(elapsedTime);
782 callback.completeExceptionally(
783 new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
784 }
785 } catch (ExecutionException e) {
786 throw new AssertionError();
787 }
788 }
789 }
790
791 @Override
792 public CompletableFuture<Void> sendAsync(InternalMessage message) {
793 CompletableFuture<Void> future = new CompletableFuture<>();
794 channel.writeAndFlush(message).addListener(channelFuture -> {
795 if (!channelFuture.isSuccess()) {
796 future.completeExceptionally(channelFuture.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800797 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700798 future.complete(null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800799 }
800 });
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700801 return future;
802 }
803
804 @Override
805 public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
806 CompletableFuture<byte[]> future = new CompletableFuture<>();
807 Callback callback = new Callback(message.type(), future);
808 futures.put(message.id(), callback);
809 channel.writeAndFlush(message).addListener(channelFuture -> {
810 if (!channelFuture.isSuccess()) {
811 futures.remove(message.id());
812 callback.completeExceptionally(channelFuture.cause());
813 }
814 });
815 return future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800816 }
817
818 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700819 * Dispatches a message to a local handler.
820 *
821 * @param message the message to dispatch
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800822 */
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700823 private void dispatch(InternalMessage message) {
824 if (message.preamble() != preamble) {
825 log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
826 return;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800827 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700828
829 clockService.recordEventTime(message.time());
830
831 Callback callback = futures.remove(message.id());
832 if (callback != null) {
833 if (message.status() == Status.OK) {
834 callback.complete(message.payload());
835 } else if (message.status() == Status.ERROR_NO_HANDLER) {
836 callback.completeExceptionally(new MessagingException.NoRemoteHandler());
837 } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
838 callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
839 } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
840 callback.completeExceptionally(new MessagingException.ProtocolException());
841 }
842
843 try {
844 TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
845 timeoutHistory.addReplyTime(System.currentTimeMillis() - callback.time);
846 } catch (ExecutionException e) {
847 throw new AssertionError();
848 }
849 } else {
850 log.debug("Received a reply for message id:[{}]. "
851 + " from {}. But was unable to locate the"
852 + " request handle", message.id(), message.sender());
853 }
854 }
855
856 @Override
857 public void close() {
858 if (closed.compareAndSet(false, true)) {
859 timeoutFuture.cancel(false);
860 for (Callback callback : futures.values()) {
861 callback.completeExceptionally(new ConnectException());
862 }
863 }
864 }
865 }
866
867 /**
868 * Remote server connection.
869 */
870 private final class RemoteServerConnection implements ServerConnection {
871 private final Channel channel;
872
873 RemoteServerConnection(Channel channel) {
874 this.channel = channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800875 }
876
877 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700878 * Dispatches a message to a local handler.
879 *
880 * @param message the message to dispatch
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800881 */
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700882 private void dispatch(InternalMessage message) {
883 if (message.preamble() != preamble) {
884 log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
885 reply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
886 return;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800887 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700888
889 clockService.recordEventTime(message.time());
890
891 BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
892 if (handler != null) {
893 handler.accept(message, this);
894 } else {
895 log.debug("No handler for message type {} from {}", message.type(), message.sender());
896 reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
897 }
898 }
899
900 @Override
901 public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
902 InternalMessage response = new InternalMessage(preamble,
903 clockService.timeNow(),
904 message.id(),
905 localEndpoint,
906 payload.orElse(EMPTY_PAYLOAD),
907 status);
908 channel.writeAndFlush(response);
909 }
910 }
911
912 /**
913 * Request-reply timeout history tracker.
914 */
915 private static final class TimeoutHistory {
916 private final DescriptiveStatistics timeoutHistory = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
917 private final AtomicLong maxReplyTime = new AtomicLong();
918 private volatile long currentTimeout = DEFAULT_TIMEOUT_MILLIS;
919
920 /**
921 * Adds a reply time to the history.
922 *
923 * @param replyTime the reply time to add to the history
924 */
925 void addReplyTime(long replyTime) {
926 maxReplyTime.getAndAccumulate(replyTime, Math::max);
927 }
928
929 /**
930 * Computes the current timeout.
931 */
932 private void recomputeTimeoutMillis() {
933 double nextTimeout = maxReplyTime.getAndSet(0) * TIMEOUT_MULTIPLIER;
934 timeoutHistory.addValue(
935 Math.min(Math.max(nextTimeout, MIN_TIMEOUT_MILLIS), MAX_TIMEOUT_MILLIS));
936 if (timeoutHistory.getN() == WINDOW_SIZE) {
937 this.currentTimeout = (long) timeoutHistory.getMax();
938 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800939 }
940 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900941}