blob: 9a90999ebf5d91db647a4f6827a66b0beb63bbba [file] [log] [blame]
Thomas Vachuska58de4162015-09-10 16:15:33 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Thomas Vachuska58de4162015-09-10 16:15:33 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070016package org.onosproject.store.cluster.messaging.impl;
17
Aaron Kruglikov1b727382016-02-09 16:17:47 -080018import javax.net.ssl.KeyManagerFactory;
19import javax.net.ssl.SSLContext;
20import javax.net.ssl.SSLEngine;
21import javax.net.ssl.TrustManagerFactory;
Brian O'Connor740e98c2017-06-29 17:07:17 -070022import java.io.File;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080023import java.io.FileInputStream;
Brian O'Connor740e98c2017-06-29 17:07:17 -070024import java.io.FileNotFoundException;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070025import java.net.ConnectException;
Brian O'Connor740e98c2017-06-29 17:07:17 -070026import java.security.Key;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080027import java.security.KeyStore;
Brian O'Connor740e98c2017-06-29 17:07:17 -070028import java.security.MessageDigest;
29import java.security.cert.Certificate;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070030import java.time.Duration;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070031import java.util.ArrayList;
Brian O'Connor740e98c2017-06-29 17:07:17 -070032import java.util.Enumeration;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070033import java.util.Iterator;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070034import java.util.List;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080035import java.util.Map;
Madan Jampania9e70a62016-03-02 16:28:18 -080036import java.util.Optional;
Brian O'Connor740e98c2017-06-29 17:07:17 -070037import java.util.StringJoiner;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080038import java.util.concurrent.CompletableFuture;
39import java.util.concurrent.ConcurrentHashMap;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070040import java.util.concurrent.ExecutionException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080041import java.util.concurrent.Executor;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070042import java.util.concurrent.Executors;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080043import java.util.concurrent.RejectedExecutionException;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070044import java.util.concurrent.ScheduledExecutorService;
45import java.util.concurrent.ScheduledFuture;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080046import java.util.concurrent.TimeUnit;
47import java.util.concurrent.TimeoutException;
48import java.util.concurrent.atomic.AtomicBoolean;
49import java.util.concurrent.atomic.AtomicLong;
50import java.util.function.BiConsumer;
51import java.util.function.BiFunction;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070052import java.util.function.Function;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080053
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070054import com.google.common.cache.Cache;
55import com.google.common.cache.CacheBuilder;
56import com.google.common.collect.Lists;
57import com.google.common.collect.Maps;
58import com.google.common.util.concurrent.MoreExecutors;
59import io.netty.bootstrap.Bootstrap;
60import io.netty.bootstrap.ServerBootstrap;
61import io.netty.buffer.PooledByteBufAllocator;
62import io.netty.channel.Channel;
63import io.netty.channel.ChannelFuture;
64import io.netty.channel.ChannelHandler;
65import io.netty.channel.ChannelHandlerContext;
66import io.netty.channel.ChannelInitializer;
67import io.netty.channel.ChannelOption;
68import io.netty.channel.EventLoopGroup;
69import io.netty.channel.ServerChannel;
70import io.netty.channel.SimpleChannelInboundHandler;
71import io.netty.channel.WriteBufferWaterMark;
72import io.netty.channel.epoll.EpollEventLoopGroup;
73import io.netty.channel.epoll.EpollServerSocketChannel;
74import io.netty.channel.epoll.EpollSocketChannel;
75import io.netty.channel.nio.NioEventLoopGroup;
76import io.netty.channel.socket.SocketChannel;
77import io.netty.channel.socket.nio.NioServerSocketChannel;
78import io.netty.channel.socket.nio.NioSocketChannel;
79import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
80import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
81import org.apache.felix.scr.annotations.Activate;
82import org.apache.felix.scr.annotations.Component;
83import org.apache.felix.scr.annotations.Deactivate;
84import org.apache.felix.scr.annotations.Reference;
85import org.apache.felix.scr.annotations.ReferenceCardinality;
86import org.apache.felix.scr.annotations.Service;
87import org.onosproject.cluster.ClusterMetadataService;
88import org.onosproject.cluster.ControllerNode;
89import org.onosproject.core.HybridLogicalClockService;
90import org.onosproject.store.cluster.messaging.Endpoint;
91import org.onosproject.store.cluster.messaging.MessagingException;
92import org.onosproject.store.cluster.messaging.MessagingService;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070093import org.slf4j.Logger;
94import org.slf4j.LoggerFactory;
95
Yuta HIGUCHI90a16892016-07-20 20:36:08 -070096import static org.onlab.util.Tools.groupedThreads;
Heedo Kang4a47a302016-02-29 17:40:23 +090097import static org.onosproject.security.AppGuard.checkPermission;
98import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
99
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700100/**
101 * Netty based MessagingService.
102 */
Sho SHIMIZU5c396e32016-08-12 15:19:12 -0700103@Component(immediate = true)
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700104@Service
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800105public class NettyMessagingManager implements MessagingService {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700106 private static final long DEFAULT_TIMEOUT_MILLIS = 500;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700107 private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1).toMillis();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700108 private static final long MIN_TIMEOUT_MILLIS = 100;
109 private static final long MAX_TIMEOUT_MILLIS = 5000;
110 private static final long TIMEOUT_INTERVAL = 50;
111 private static final int WINDOW_SIZE = 100;
112 private static final double TIMEOUT_MULTIPLIER = 2.5;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700113 private static final int CHANNEL_POOL_SIZE = 8;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700114
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700115 private static final byte[] EMPTY_PAYLOAD = new byte[0];
116
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700117 private final Logger log = LoggerFactory.getLogger(getClass());
118
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700119 private final ClientConnection localClientConnection = new LocalClientConnection();
120 private final ServerConnection localServerConnection = new LocalServerConnection(null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800121
Brian O'Connor740e98c2017-06-29 17:07:17 -0700122 //TODO CONFIG_DIR is duplicated from ConfigFileBasedClusterMetadataProvider
123 private static final String CONFIG_DIR = "../config";
124 private static final String KS_FILE_NAME = "onos.jks";
125 private static final File DEFAULT_KS_FILE = new File(CONFIG_DIR, KS_FILE_NAME);
126 private static final String DEFAULT_KS_PASSWORD = "changeit";
127
Madan Jampani05833872016-07-12 23:01:39 -0700128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected HybridLogicalClockService clockService;
130
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700131 private Endpoint localEndpoint;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800132 private int preamble;
133 private final AtomicBoolean started = new AtomicBoolean(false);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700134 private final Map<String, BiConsumer<InternalRequest, ServerConnection>> handlers = new ConcurrentHashMap<>();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700135 private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
136 private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800137 private final AtomicLong messageIdGenerator = new AtomicLong(0);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800138
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700139 private ScheduledFuture<?> timeoutFuture;
140
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700141 private final Map<Endpoint, List<CompletableFuture<Channel>>> channels = Maps.newConcurrentMap();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800142
143 private EventLoopGroup serverGroup;
144 private EventLoopGroup clientGroup;
145 private Class<? extends ServerChannel> serverChannelClass;
146 private Class<? extends Channel> clientChannelClass;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700147 private ScheduledExecutorService timeoutExecutor;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800148
Brian O'Connor740e98c2017-06-29 17:07:17 -0700149 protected static final boolean TLS_ENABLED = true;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800150 protected static final boolean TLS_DISABLED = false;
Brian O'Connor740e98c2017-06-29 17:07:17 -0700151 protected boolean enableNettyTls = TLS_ENABLED;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800152
Brian O'Connor740e98c2017-06-29 17:07:17 -0700153 protected TrustManagerFactory trustManager;
154 protected KeyManagerFactory keyManager;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900155
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700156 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampaniec1df022015-10-13 21:23:03 -0700157 protected ClusterMetadataService clusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700158
159 @Activate
160 public void activate() throws Exception {
Madan Jampaniec1df022015-10-13 21:23:03 -0700161 ControllerNode localNode = clusterMetadataService.getLocalNode();
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800162 getTlsParameters();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800163
164 if (started.get()) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700165 log.warn("Already running at local endpoint: {}", localEndpoint);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800166 return;
167 }
168 this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700169 this.localEndpoint = new Endpoint(localNode.ip(), localNode.tcpPort());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800170 initEventLoopGroup();
171 startAcceptingConnections();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700172 timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
173 groupedThreads("NettyMessagingEvt", "timeout", log));
174 timeoutFuture = timeoutExecutor.scheduleAtFixedRate(
175 this::timeoutAllCallbacks, TIMEOUT_INTERVAL, TIMEOUT_INTERVAL, TimeUnit.MILLISECONDS);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800176 started.set(true);
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700177 log.info("Started");
178 }
179
180 @Deactivate
181 public void deactivate() throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800182 if (started.get()) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800183 serverGroup.shutdownGracefully();
184 clientGroup.shutdownGracefully();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700185 timeoutFuture.cancel(false);
186 timeoutExecutor.shutdown();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800187 started.set(false);
188 }
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700189 log.info("Stopped");
190 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900191
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800192 private void getTlsParameters() {
Brian O'Connor740e98c2017-06-29 17:07:17 -0700193 // default is TLS enabled unless key stores cannot be loaded
194 enableNettyTls = Boolean.parseBoolean(System.getProperty("enableNettyTLS", Boolean.toString(TLS_ENABLED)));
195
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800196 if (enableNettyTls) {
Brian O'Connor740e98c2017-06-29 17:07:17 -0700197 enableNettyTls = loadKeyStores();
198 }
199 }
200
201 private boolean loadKeyStores() {
202 // Maintain a local copy of the trust and key managers in case anything goes wrong
203 TrustManagerFactory tmf;
204 KeyManagerFactory kmf;
205 try {
206 String ksLocation = System.getProperty("javax.net.ssl.keyStore", DEFAULT_KS_FILE.toString());
207 String tsLocation = System.getProperty("javax.net.ssl.trustStore", DEFAULT_KS_FILE.toString());
208 char[] ksPwd = System.getProperty("javax.net.ssl.keyStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
209 char[] tsPwd = System.getProperty("javax.net.ssl.trustStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
210
211 tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
212 KeyStore ts = KeyStore.getInstance("JKS");
213 ts.load(new FileInputStream(tsLocation), tsPwd);
214 tmf.init(ts);
215
216 kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
217 KeyStore ks = KeyStore.getInstance("JKS");
218 ks.load(new FileInputStream(ksLocation), ksPwd);
219 kmf.init(ks, ksPwd);
220 if (log.isInfoEnabled()) {
221 logKeyStore(ks, ksLocation, ksPwd);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900222 }
Brian O'Connor740e98c2017-06-29 17:07:17 -0700223 } catch (FileNotFoundException e) {
224 log.warn("Disabling TLS for intra-cluster messaging; Could not load cluster key store: {}", e.getMessage());
225 return TLS_DISABLED;
226 } catch (Exception e) {
227 //TODO we might want to catch exceptions more specifically
228 log.error("Error loading key store; disabling TLS for intra-cluster messaging", e);
229 return TLS_DISABLED;
230 }
231 this.trustManager = tmf;
232 this.keyManager = kmf;
233 return TLS_ENABLED;
234 }
235
236 private void logKeyStore(KeyStore ks, String ksLocation, char[] ksPwd) {
237 if (log.isInfoEnabled()) {
238 log.info("Loaded cluster key store from: {}", ksLocation);
239 try {
240 for (Enumeration<String> e = ks.aliases(); e.hasMoreElements();) {
241 String alias = e.nextElement();
242 Key key = ks.getKey(alias, ksPwd);
243 Certificate[] certs = ks.getCertificateChain(alias);
244 log.debug("{} -> {}", alias, certs);
245 final byte[] encodedKey;
246 if (certs != null && certs.length > 0) {
247 encodedKey = certs[0].getEncoded();
248 } else {
249 log.info("Could not find cert chain for {}, using fingerprint of key instead...", alias);
250 encodedKey = key.getEncoded();
251 }
252 // Compute the certificate's fingerprint (use the key if certificate cannot be found)
253 MessageDigest digest = MessageDigest.getInstance("SHA1");
254 digest.update(encodedKey);
255 StringJoiner fingerprint = new StringJoiner(":");
256 for (byte b : digest.digest()) {
257 fingerprint.add(String.format("%02X", b));
258 }
259 log.info("{} -> {}", alias, fingerprint);
260 }
261 } catch (Exception e) {
262 log.warn("Unable to print contents of key store: {}", ksLocation, e);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900263 }
264 }
265 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700266
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800267 private void initEventLoopGroup() {
268 // try Epoll first and if that does work, use nio.
269 try {
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700270 clientGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollC-%d", log));
271 serverGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800272 serverChannelClass = EpollServerSocketChannel.class;
273 clientChannelClass = EpollSocketChannel.class;
274 return;
275 } catch (Throwable e) {
276 log.debug("Failed to initialize native (epoll) transport. "
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700277 + "Reason: {}. Proceeding with nio.", e.getMessage());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800278 }
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700279 clientGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioC-%d", log));
280 serverGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800281 serverChannelClass = NioServerSocketChannel.class;
282 clientChannelClass = NioSocketChannel.class;
283 }
284
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700285 /**
286 * Times out response callbacks.
287 */
288 private void timeoutAllCallbacks() {
289 // Iterate through all connections and time out callbacks.
290 for (RemoteClientConnection connection : clientConnections.values()) {
291 connection.timeoutCallbacks();
292 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700293 }
294
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800295 @Override
296 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900297 checkPermission(CLUSTER_WRITE);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700298 InternalRequest message = new InternalRequest(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700299 clockService.timeNow(),
300 messageIdGenerator.incrementAndGet(),
301 localEndpoint,
302 type,
303 payload);
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700304 return executeOnPooledConnection(ep, type, c -> c.sendAsync(message), MoreExecutors.directExecutor());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800305 }
306
307 @Override
308 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900309 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800310 return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
311 }
312
313 @Override
314 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900315 checkPermission(CLUSTER_WRITE);
Jordan Haltermanb34d30b2017-08-28 15:35:03 -0700316 long messageId = messageIdGenerator.incrementAndGet();
Jordan Haltermane3813a92017-07-29 14:10:31 -0700317 InternalRequest message = new InternalRequest(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700318 clockService.timeNow(),
319 messageId,
320 localEndpoint,
321 type,
322 payload);
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700323 return executeOnPooledConnection(ep, type, c -> c.sendAndReceive(message), executor);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700324 }
Jordan Halterman041534b2017-03-21 10:37:33 -0700325
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700326 private List<CompletableFuture<Channel>> getChannelPool(Endpoint endpoint) {
327 return channels.computeIfAbsent(endpoint, e -> {
328 List<CompletableFuture<Channel>> defaultList = new ArrayList<>(CHANNEL_POOL_SIZE);
329 for (int i = 0; i < CHANNEL_POOL_SIZE; i++) {
330 defaultList.add(null);
331 }
332 return Lists.newCopyOnWriteArrayList(defaultList);
333 });
334 }
335
336 private int getChannelOffset(String messageType) {
337 return Math.abs(messageType.hashCode() % CHANNEL_POOL_SIZE);
338 }
339
340 private CompletableFuture<Channel> getChannel(Endpoint endpoint, String messageType) {
341 List<CompletableFuture<Channel>> channelPool = getChannelPool(endpoint);
342 int offset = getChannelOffset(messageType);
343
344 CompletableFuture<Channel> channelFuture = channelPool.get(offset);
345 if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
346 synchronized (channelPool) {
347 channelFuture = channelPool.get(offset);
348 if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
349 channelFuture = openChannel(endpoint);
350 channelPool.set(offset, channelFuture);
351 }
352 }
353 }
354
355 CompletableFuture<Channel> future = new CompletableFuture<>();
356 final CompletableFuture<Channel> finalFuture = channelFuture;
357 finalFuture.whenComplete((channel, error) -> {
358 if (error == null) {
359 if (!channel.isActive()) {
360 synchronized (channelPool) {
361 CompletableFuture<Channel> currentFuture = channelPool.get(offset);
362 if (currentFuture == finalFuture) {
363 channelPool.set(offset, null);
364 getChannel(endpoint, messageType).whenComplete((recursiveResult, recursiveError) -> {
365 if (recursiveError == null) {
366 future.complete(recursiveResult);
367 } else {
368 future.completeExceptionally(recursiveError);
369 }
370 });
371 } else {
372 currentFuture.whenComplete((recursiveResult, recursiveError) -> {
373 if (recursiveError == null) {
374 future.complete(recursiveResult);
375 } else {
376 future.completeExceptionally(recursiveError);
377 }
378 });
379 }
380 }
381 } else {
382 future.complete(channel);
383 }
384 } else {
385 future.completeExceptionally(error);
386 }
387 });
388 return future;
389 }
390
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700391 private <T> CompletableFuture<T> executeOnPooledConnection(
392 Endpoint endpoint,
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700393 String type,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700394 Function<ClientConnection, CompletableFuture<T>> callback,
395 Executor executor) {
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700396 CompletableFuture<T> future = new CompletableFuture<T>();
397 executeOnPooledConnection(endpoint, type, callback, executor, future);
398 return future;
399 }
400
401 private <T> void executeOnPooledConnection(
402 Endpoint endpoint,
403 String type,
404 Function<ClientConnection, CompletableFuture<T>> callback,
405 Executor executor,
406 CompletableFuture<T> future) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700407 if (endpoint.equals(localEndpoint)) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700408 callback.apply(localClientConnection).whenComplete((result, error) -> {
409 if (error == null) {
410 executor.execute(() -> future.complete(result));
411 } else {
412 executor.execute(() -> future.completeExceptionally(error));
413 }
414 });
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700415 return;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700416 }
417
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700418 getChannel(endpoint, type).whenComplete((channel, channelError) -> {
419 if (channelError == null) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700420 ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700421 callback.apply(connection).whenComplete((result, sendError) -> {
422 if (sendError == null) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700423 executor.execute(() -> future.complete(result));
424 } else {
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700425 executor.execute(() -> future.completeExceptionally(sendError));
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700426 }
427 });
428 } else {
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700429 executor.execute(() -> future.completeExceptionally(channelError));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800430 }
Jordan Halterman041534b2017-03-21 10:37:33 -0700431 });
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800432 }
433
434 @Override
435 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900436 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700437 handlers.put(type, (message, connection) -> executor.execute(() ->
438 handler.accept(message.sender(), message.payload())));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800439 }
440
441 @Override
442 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900443 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700444 handlers.put(type, (message, connection) -> executor.execute(() -> {
Madan Jampania9e70a62016-03-02 16:28:18 -0800445 byte[] responsePayload = null;
Jordan Haltermane3813a92017-07-29 14:10:31 -0700446 InternalReply.Status status = InternalReply.Status.OK;
Madan Jampania9e70a62016-03-02 16:28:18 -0800447 try {
448 responsePayload = handler.apply(message.sender(), message.payload());
449 } catch (Exception e) {
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700450 log.debug("An error occurred in a message handler: {}", e);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700451 status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800452 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700453 connection.reply(message, status, Optional.ofNullable(responsePayload));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800454 }));
455 }
456
457 @Override
458 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900459 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700460 handlers.put(type, (message, connection) -> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800461 handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700462 InternalReply.Status status;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700463 if (error == null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700464 status = InternalReply.Status.OK;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700465 } else {
466 log.debug("An error occurred in a message handler: {}", error);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700467 status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700468 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700469 connection.reply(message, status, Optional.ofNullable(result));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800470 });
471 });
472 }
473
474 @Override
475 public void unregisterHandler(String type) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900476 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800477 handlers.remove(type);
478 }
479
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700480 private Bootstrap bootstrapClient(Endpoint endpoint) {
481 Bootstrap bootstrap = new Bootstrap();
482 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
483 bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
484 new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
485 bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
486 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
487 bootstrap.group(clientGroup);
488 // TODO: Make this faster:
489 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
490 bootstrap.channel(clientChannelClass);
491 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
492 bootstrap.remoteAddress(endpoint.host().toInetAddress(), endpoint.port());
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700493 if (enableNettyTls) {
494 bootstrap.handler(new SslClientCommunicationChannelInitializer());
495 } else {
496 bootstrap.handler(new BasicChannelInitializer());
497 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700498 return bootstrap;
499 }
500
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800501 private void startAcceptingConnections() throws InterruptedException {
502 ServerBootstrap b = new ServerBootstrap();
Jon Hall9a44d6a2017-03-02 18:14:37 -0800503 b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700504 new WriteBufferWaterMark(8 * 1024, 32 * 1024));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800505 b.option(ChannelOption.SO_RCVBUF, 1048576);
506 b.option(ChannelOption.TCP_NODELAY, true);
Yuta HIGUCHIb47c9532016-08-22 09:41:23 -0700507 b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800508 b.group(serverGroup, clientGroup);
509 b.channel(serverChannelClass);
510 if (enableNettyTls) {
511 b.childHandler(new SslServerCommunicationChannelInitializer());
512 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700513 b.childHandler(new BasicChannelInitializer());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800514 }
515 b.option(ChannelOption.SO_BACKLOG, 128);
516 b.childOption(ChannelOption.SO_KEEPALIVE, true);
517
518 // Bind and start to accept incoming connections.
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700519 b.bind(localEndpoint.port()).sync().addListener(future -> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800520 if (future.isSuccess()) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700521 log.info("{} accepting incoming connections on port {}",
522 localEndpoint.host(), localEndpoint.port());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800523 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700524 log.warn("{} failed to bind to port {} due to {}",
525 localEndpoint.host(), localEndpoint.port(), future.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800526 }
527 });
528 }
529
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700530 private CompletableFuture<Channel> openChannel(Endpoint ep) {
531 Bootstrap bootstrap = bootstrapClient(ep);
532 CompletableFuture<Channel> retFuture = new CompletableFuture<>();
533 ChannelFuture f = bootstrap.connect();
534
535 f.addListener(future -> {
536 if (future.isSuccess()) {
537 retFuture.complete(f.channel());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800538 } else {
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700539 retFuture.completeExceptionally(future.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800540 }
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700541 });
542 log.debug("Established a new connection to {}", ep);
543 return retFuture;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800544 }
545
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700546 /**
547 * Channel initializer for TLS servers.
548 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800549 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800550 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800551
552 @Override
553 protected void initChannel(SocketChannel channel) throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800554 SSLContext serverContext = SSLContext.getInstance("TLS");
Brian O'Connor740e98c2017-06-29 17:07:17 -0700555 serverContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800556
557 SSLEngine serverSslEngine = serverContext.createSSLEngine();
558
559 serverSslEngine.setNeedClientAuth(true);
560 serverSslEngine.setUseClientMode(false);
561 serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
562 serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
563 serverSslEngine.setEnableSessionCreation(true);
564
565 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
Jordan Haltermane3813a92017-07-29 14:10:31 -0700566 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700567 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800568 .addLast("handler", dispatcher);
569 }
570 }
571
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700572 /**
573 * Channel initializer for TLS clients.
574 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800575 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800576 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800577
578 @Override
579 protected void initChannel(SocketChannel channel) throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800580 SSLContext clientContext = SSLContext.getInstance("TLS");
Brian O'Connor740e98c2017-06-29 17:07:17 -0700581 clientContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800582
583 SSLEngine clientSslEngine = clientContext.createSSLEngine();
584
585 clientSslEngine.setUseClientMode(true);
586 clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
587 clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
588 clientSslEngine.setEnableSessionCreation(true);
589
590 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
Jordan Haltermane3813a92017-07-29 14:10:31 -0700591 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700592 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800593 .addLast("handler", dispatcher);
594 }
595 }
596
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700597 /**
598 * Channel initializer for basic connections.
599 */
600 private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800601 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800602
603 @Override
604 protected void initChannel(SocketChannel channel) throws Exception {
605 channel.pipeline()
Jordan Haltermane3813a92017-07-29 14:10:31 -0700606 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700607 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800608 .addLast("handler", dispatcher);
609 }
610 }
611
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700612 /**
613 * Channel inbound handler that dispatches messages to the appropriate handler.
614 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800615 @ChannelHandler.Sharable
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700616 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<Object> {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700617 // Effectively SimpleChannelInboundHandler<InternalMessage>,
618 // had to specify <Object> to avoid Class Loader not being able to find some classes.
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800619
620 @Override
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700621 protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
622 InternalMessage message = (InternalMessage) rawMessage;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800623 try {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700624 if (message.isRequest()) {
625 RemoteServerConnection connection =
626 serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700627 connection.dispatch((InternalRequest) message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700628 } else {
629 RemoteClientConnection connection =
630 clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700631 connection.dispatch((InternalReply) message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700632 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800633 } catch (RejectedExecutionException e) {
634 log.warn("Unable to dispatch message due to {}", e.getMessage());
635 }
636 }
637
638 @Override
639 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
640 log.error("Exception inside channel handling pipeline.", cause);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700641
642 RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
643 if (clientConnection != null) {
644 clientConnection.close();
645 }
646
647 RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
648 if (serverConnection != null) {
649 serverConnection.close();
650 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800651 context.close();
652 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700653
654 /**
655 * Returns true if the given message should be handled.
656 *
657 * @param msg inbound message
658 * @return true if {@code msg} is {@link InternalMessage} instance.
659 *
660 * @see SimpleChannelInboundHandler#acceptInboundMessage(Object)
661 */
662 @Override
663 public final boolean acceptInboundMessage(Object msg) {
664 return msg instanceof InternalMessage;
665 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800666 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700667
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700668 /**
669 * Wraps a {@link CompletableFuture} and tracks its type and creation time.
670 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800671 private final class Callback {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700672 private final String type;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800673 private final CompletableFuture<byte[]> future;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700674 private final long time = System.currentTimeMillis();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800675
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700676 Callback(String type, CompletableFuture<byte[]> future) {
677 this.type = type;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800678 this.future = future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800679 }
680
681 public void complete(byte[] value) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700682 future.complete(value);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800683 }
684
685 public void completeExceptionally(Throwable error) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700686 future.completeExceptionally(error);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800687 }
688 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800689
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700690 /**
691 * Represents the client side of a connection to a local or remote server.
692 */
693 private interface ClientConnection {
694
695 /**
696 * Sends a message to the other side of the connection.
697 *
698 * @param message the message to send
699 * @return a completable future to be completed once the message has been sent
700 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700701 CompletableFuture<Void> sendAsync(InternalRequest message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700702
703 /**
704 * Sends a message to the other side of the connection, awaiting a reply.
705 *
706 * @param message the message to send
707 * @return a completable future to be completed once a reply is received or the request times out
708 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700709 CompletableFuture<byte[]> sendAndReceive(InternalRequest message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700710
711 /**
712 * Closes the connection.
713 */
714 default void close() {
715 }
716 }
717
718 /**
719 * Represents the server side of a connection.
720 */
721 private interface ServerConnection {
722
723 /**
724 * Sends a reply to the other side of the connection.
725 *
726 * @param message the message to which to reply
727 * @param status the reply status
728 * @param payload the response payload
729 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700730 void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700731
732 /**
733 * Closes the connection.
734 */
735 default void close() {
736 }
737 }
738
739 /**
740 * Local connection implementation.
741 */
742 private final class LocalClientConnection implements ClientConnection {
743 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700744 public CompletableFuture<Void> sendAsync(InternalRequest message) {
745 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700746 if (handler != null) {
747 handler.accept(message, localServerConnection);
748 } else {
749 log.debug("No handler for message type {} from {}", message.type(), message.sender());
750 }
751 return CompletableFuture.completedFuture(null);
752 }
753
754 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700755 public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700756 CompletableFuture<byte[]> future = new CompletableFuture<>();
Jordan Haltermane3813a92017-07-29 14:10:31 -0700757 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700758 if (handler != null) {
759 handler.accept(message, new LocalServerConnection(future));
760 } else {
761 log.debug("No handler for message type {} from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700762 new LocalServerConnection(future)
763 .reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700764 }
765 return future;
766 }
767 }
768
769 /**
770 * Local server connection.
771 */
772 private final class LocalServerConnection implements ServerConnection {
773 private final CompletableFuture<byte[]> future;
774
775 LocalServerConnection(CompletableFuture<byte[]> future) {
776 this.future = future;
777 }
778
779 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700780 public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700781 if (future != null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700782 if (status == InternalReply.Status.OK) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700783 future.complete(payload.orElse(EMPTY_PAYLOAD));
Jordan Haltermane3813a92017-07-29 14:10:31 -0700784 } else if (status == InternalReply.Status.ERROR_NO_HANDLER) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700785 future.completeExceptionally(new MessagingException.NoRemoteHandler());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700786 } else if (status == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700787 future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700788 } else if (status == InternalReply.Status.PROTOCOL_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700789 future.completeExceptionally(new MessagingException.ProtocolException());
790 }
791 }
792 }
793 }
794
795 /**
796 * Remote connection implementation.
797 */
798 private final class RemoteClientConnection implements ClientConnection {
799 private final Channel channel;
800 private final Map<Long, Callback> futures = Maps.newConcurrentMap();
801 private final AtomicBoolean closed = new AtomicBoolean(false);
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700802 private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
803 .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
804 .build();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700805
806 RemoteClientConnection(Channel channel) {
807 this.channel = channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800808 }
809
810 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700811 * Times out callbacks for this connection.
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800812 */
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700813 private void timeoutCallbacks() {
814 // Store the current time.
815 long currentTime = System.currentTimeMillis();
816
817 // Iterate through future callbacks and time out callbacks that have been alive
818 // longer than the current timeout according to the message type.
819 Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
820 while (iterator.hasNext()) {
821 Callback callback = iterator.next().getValue();
822 try {
823 TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
824 long currentTimeout = timeoutHistory.currentTimeout;
825 if (currentTime - callback.time > currentTimeout) {
826 iterator.remove();
827 long elapsedTime = currentTime - callback.time;
828 timeoutHistory.addReplyTime(elapsedTime);
829 callback.completeExceptionally(
830 new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
831 }
832 } catch (ExecutionException e) {
833 throw new AssertionError();
834 }
835 }
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700836
837 // Iterate through all timeout histories and recompute the timeout.
838 for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
839 timeoutHistory.recomputeTimeoutMillis();
840 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700841 }
842
843 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700844 public CompletableFuture<Void> sendAsync(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700845 CompletableFuture<Void> future = new CompletableFuture<>();
846 channel.writeAndFlush(message).addListener(channelFuture -> {
847 if (!channelFuture.isSuccess()) {
848 future.completeExceptionally(channelFuture.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800849 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700850 future.complete(null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800851 }
852 });
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700853 return future;
854 }
855
856 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700857 public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700858 CompletableFuture<byte[]> future = new CompletableFuture<>();
Jordan Haltermane3813a92017-07-29 14:10:31 -0700859 Callback callback = new Callback(message.subject(), future);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700860 futures.put(message.id(), callback);
861 channel.writeAndFlush(message).addListener(channelFuture -> {
862 if (!channelFuture.isSuccess()) {
863 futures.remove(message.id());
864 callback.completeExceptionally(channelFuture.cause());
865 }
866 });
867 return future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800868 }
869
870 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700871 * Dispatches a message to a local handler.
872 *
873 * @param message the message to dispatch
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800874 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700875 private void dispatch(InternalReply message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700876 if (message.preamble() != preamble) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700877 log.debug("Received {} with invalid preamble", message.type());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700878 return;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800879 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700880
881 clockService.recordEventTime(message.time());
882
883 Callback callback = futures.remove(message.id());
884 if (callback != null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700885 if (message.status() == InternalReply.Status.OK) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700886 callback.complete(message.payload());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700887 } else if (message.status() == InternalReply.Status.ERROR_NO_HANDLER) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700888 callback.completeExceptionally(new MessagingException.NoRemoteHandler());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700889 } else if (message.status() == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700890 callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700891 } else if (message.status() == InternalReply.Status.PROTOCOL_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700892 callback.completeExceptionally(new MessagingException.ProtocolException());
893 }
894
895 try {
896 TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
897 timeoutHistory.addReplyTime(System.currentTimeMillis() - callback.time);
898 } catch (ExecutionException e) {
899 throw new AssertionError();
900 }
901 } else {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700902 log.debug("Received a reply for message id:[{}] "
903 + "but was unable to locate the"
904 + " request handle", message.id());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700905 }
906 }
907
908 @Override
909 public void close() {
910 if (closed.compareAndSet(false, true)) {
911 timeoutFuture.cancel(false);
912 for (Callback callback : futures.values()) {
913 callback.completeExceptionally(new ConnectException());
914 }
915 }
916 }
917 }
918
919 /**
920 * Remote server connection.
921 */
922 private final class RemoteServerConnection implements ServerConnection {
923 private final Channel channel;
924
925 RemoteServerConnection(Channel channel) {
926 this.channel = channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800927 }
928
929 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700930 * Dispatches a message to a local handler.
931 *
932 * @param message the message to dispatch
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800933 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700934 private void dispatch(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700935 if (message.preamble() != preamble) {
936 log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700937 reply(message, InternalReply.Status.PROTOCOL_EXCEPTION, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700938 return;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800939 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700940
941 clockService.recordEventTime(message.time());
942
Jordan Haltermane3813a92017-07-29 14:10:31 -0700943 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700944 if (handler != null) {
945 handler.accept(message, this);
946 } else {
947 log.debug("No handler for message type {} from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700948 reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700949 }
950 }
951
952 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700953 public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
954 InternalReply response = new InternalReply(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700955 clockService.timeNow(),
956 message.id(),
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700957 payload.orElse(EMPTY_PAYLOAD),
958 status);
959 channel.writeAndFlush(response);
960 }
961 }
962
963 /**
964 * Request-reply timeout history tracker.
965 */
966 private static final class TimeoutHistory {
967 private final DescriptiveStatistics timeoutHistory = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
968 private final AtomicLong maxReplyTime = new AtomicLong();
969 private volatile long currentTimeout = DEFAULT_TIMEOUT_MILLIS;
970
971 /**
972 * Adds a reply time to the history.
973 *
974 * @param replyTime the reply time to add to the history
975 */
976 void addReplyTime(long replyTime) {
977 maxReplyTime.getAndAccumulate(replyTime, Math::max);
978 }
979
980 /**
981 * Computes the current timeout.
982 */
983 private void recomputeTimeoutMillis() {
984 double nextTimeout = maxReplyTime.getAndSet(0) * TIMEOUT_MULTIPLIER;
985 timeoutHistory.addValue(
986 Math.min(Math.max(nextTimeout, MIN_TIMEOUT_MILLIS), MAX_TIMEOUT_MILLIS));
987 if (timeoutHistory.getN() == WINDOW_SIZE) {
988 this.currentTimeout = (long) timeoutHistory.getMax();
989 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800990 }
991 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900992}