blob: 4cc407bbe43dd3ad5dcb71c1bf4ce63f2de35694 [file] [log] [blame]
Thomas Vachuska58de4162015-09-10 16:15:33 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Thomas Vachuska58de4162015-09-10 16:15:33 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070016package org.onosproject.store.cluster.messaging.impl;
17
Aaron Kruglikov1b727382016-02-09 16:17:47 -080018import javax.net.ssl.KeyManagerFactory;
19import javax.net.ssl.SSLContext;
20import javax.net.ssl.SSLEngine;
21import javax.net.ssl.TrustManagerFactory;
Brian O'Connor740e98c2017-06-29 17:07:17 -070022import java.io.File;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080023import java.io.FileInputStream;
Brian O'Connor740e98c2017-06-29 17:07:17 -070024import java.io.FileNotFoundException;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070025import java.net.ConnectException;
Brian O'Connor740e98c2017-06-29 17:07:17 -070026import java.security.Key;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080027import java.security.KeyStore;
Brian O'Connor740e98c2017-06-29 17:07:17 -070028import java.security.MessageDigest;
29import java.security.cert.Certificate;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070030import java.time.Duration;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070031import java.util.ArrayList;
Brian O'Connor740e98c2017-06-29 17:07:17 -070032import java.util.Enumeration;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070033import java.util.Iterator;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070034import java.util.List;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080035import java.util.Map;
Madan Jampania9e70a62016-03-02 16:28:18 -080036import java.util.Optional;
Brian O'Connor740e98c2017-06-29 17:07:17 -070037import java.util.StringJoiner;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080038import java.util.concurrent.CompletableFuture;
39import java.util.concurrent.ConcurrentHashMap;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070040import java.util.concurrent.ExecutionException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080041import java.util.concurrent.Executor;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070042import java.util.concurrent.Executors;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080043import java.util.concurrent.RejectedExecutionException;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070044import java.util.concurrent.ScheduledExecutorService;
45import java.util.concurrent.ScheduledFuture;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080046import java.util.concurrent.TimeUnit;
47import java.util.concurrent.TimeoutException;
48import java.util.concurrent.atomic.AtomicBoolean;
49import java.util.concurrent.atomic.AtomicLong;
50import java.util.function.BiConsumer;
51import java.util.function.BiFunction;
Jordan Halterman23e73c52018-01-13 14:10:56 -080052import java.util.function.Consumer;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070053import java.util.function.Function;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080054
Jordan Halterman23e73c52018-01-13 14:10:56 -080055import com.google.common.base.Throwables;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070056import com.google.common.cache.Cache;
57import com.google.common.cache.CacheBuilder;
58import com.google.common.collect.Lists;
59import com.google.common.collect.Maps;
60import com.google.common.util.concurrent.MoreExecutors;
61import io.netty.bootstrap.Bootstrap;
62import io.netty.bootstrap.ServerBootstrap;
63import io.netty.buffer.PooledByteBufAllocator;
64import io.netty.channel.Channel;
65import io.netty.channel.ChannelFuture;
66import io.netty.channel.ChannelHandler;
67import io.netty.channel.ChannelHandlerContext;
68import io.netty.channel.ChannelInitializer;
69import io.netty.channel.ChannelOption;
70import io.netty.channel.EventLoopGroup;
71import io.netty.channel.ServerChannel;
72import io.netty.channel.SimpleChannelInboundHandler;
73import io.netty.channel.WriteBufferWaterMark;
74import io.netty.channel.epoll.EpollEventLoopGroup;
75import io.netty.channel.epoll.EpollServerSocketChannel;
76import io.netty.channel.epoll.EpollSocketChannel;
77import io.netty.channel.nio.NioEventLoopGroup;
78import io.netty.channel.socket.SocketChannel;
79import io.netty.channel.socket.nio.NioServerSocketChannel;
80import io.netty.channel.socket.nio.NioSocketChannel;
81import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
82import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
83import org.apache.felix.scr.annotations.Activate;
84import org.apache.felix.scr.annotations.Component;
85import org.apache.felix.scr.annotations.Deactivate;
86import org.apache.felix.scr.annotations.Reference;
87import org.apache.felix.scr.annotations.ReferenceCardinality;
88import org.apache.felix.scr.annotations.Service;
89import org.onosproject.cluster.ClusterMetadataService;
90import org.onosproject.cluster.ControllerNode;
91import org.onosproject.core.HybridLogicalClockService;
92import org.onosproject.store.cluster.messaging.Endpoint;
93import org.onosproject.store.cluster.messaging.MessagingException;
94import org.onosproject.store.cluster.messaging.MessagingService;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070095import org.slf4j.Logger;
96import org.slf4j.LoggerFactory;
97
Yuta HIGUCHI90a16892016-07-20 20:36:08 -070098import static org.onlab.util.Tools.groupedThreads;
Heedo Kang4a47a302016-02-29 17:40:23 +090099import static org.onosproject.security.AppGuard.checkPermission;
100import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
101
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700102/**
103 * Netty based MessagingService.
104 */
Sho SHIMIZU5c396e32016-08-12 15:19:12 -0700105@Component(immediate = true)
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700106@Service
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800107public class NettyMessagingManager implements MessagingService {
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700108 private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1).toMillis();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700109 private static final long TIMEOUT_INTERVAL = 50;
110 private static final int WINDOW_SIZE = 100;
Jordan Haltermanef92f192017-12-21 11:59:38 -0800111 private static final int MIN_SAMPLES = 25;
112 private static final double PHI_FACTOR = 1.0 / Math.log(10.0);
113 private static final int PHI_FAILURE_THRESHOLD = 5;
Jordan Halterman111aab72018-01-12 16:28:57 -0800114 private static final long MIN_TIMEOUT_MILLIS = 100;
Jordan Haltermanef92f192017-12-21 11:59:38 -0800115 private static final long MAX_TIMEOUT_MILLIS = 15000;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700116 private static final int CHANNEL_POOL_SIZE = 8;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700117
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700118 private static final byte[] EMPTY_PAYLOAD = new byte[0];
119
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700120 private final Logger log = LoggerFactory.getLogger(getClass());
121
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800122 private final LocalClientConnection localClientConnection = new LocalClientConnection();
123 private final LocalServerConnection localServerConnection = new LocalServerConnection(null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800124
Brian O'Connor740e98c2017-06-29 17:07:17 -0700125 //TODO CONFIG_DIR is duplicated from ConfigFileBasedClusterMetadataProvider
126 private static final String CONFIG_DIR = "../config";
127 private static final String KS_FILE_NAME = "onos.jks";
128 private static final File DEFAULT_KS_FILE = new File(CONFIG_DIR, KS_FILE_NAME);
129 private static final String DEFAULT_KS_PASSWORD = "changeit";
130
Madan Jampani05833872016-07-12 23:01:39 -0700131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
132 protected HybridLogicalClockService clockService;
133
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700134 private Endpoint localEndpoint;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800135 private int preamble;
136 private final AtomicBoolean started = new AtomicBoolean(false);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700137 private final Map<String, BiConsumer<InternalRequest, ServerConnection>> handlers = new ConcurrentHashMap<>();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700138 private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
139 private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800140 private final AtomicLong messageIdGenerator = new AtomicLong(0);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800141
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700142 private ScheduledFuture<?> timeoutFuture;
143
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700144 private final Map<Endpoint, List<CompletableFuture<Channel>>> channels = Maps.newConcurrentMap();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800145
146 private EventLoopGroup serverGroup;
147 private EventLoopGroup clientGroup;
148 private Class<? extends ServerChannel> serverChannelClass;
149 private Class<? extends Channel> clientChannelClass;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700150 private ScheduledExecutorService timeoutExecutor;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800151
Brian O'Connor740e98c2017-06-29 17:07:17 -0700152 protected static final boolean TLS_ENABLED = true;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800153 protected static final boolean TLS_DISABLED = false;
Brian O'Connor740e98c2017-06-29 17:07:17 -0700154 protected boolean enableNettyTls = TLS_ENABLED;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800155
Brian O'Connor740e98c2017-06-29 17:07:17 -0700156 protected TrustManagerFactory trustManager;
157 protected KeyManagerFactory keyManager;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900158
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700159 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampaniec1df022015-10-13 21:23:03 -0700160 protected ClusterMetadataService clusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700161
162 @Activate
Ray Milkey986a47a2018-01-25 11:38:51 -0800163 public void activate() throws InterruptedException {
Madan Jampaniec1df022015-10-13 21:23:03 -0700164 ControllerNode localNode = clusterMetadataService.getLocalNode();
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800165 getTlsParameters();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800166
167 if (started.get()) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700168 log.warn("Already running at local endpoint: {}", localEndpoint);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800169 return;
170 }
171 this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700172 this.localEndpoint = new Endpoint(localNode.ip(), localNode.tcpPort());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800173 initEventLoopGroup();
174 startAcceptingConnections();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700175 timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
176 groupedThreads("NettyMessagingEvt", "timeout", log));
177 timeoutFuture = timeoutExecutor.scheduleAtFixedRate(
178 this::timeoutAllCallbacks, TIMEOUT_INTERVAL, TIMEOUT_INTERVAL, TimeUnit.MILLISECONDS);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800179 started.set(true);
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700180 log.info("Started");
181 }
182
183 @Deactivate
Ray Milkey986a47a2018-01-25 11:38:51 -0800184 public void deactivate() {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800185 if (started.get()) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800186 serverGroup.shutdownGracefully();
187 clientGroup.shutdownGracefully();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700188 timeoutFuture.cancel(false);
189 timeoutExecutor.shutdown();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800190 started.set(false);
191 }
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700192 log.info("Stopped");
193 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900194
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800195 private void getTlsParameters() {
Brian O'Connor740e98c2017-06-29 17:07:17 -0700196 // default is TLS enabled unless key stores cannot be loaded
197 enableNettyTls = Boolean.parseBoolean(System.getProperty("enableNettyTLS", Boolean.toString(TLS_ENABLED)));
198
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800199 if (enableNettyTls) {
Brian O'Connor740e98c2017-06-29 17:07:17 -0700200 enableNettyTls = loadKeyStores();
201 }
202 }
203
204 private boolean loadKeyStores() {
205 // Maintain a local copy of the trust and key managers in case anything goes wrong
206 TrustManagerFactory tmf;
207 KeyManagerFactory kmf;
208 try {
209 String ksLocation = System.getProperty("javax.net.ssl.keyStore", DEFAULT_KS_FILE.toString());
210 String tsLocation = System.getProperty("javax.net.ssl.trustStore", DEFAULT_KS_FILE.toString());
211 char[] ksPwd = System.getProperty("javax.net.ssl.keyStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
212 char[] tsPwd = System.getProperty("javax.net.ssl.trustStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
213
214 tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
215 KeyStore ts = KeyStore.getInstance("JKS");
216 ts.load(new FileInputStream(tsLocation), tsPwd);
217 tmf.init(ts);
218
219 kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
220 KeyStore ks = KeyStore.getInstance("JKS");
221 ks.load(new FileInputStream(ksLocation), ksPwd);
222 kmf.init(ks, ksPwd);
223 if (log.isInfoEnabled()) {
224 logKeyStore(ks, ksLocation, ksPwd);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900225 }
Brian O'Connor740e98c2017-06-29 17:07:17 -0700226 } catch (FileNotFoundException e) {
227 log.warn("Disabling TLS for intra-cluster messaging; Could not load cluster key store: {}", e.getMessage());
228 return TLS_DISABLED;
229 } catch (Exception e) {
230 //TODO we might want to catch exceptions more specifically
231 log.error("Error loading key store; disabling TLS for intra-cluster messaging", e);
232 return TLS_DISABLED;
233 }
234 this.trustManager = tmf;
235 this.keyManager = kmf;
236 return TLS_ENABLED;
237 }
238
239 private void logKeyStore(KeyStore ks, String ksLocation, char[] ksPwd) {
240 if (log.isInfoEnabled()) {
241 log.info("Loaded cluster key store from: {}", ksLocation);
242 try {
243 for (Enumeration<String> e = ks.aliases(); e.hasMoreElements();) {
244 String alias = e.nextElement();
245 Key key = ks.getKey(alias, ksPwd);
246 Certificate[] certs = ks.getCertificateChain(alias);
247 log.debug("{} -> {}", alias, certs);
248 final byte[] encodedKey;
249 if (certs != null && certs.length > 0) {
250 encodedKey = certs[0].getEncoded();
251 } else {
252 log.info("Could not find cert chain for {}, using fingerprint of key instead...", alias);
253 encodedKey = key.getEncoded();
254 }
255 // Compute the certificate's fingerprint (use the key if certificate cannot be found)
256 MessageDigest digest = MessageDigest.getInstance("SHA1");
257 digest.update(encodedKey);
258 StringJoiner fingerprint = new StringJoiner(":");
259 for (byte b : digest.digest()) {
260 fingerprint.add(String.format("%02X", b));
261 }
262 log.info("{} -> {}", alias, fingerprint);
263 }
264 } catch (Exception e) {
265 log.warn("Unable to print contents of key store: {}", ksLocation, e);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900266 }
267 }
268 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700269
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800270 private void initEventLoopGroup() {
271 // try Epoll first and if that does work, use nio.
272 try {
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700273 clientGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollC-%d", log));
274 serverGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800275 serverChannelClass = EpollServerSocketChannel.class;
276 clientChannelClass = EpollSocketChannel.class;
277 return;
278 } catch (Throwable e) {
279 log.debug("Failed to initialize native (epoll) transport. "
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700280 + "Reason: {}. Proceeding with nio.", e.getMessage());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800281 }
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700282 clientGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioC-%d", log));
283 serverGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800284 serverChannelClass = NioServerSocketChannel.class;
285 clientChannelClass = NioSocketChannel.class;
286 }
287
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700288 /**
289 * Times out response callbacks.
290 */
291 private void timeoutAllCallbacks() {
292 // Iterate through all connections and time out callbacks.
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800293 localClientConnection.timeoutCallbacks();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700294 for (RemoteClientConnection connection : clientConnections.values()) {
295 connection.timeoutCallbacks();
296 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700297 }
298
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800299 @Override
300 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900301 checkPermission(CLUSTER_WRITE);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700302 InternalRequest message = new InternalRequest(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700303 clockService.timeNow(),
304 messageIdGenerator.incrementAndGet(),
305 localEndpoint,
306 type,
307 payload);
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700308 return executeOnPooledConnection(ep, type, c -> c.sendAsync(message), MoreExecutors.directExecutor());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800309 }
310
311 @Override
312 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900313 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800314 return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
315 }
316
317 @Override
318 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900319 checkPermission(CLUSTER_WRITE);
Jordan Halterman5ceb3892017-08-28 15:35:03 -0700320 long messageId = messageIdGenerator.incrementAndGet();
Jordan Haltermane3813a92017-07-29 14:10:31 -0700321 InternalRequest message = new InternalRequest(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700322 clockService.timeNow(),
323 messageId,
324 localEndpoint,
325 type,
326 payload);
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700327 return executeOnPooledConnection(ep, type, c -> c.sendAndReceive(message), executor);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700328 }
Jordan Halterman041534b2017-03-21 10:37:33 -0700329
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700330 private List<CompletableFuture<Channel>> getChannelPool(Endpoint endpoint) {
331 return channels.computeIfAbsent(endpoint, e -> {
332 List<CompletableFuture<Channel>> defaultList = new ArrayList<>(CHANNEL_POOL_SIZE);
333 for (int i = 0; i < CHANNEL_POOL_SIZE; i++) {
334 defaultList.add(null);
335 }
336 return Lists.newCopyOnWriteArrayList(defaultList);
337 });
338 }
339
340 private int getChannelOffset(String messageType) {
341 return Math.abs(messageType.hashCode() % CHANNEL_POOL_SIZE);
342 }
343
Ray Milkey4f350762018-01-23 23:32:03 +0000344 private <T> CompletableFuture<T> executeOnPooledConnection(
345 Endpoint endpoint,
346 String type,
347 Function<ClientConnection, CompletableFuture<T>> callback,
348 Executor executor) {
349 CompletableFuture<T> future = new CompletableFuture<T>();
350 executeOnPooledConnection(endpoint, type, callback, executor, future);
351 return future;
352 }
353
354 private <T> void executeOnPooledConnection(
Jordan Halterman23e73c52018-01-13 14:10:56 -0800355 Endpoint endpoint,
356 String type,
357 Function<ClientConnection, CompletableFuture<T>> callback,
358 Executor executor,
359 CompletableFuture<T> future) {
360
361 // If the endpoint is the local node, avoid the loopback interface and use the singleton local connection.
Ray Milkey4f350762018-01-23 23:32:03 +0000362 if (endpoint.equals(localEndpoint)) {
363 callback.apply(localClientConnection).whenComplete((result, error) -> {
364 if (error == null) {
365 executor.execute(() -> future.complete(result));
366 } else {
367 executor.execute(() -> future.completeExceptionally(error));
368 }
369 });
370 return;
371 }
372
Jordan Halterman23e73c52018-01-13 14:10:56 -0800373 // Get the channel pool and the offset for this message type.
374 List<CompletableFuture<Channel>> channelPool = getChannelPool(endpoint);
375 int offset = getChannelOffset(type);
376
377 // If the channel future is completed exceptionally, open a new channel.
378 CompletableFuture<Channel> channelFuture = channelPool.get(offset);
379 if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
380 synchronized (channelPool) {
381 channelFuture = channelPool.get(offset);
382 if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
383 channelFuture = openChannel(endpoint);
384 channelPool.set(offset, channelFuture);
385 }
386 }
387 }
388
389 // Create a consumer with which to complete the send operation on a given channel.
390 final Consumer<Channel> runner = channel -> {
391 ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
392 callback.apply(connection).whenComplete((result, sendError) -> {
393 if (sendError == null) {
394 executor.execute(() -> future.complete(result));
395 } else {
396 // If an exception other than a TimeoutException occurred, close the connection and
397 // remove the channel from the pool.
398 Throwable cause = Throwables.getRootCause(sendError);
399 if (!(cause instanceof TimeoutException) && !(cause instanceof MessagingException)) {
400 synchronized (channelPool) {
401 channelPool.set(offset, null);
402 }
403 channel.close();
404 clientConnections.remove(channel);
405 connection.close();
Ray Milkey4f350762018-01-23 23:32:03 +0000406 }
Jordan Halterman23e73c52018-01-13 14:10:56 -0800407 executor.execute(() -> future.completeExceptionally(sendError));
408 }
409 });
410 };
411
412 // Wait for the channel future to be completed. Once it's complete, if the channel is active then
413 // attempt to send the message. Otherwise, if the channel is inactive then attempt to open a new channel.
414 final CompletableFuture<Channel> finalFuture = channelFuture;
415 finalFuture.whenComplete((channel, error) -> {
416 if (error == null) {
417 if (!channel.isActive()) {
418 final CompletableFuture<Channel> currentFuture;
419 synchronized (channelPool) {
420 currentFuture = channelPool.get(offset);
421 if (currentFuture == finalFuture) {
422 channelPool.set(offset, null);
423 }
424 }
425 if (currentFuture == finalFuture) {
426 executeOnPooledConnection(endpoint, type, callback, executor);
427 } else {
428 currentFuture.whenComplete((recursiveResult, recursiveError) -> {
429 if (recursiveError == null) {
430 runner.accept(recursiveResult);
431 } else {
432 future.completeExceptionally(recursiveError);
433 }
434 });
435 }
436 } else {
437 runner.accept(channel);
438 }
Ray Milkey4f350762018-01-23 23:32:03 +0000439 } else {
Jordan Halterman23e73c52018-01-13 14:10:56 -0800440 future.completeExceptionally(error);
Ray Milkey4f350762018-01-23 23:32:03 +0000441 }
442 });
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800443 }
444
445 @Override
446 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900447 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700448 handlers.put(type, (message, connection) -> executor.execute(() ->
449 handler.accept(message.sender(), message.payload())));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800450 }
451
452 @Override
453 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900454 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700455 handlers.put(type, (message, connection) -> executor.execute(() -> {
Madan Jampania9e70a62016-03-02 16:28:18 -0800456 byte[] responsePayload = null;
Jordan Haltermane3813a92017-07-29 14:10:31 -0700457 InternalReply.Status status = InternalReply.Status.OK;
Madan Jampania9e70a62016-03-02 16:28:18 -0800458 try {
459 responsePayload = handler.apply(message.sender(), message.payload());
460 } catch (Exception e) {
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700461 log.debug("An error occurred in a message handler: {}", e);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700462 status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800463 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700464 connection.reply(message, status, Optional.ofNullable(responsePayload));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800465 }));
466 }
467
468 @Override
469 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900470 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700471 handlers.put(type, (message, connection) -> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800472 handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700473 InternalReply.Status status;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700474 if (error == null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700475 status = InternalReply.Status.OK;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700476 } else {
477 log.debug("An error occurred in a message handler: {}", error);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700478 status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700479 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700480 connection.reply(message, status, Optional.ofNullable(result));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800481 });
482 });
483 }
484
485 @Override
486 public void unregisterHandler(String type) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900487 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800488 handlers.remove(type);
489 }
490
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700491 private Bootstrap bootstrapClient(Endpoint endpoint) {
492 Bootstrap bootstrap = new Bootstrap();
493 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
494 bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
495 new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
496 bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
497 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
498 bootstrap.group(clientGroup);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700499 bootstrap.channel(clientChannelClass);
500 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
501 bootstrap.remoteAddress(endpoint.host().toInetAddress(), endpoint.port());
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700502 if (enableNettyTls) {
503 bootstrap.handler(new SslClientCommunicationChannelInitializer());
504 } else {
505 bootstrap.handler(new BasicChannelInitializer());
506 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700507 return bootstrap;
508 }
509
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800510 private void startAcceptingConnections() throws InterruptedException {
511 ServerBootstrap b = new ServerBootstrap();
Jon Hall9a44d6a2017-03-02 18:14:37 -0800512 b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700513 new WriteBufferWaterMark(8 * 1024, 32 * 1024));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800514 b.option(ChannelOption.SO_RCVBUF, 1048576);
Jordan Halterman153dbd52017-12-21 11:08:19 -0800515 b.childOption(ChannelOption.SO_KEEPALIVE, true);
516 b.childOption(ChannelOption.TCP_NODELAY, true);
Yuta HIGUCHIb47c9532016-08-22 09:41:23 -0700517 b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800518 b.group(serverGroup, clientGroup);
519 b.channel(serverChannelClass);
520 if (enableNettyTls) {
521 b.childHandler(new SslServerCommunicationChannelInitializer());
522 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700523 b.childHandler(new BasicChannelInitializer());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800524 }
525 b.option(ChannelOption.SO_BACKLOG, 128);
526 b.childOption(ChannelOption.SO_KEEPALIVE, true);
527
528 // Bind and start to accept incoming connections.
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700529 b.bind(localEndpoint.port()).sync().addListener(future -> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800530 if (future.isSuccess()) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700531 log.info("{} accepting incoming connections on port {}",
532 localEndpoint.host(), localEndpoint.port());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800533 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700534 log.warn("{} failed to bind to port {} due to {}",
535 localEndpoint.host(), localEndpoint.port(), future.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800536 }
537 });
538 }
539
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700540 private CompletableFuture<Channel> openChannel(Endpoint ep) {
541 Bootstrap bootstrap = bootstrapClient(ep);
542 CompletableFuture<Channel> retFuture = new CompletableFuture<>();
543 ChannelFuture f = bootstrap.connect();
544
545 f.addListener(future -> {
546 if (future.isSuccess()) {
547 retFuture.complete(f.channel());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800548 } else {
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700549 retFuture.completeExceptionally(future.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800550 }
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700551 });
552 log.debug("Established a new connection to {}", ep);
553 return retFuture;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800554 }
555
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700556 /**
557 * Channel initializer for TLS servers.
558 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800559 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800560 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800561
562 @Override
563 protected void initChannel(SocketChannel channel) throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800564 SSLContext serverContext = SSLContext.getInstance("TLS");
Brian O'Connor740e98c2017-06-29 17:07:17 -0700565 serverContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800566
567 SSLEngine serverSslEngine = serverContext.createSSLEngine();
568
569 serverSslEngine.setNeedClientAuth(true);
570 serverSslEngine.setUseClientMode(false);
571 serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
572 serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
573 serverSslEngine.setEnableSessionCreation(true);
574
575 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
Jordan Haltermane3813a92017-07-29 14:10:31 -0700576 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700577 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800578 .addLast("handler", dispatcher);
579 }
580 }
581
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700582 /**
583 * Channel initializer for TLS clients.
584 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800585 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800586 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800587
588 @Override
589 protected void initChannel(SocketChannel channel) throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800590 SSLContext clientContext = SSLContext.getInstance("TLS");
Brian O'Connor740e98c2017-06-29 17:07:17 -0700591 clientContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800592
593 SSLEngine clientSslEngine = clientContext.createSSLEngine();
594
595 clientSslEngine.setUseClientMode(true);
596 clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
597 clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
598 clientSslEngine.setEnableSessionCreation(true);
599
600 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
Jordan Haltermane3813a92017-07-29 14:10:31 -0700601 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700602 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800603 .addLast("handler", dispatcher);
604 }
605 }
606
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700607 /**
608 * Channel initializer for basic connections.
609 */
610 private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800611 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800612
613 @Override
614 protected void initChannel(SocketChannel channel) throws Exception {
615 channel.pipeline()
Jordan Haltermane3813a92017-07-29 14:10:31 -0700616 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700617 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800618 .addLast("handler", dispatcher);
619 }
620 }
621
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700622 /**
623 * Channel inbound handler that dispatches messages to the appropriate handler.
624 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800625 @ChannelHandler.Sharable
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700626 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<Object> {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700627 // Effectively SimpleChannelInboundHandler<InternalMessage>,
628 // had to specify <Object> to avoid Class Loader not being able to find some classes.
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800629
630 @Override
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700631 protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
632 InternalMessage message = (InternalMessage) rawMessage;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800633 try {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700634 if (message.isRequest()) {
635 RemoteServerConnection connection =
636 serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700637 connection.dispatch((InternalRequest) message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700638 } else {
639 RemoteClientConnection connection =
640 clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700641 connection.dispatch((InternalReply) message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700642 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800643 } catch (RejectedExecutionException e) {
644 log.warn("Unable to dispatch message due to {}", e.getMessage());
645 }
646 }
647
648 @Override
649 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
650 log.error("Exception inside channel handling pipeline.", cause);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700651
652 RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
653 if (clientConnection != null) {
654 clientConnection.close();
655 }
656
657 RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
658 if (serverConnection != null) {
659 serverConnection.close();
660 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800661 context.close();
662 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700663
Jordan Halterman23e73c52018-01-13 14:10:56 -0800664 @Override
665 public void channelInactive(ChannelHandlerContext context) throws Exception {
666 RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
667 if (clientConnection != null) {
668 clientConnection.close();
669 }
670
671 RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
672 if (serverConnection != null) {
673 serverConnection.close();
674 }
675 context.close();
676 }
677
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700678 /**
679 * Returns true if the given message should be handled.
680 *
681 * @param msg inbound message
682 * @return true if {@code msg} is {@link InternalMessage} instance.
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700683 * @see SimpleChannelInboundHandler#acceptInboundMessage(Object)
684 */
685 @Override
686 public final boolean acceptInboundMessage(Object msg) {
687 return msg instanceof InternalMessage;
688 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800689 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700690
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700691 /**
692 * Wraps a {@link CompletableFuture} and tracks its type and creation time.
693 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800694 private final class Callback {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700695 private final String type;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800696 private final CompletableFuture<byte[]> future;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700697 private final long time = System.currentTimeMillis();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800698
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700699 Callback(String type, CompletableFuture<byte[]> future) {
700 this.type = type;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800701 this.future = future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800702 }
703
704 public void complete(byte[] value) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700705 future.complete(value);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800706 }
707
708 public void completeExceptionally(Throwable error) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700709 future.completeExceptionally(error);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800710 }
711 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800712
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700713 /**
714 * Represents the client side of a connection to a local or remote server.
715 */
716 private interface ClientConnection {
717
718 /**
719 * Sends a message to the other side of the connection.
720 *
721 * @param message the message to send
722 * @return a completable future to be completed once the message has been sent
723 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700724 CompletableFuture<Void> sendAsync(InternalRequest message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700725
726 /**
727 * Sends a message to the other side of the connection, awaiting a reply.
728 *
729 * @param message the message to send
730 * @return a completable future to be completed once a reply is received or the request times out
731 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700732 CompletableFuture<byte[]> sendAndReceive(InternalRequest message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700733
734 /**
735 * Closes the connection.
736 */
737 default void close() {
738 }
739 }
740
741 /**
742 * Represents the server side of a connection.
743 */
744 private interface ServerConnection {
745
746 /**
747 * Sends a reply to the other side of the connection.
748 *
749 * @param message the message to which to reply
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800750 * @param status the reply status
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700751 * @param payload the response payload
752 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700753 void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700754
755 /**
756 * Closes the connection.
757 */
758 default void close() {
759 }
760 }
761
762 /**
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800763 * Remote connection implementation.
764 */
765 private abstract class AbstractClientConnection implements ClientConnection {
766 private final Map<Long, Callback> futures = Maps.newConcurrentMap();
767 private final AtomicBoolean closed = new AtomicBoolean(false);
Jordan Haltermanef92f192017-12-21 11:59:38 -0800768 private final Cache<String, RequestMonitor> requestMonitors = CacheBuilder.newBuilder()
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800769 .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
770 .build();
771
772 /**
773 * Times out callbacks for this connection.
774 */
Jordan Haltermanef92f192017-12-21 11:59:38 -0800775 void timeoutCallbacks() {
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800776 // Store the current time.
777 long currentTime = System.currentTimeMillis();
778
779 // Iterate through future callbacks and time out callbacks that have been alive
780 // longer than the current timeout according to the message type.
781 Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
782 while (iterator.hasNext()) {
783 Callback callback = iterator.next().getValue();
784 try {
Jordan Haltermanef92f192017-12-21 11:59:38 -0800785 RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
786 long elapsedTime = currentTime - callback.time;
Jordan Halterman111aab72018-01-12 16:28:57 -0800787 if (elapsedTime > MAX_TIMEOUT_MILLIS ||
788 (elapsedTime > MIN_TIMEOUT_MILLIS && requestMonitor.isTimedOut(elapsedTime))) {
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800789 iterator.remove();
Jordan Haltermanef92f192017-12-21 11:59:38 -0800790 requestMonitor.addReplyTime(elapsedTime);
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800791 callback.completeExceptionally(
792 new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
793 }
794 } catch (ExecutionException e) {
795 throw new AssertionError();
796 }
797 }
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800798 }
799
800 protected void registerCallback(long id, String subject, CompletableFuture<byte[]> future) {
801 futures.put(id, new Callback(subject, future));
802 }
803
804 protected Callback completeCallback(long id) {
805 Callback callback = futures.remove(id);
806 if (callback != null) {
807 try {
Jordan Haltermanef92f192017-12-21 11:59:38 -0800808 RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
809 requestMonitor.addReplyTime(System.currentTimeMillis() - callback.time);
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800810 } catch (ExecutionException e) {
811 throw new AssertionError();
812 }
813 }
814 return callback;
815 }
816
817 protected Callback failCallback(long id) {
818 return futures.remove(id);
819 }
820
821 @Override
822 public void close() {
823 if (closed.compareAndSet(false, true)) {
824 timeoutFuture.cancel(false);
825 for (Callback callback : futures.values()) {
826 callback.completeExceptionally(new ConnectException());
827 }
828 }
829 }
830 }
831
832 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700833 * Local connection implementation.
834 */
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800835 private final class LocalClientConnection extends AbstractClientConnection {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700836 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700837 public CompletableFuture<Void> sendAsync(InternalRequest message) {
838 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700839 if (handler != null) {
840 handler.accept(message, localServerConnection);
841 } else {
842 log.debug("No handler for message type {} from {}", message.type(), message.sender());
843 }
844 return CompletableFuture.completedFuture(null);
845 }
846
847 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700848 public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700849 CompletableFuture<byte[]> future = new CompletableFuture<>();
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800850 future.whenComplete((r, e) -> completeCallback(message.id()));
851 registerCallback(message.id(), message.subject(), future);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700852 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700853 if (handler != null) {
854 handler.accept(message, new LocalServerConnection(future));
855 } else {
856 log.debug("No handler for message type {} from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700857 new LocalServerConnection(future)
858 .reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700859 }
860 return future;
861 }
862 }
863
864 /**
865 * Local server connection.
866 */
867 private final class LocalServerConnection implements ServerConnection {
868 private final CompletableFuture<byte[]> future;
869
870 LocalServerConnection(CompletableFuture<byte[]> future) {
871 this.future = future;
872 }
873
874 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700875 public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700876 if (future != null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700877 if (status == InternalReply.Status.OK) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700878 future.complete(payload.orElse(EMPTY_PAYLOAD));
Jordan Haltermane3813a92017-07-29 14:10:31 -0700879 } else if (status == InternalReply.Status.ERROR_NO_HANDLER) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700880 future.completeExceptionally(new MessagingException.NoRemoteHandler());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700881 } else if (status == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700882 future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700883 } else if (status == InternalReply.Status.PROTOCOL_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700884 future.completeExceptionally(new MessagingException.ProtocolException());
885 }
886 }
887 }
888 }
889
890 /**
891 * Remote connection implementation.
892 */
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800893 private final class RemoteClientConnection extends AbstractClientConnection {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700894 private final Channel channel;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700895
896 RemoteClientConnection(Channel channel) {
897 this.channel = channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800898 }
899
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700900 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700901 public CompletableFuture<Void> sendAsync(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700902 CompletableFuture<Void> future = new CompletableFuture<>();
903 channel.writeAndFlush(message).addListener(channelFuture -> {
904 if (!channelFuture.isSuccess()) {
905 future.completeExceptionally(channelFuture.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800906 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700907 future.complete(null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800908 }
909 });
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700910 return future;
911 }
912
913 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700914 public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700915 CompletableFuture<byte[]> future = new CompletableFuture<>();
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800916 registerCallback(message.id(), message.subject(), future);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700917 channel.writeAndFlush(message).addListener(channelFuture -> {
918 if (!channelFuture.isSuccess()) {
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800919 Callback callback = failCallback(message.id());
920 if (callback != null) {
921 callback.completeExceptionally(channelFuture.cause());
922 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700923 }
924 });
925 return future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800926 }
927
928 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700929 * Dispatches a message to a local handler.
930 *
931 * @param message the message to dispatch
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800932 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700933 private void dispatch(InternalReply message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700934 if (message.preamble() != preamble) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700935 log.debug("Received {} with invalid preamble", message.type());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700936 return;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800937 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700938
939 clockService.recordEventTime(message.time());
940
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800941 Callback callback = completeCallback(message.id());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700942 if (callback != null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700943 if (message.status() == InternalReply.Status.OK) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700944 callback.complete(message.payload());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700945 } else if (message.status() == InternalReply.Status.ERROR_NO_HANDLER) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700946 callback.completeExceptionally(new MessagingException.NoRemoteHandler());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700947 } else if (message.status() == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700948 callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700949 } else if (message.status() == InternalReply.Status.PROTOCOL_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700950 callback.completeExceptionally(new MessagingException.ProtocolException());
951 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700952 } else {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700953 log.debug("Received a reply for message id:[{}] "
954 + "but was unable to locate the"
955 + " request handle", message.id());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700956 }
957 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700958 }
959
960 /**
961 * Remote server connection.
962 */
963 private final class RemoteServerConnection implements ServerConnection {
964 private final Channel channel;
965
966 RemoteServerConnection(Channel channel) {
967 this.channel = channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800968 }
969
970 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700971 * Dispatches a message to a local handler.
972 *
973 * @param message the message to dispatch
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800974 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700975 private void dispatch(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700976 if (message.preamble() != preamble) {
977 log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700978 reply(message, InternalReply.Status.PROTOCOL_EXCEPTION, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700979 return;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800980 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700981
982 clockService.recordEventTime(message.time());
983
Jordan Haltermane3813a92017-07-29 14:10:31 -0700984 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700985 if (handler != null) {
986 handler.accept(message, this);
987 } else {
988 log.debug("No handler for message type {} from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700989 reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700990 }
991 }
992
993 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700994 public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
995 InternalReply response = new InternalReply(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700996 clockService.timeNow(),
997 message.id(),
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700998 payload.orElse(EMPTY_PAYLOAD),
999 status);
1000 channel.writeAndFlush(response);
1001 }
1002 }
1003
1004 /**
1005 * Request-reply timeout history tracker.
1006 */
Jordan Haltermanef92f192017-12-21 11:59:38 -08001007 private static final class RequestMonitor {
1008 private final DescriptiveStatistics samples = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001009
1010 /**
1011 * Adds a reply time to the history.
1012 *
1013 * @param replyTime the reply time to add to the history
1014 */
1015 void addReplyTime(long replyTime) {
Jordan Haltermanef92f192017-12-21 11:59:38 -08001016 samples.addValue(replyTime);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001017 }
1018
1019 /**
Jordan Haltermanef92f192017-12-21 11:59:38 -08001020 * Returns a boolean indicating whether the given request should be timed out according to the elapsed time.
1021 *
1022 * @param elapsedTime the elapsed request time
1023 * @return indicates whether the request should be timed out
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001024 */
Jordan Haltermanef92f192017-12-21 11:59:38 -08001025 boolean isTimedOut(long elapsedTime) {
1026 return phi(elapsedTime) >= PHI_FAILURE_THRESHOLD;
1027 }
1028
1029 /**
1030 * Compute phi for the specified node id.
1031 *
1032 * @param elapsedTime the duration since the request was sent
1033 * @return phi value
1034 */
1035 private double phi(long elapsedTime) {
1036 if (samples.getN() < MIN_SAMPLES) {
1037 return 0.0;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001038 }
Jordan Haltermanef92f192017-12-21 11:59:38 -08001039 return computePhi(samples, elapsedTime);
1040 }
1041
1042 /**
1043 * Computes the phi value from the given samples.
1044 *
1045 * @param samples the samples from which to compute phi
1046 * @param elapsedTime the duration since the request was sent
1047 * @return phi
1048 */
1049 private double computePhi(DescriptiveStatistics samples, long elapsedTime) {
1050 return samples.getN() > 0 ? PHI_FACTOR * elapsedTime / samples.getMean() : 100;
Aaron Kruglikov1b727382016-02-09 16:17:47 -08001051 }
1052 }
JunHuy Lam39eb4292015-06-26 17:24:23 +09001053}