blob: 669e824106a9499aad9772606dfc73bcf1779b69 [file] [log] [blame]
Thomas Vachuska58de4162015-09-10 16:15:33 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Thomas Vachuska58de4162015-09-10 16:15:33 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070016package org.onosproject.store.cluster.messaging.impl;
17
Aaron Kruglikov1b727382016-02-09 16:17:47 -080018import javax.net.ssl.KeyManagerFactory;
19import javax.net.ssl.SSLContext;
20import javax.net.ssl.SSLEngine;
21import javax.net.ssl.TrustManagerFactory;
Brian O'Connor740e98c2017-06-29 17:07:17 -070022import java.io.File;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080023import java.io.FileInputStream;
Brian O'Connor740e98c2017-06-29 17:07:17 -070024import java.io.FileNotFoundException;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070025import java.net.ConnectException;
Brian O'Connor740e98c2017-06-29 17:07:17 -070026import java.security.Key;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080027import java.security.KeyStore;
Brian O'Connor740e98c2017-06-29 17:07:17 -070028import java.security.MessageDigest;
29import java.security.cert.Certificate;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070030import java.time.Duration;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070031import java.util.ArrayList;
Brian O'Connor740e98c2017-06-29 17:07:17 -070032import java.util.Enumeration;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070033import java.util.Iterator;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070034import java.util.List;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080035import java.util.Map;
Madan Jampania9e70a62016-03-02 16:28:18 -080036import java.util.Optional;
Brian O'Connor740e98c2017-06-29 17:07:17 -070037import java.util.StringJoiner;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080038import java.util.concurrent.CompletableFuture;
39import java.util.concurrent.ConcurrentHashMap;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070040import java.util.concurrent.ExecutionException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080041import java.util.concurrent.Executor;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070042import java.util.concurrent.Executors;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080043import java.util.concurrent.RejectedExecutionException;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070044import java.util.concurrent.ScheduledExecutorService;
45import java.util.concurrent.ScheduledFuture;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080046import java.util.concurrent.TimeUnit;
47import java.util.concurrent.TimeoutException;
48import java.util.concurrent.atomic.AtomicBoolean;
49import java.util.concurrent.atomic.AtomicLong;
50import java.util.function.BiConsumer;
51import java.util.function.BiFunction;
Jordan Haltermancd25bd72018-01-13 14:10:56 -080052import java.util.function.Consumer;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070053import java.util.function.Function;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080054
Jordan Haltermancd25bd72018-01-13 14:10:56 -080055import com.google.common.base.Throwables;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070056import com.google.common.cache.Cache;
57import com.google.common.cache.CacheBuilder;
58import com.google.common.collect.Lists;
59import com.google.common.collect.Maps;
60import com.google.common.util.concurrent.MoreExecutors;
61import io.netty.bootstrap.Bootstrap;
62import io.netty.bootstrap.ServerBootstrap;
63import io.netty.buffer.PooledByteBufAllocator;
64import io.netty.channel.Channel;
65import io.netty.channel.ChannelFuture;
66import io.netty.channel.ChannelHandler;
67import io.netty.channel.ChannelHandlerContext;
68import io.netty.channel.ChannelInitializer;
69import io.netty.channel.ChannelOption;
70import io.netty.channel.EventLoopGroup;
71import io.netty.channel.ServerChannel;
72import io.netty.channel.SimpleChannelInboundHandler;
73import io.netty.channel.WriteBufferWaterMark;
74import io.netty.channel.epoll.EpollEventLoopGroup;
75import io.netty.channel.epoll.EpollServerSocketChannel;
76import io.netty.channel.epoll.EpollSocketChannel;
77import io.netty.channel.nio.NioEventLoopGroup;
78import io.netty.channel.socket.SocketChannel;
79import io.netty.channel.socket.nio.NioServerSocketChannel;
80import io.netty.channel.socket.nio.NioSocketChannel;
81import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
82import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
83import org.apache.felix.scr.annotations.Activate;
84import org.apache.felix.scr.annotations.Component;
85import org.apache.felix.scr.annotations.Deactivate;
86import org.apache.felix.scr.annotations.Reference;
87import org.apache.felix.scr.annotations.ReferenceCardinality;
88import org.apache.felix.scr.annotations.Service;
89import org.onosproject.cluster.ClusterMetadataService;
90import org.onosproject.cluster.ControllerNode;
91import org.onosproject.core.HybridLogicalClockService;
92import org.onosproject.store.cluster.messaging.Endpoint;
93import org.onosproject.store.cluster.messaging.MessagingException;
94import org.onosproject.store.cluster.messaging.MessagingService;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070095import org.slf4j.Logger;
96import org.slf4j.LoggerFactory;
97
Yuta HIGUCHI90a16892016-07-20 20:36:08 -070098import static org.onlab.util.Tools.groupedThreads;
Heedo Kang4a47a302016-02-29 17:40:23 +090099import static org.onosproject.security.AppGuard.checkPermission;
100import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
101
/**
 * Netty-based implementation of {@link MessagingService} for intra-cluster messaging.
 */
Sho SHIMIZU5c396e32016-08-12 15:19:12 -0700105@Component(immediate = true)
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700106@Service
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800107public class NettyMessagingManager implements MessagingService {
    // Response-timeout tuning constants. None of them except TIMEOUT_INTERVAL is referenced in the methods
    // above the connection classes; NOTE(review): presumably they drive the adaptive response timeouts of
    // the client connection implementations defined later in this file -- confirm there.
    private static final long DEFAULT_TIMEOUT_MILLIS = 500;
    private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1).toMillis();
    private static final long MIN_TIMEOUT_MILLIS = 250;
    private static final long MAX_TIMEOUT_MILLIS = 5000;
    // Period (milliseconds) at which activate() schedules timeoutAllCallbacks().
    private static final long TIMEOUT_INTERVAL = 50;
    private static final int WINDOW_SIZE = 100;
    private static final double TIMEOUT_MULTIPLIER = 2.5;
    // Number of channel slots per remote endpoint; each message type is hashed onto one slot.
    private static final int CHANNEL_POOL_SIZE = 8;

    // NOTE(review): not used in the visible part of the file; presumably the payload of empty replies.
    private static final byte[] EMPTY_PAYLOAD = new byte[0];

    private final Logger log = LoggerFactory.getLogger(getClass());

    // Singleton connection used for messages addressed to the local endpoint, bypassing the network.
    private final LocalClientConnection localClientConnection = new LocalClientConnection();
    // NOTE(review): presumably the server-side counterpart of localClientConnection -- confirm.
    private final LocalServerConnection localServerConnection = new LocalServerConnection(null);

    //TODO CONFIG_DIR is duplicated from ConfigFileBasedClusterMetadataProvider
    private static final String CONFIG_DIR = "../config";
    private static final String KS_FILE_NAME = "onos.jks";
    // Key store and trust store used when javax.net.ssl.keyStore / javax.net.ssl.trustStore are unset.
    private static final File DEFAULT_KS_FILE = new File(CONFIG_DIR, KS_FILE_NAME);
    // Password used when javax.net.ssl.keyStorePassword / javax.net.ssl.trustStorePassword are unset.
    private static final String DEFAULT_KS_PASSWORD = "changeit";

    // Supplies the timestamp attached to every outgoing InternalRequest.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected HybridLogicalClockService clockService;

    // This node's messaging endpoint (local node IP + TCP port), set in activate().
    private Endpoint localEndpoint;
    // Hash code of the cluster name, set in activate() and attached to requests and the message encoder.
    // NOTE(review): presumably lets receivers reject traffic from another cluster -- confirm in MessageDecoder.
    private int preamble;
    // Guards activate()/deactivate() against double start/stop.
    private final AtomicBoolean started = new AtomicBoolean(false);
    // Inbound message handlers keyed by message type.
    private final Map<String, BiConsumer<InternalRequest, ServerConnection>> handlers = new ConcurrentHashMap<>();
    // Client-side connection state per outbound channel, created lazily on first use of the channel.
    private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
    // NOTE(review): presumably server-side connection state per inbound channel -- not used above.
    private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
    // Source of request ids.
    private final AtomicLong messageIdGenerator = new AtomicLong(0);

    // Handle of the periodic timeoutAllCallbacks() task; cancelled in deactivate().
    private ScheduledFuture<?> timeoutFuture;

    // Per-endpoint pools of CHANNEL_POOL_SIZE channel slots; a null slot has no channel yet (or it was discarded).
    private final Map<Endpoint, List<CompletableFuture<Channel>>> channels = Maps.newConcurrentMap();

    // Chosen by initEventLoopGroup() (epoll when available, otherwise NIO). serverGroup is the acceptor group;
    // clientGroup serves both outbound channels and accepted child channels.
    private EventLoopGroup serverGroup;
    private EventLoopGroup clientGroup;
    private Class<? extends ServerChannel> serverChannelClass;
    private Class<? extends Channel> clientChannelClass;
    // Single-threaded scheduler running timeoutAllCallbacks().
    private ScheduledExecutorService timeoutExecutor;

    protected static final boolean TLS_ENABLED = true;
    protected static final boolean TLS_DISABLED = false;
    // Final value decided by getTlsParameters(); TLS is turned off when the key stores cannot be loaded.
    protected boolean enableNettyTls = TLS_ENABLED;

    // Populated by loadKeyStores() and used by the TLS channel initializers.
    protected TrustManagerFactory trustManager;
    protected KeyManagerFactory keyManager;

    // Source of the local node and the cluster name.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterMetadataService clusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700160
161 @Activate
162 public void activate() throws Exception {
Madan Jampaniec1df022015-10-13 21:23:03 -0700163 ControllerNode localNode = clusterMetadataService.getLocalNode();
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800164 getTlsParameters();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800165
166 if (started.get()) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700167 log.warn("Already running at local endpoint: {}", localEndpoint);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800168 return;
169 }
170 this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700171 this.localEndpoint = new Endpoint(localNode.ip(), localNode.tcpPort());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800172 initEventLoopGroup();
173 startAcceptingConnections();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700174 timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
175 groupedThreads("NettyMessagingEvt", "timeout", log));
176 timeoutFuture = timeoutExecutor.scheduleAtFixedRate(
177 this::timeoutAllCallbacks, TIMEOUT_INTERVAL, TIMEOUT_INTERVAL, TimeUnit.MILLISECONDS);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800178 started.set(true);
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700179 log.info("Started");
180 }
181
182 @Deactivate
183 public void deactivate() throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800184 if (started.get()) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800185 serverGroup.shutdownGracefully();
186 clientGroup.shutdownGracefully();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700187 timeoutFuture.cancel(false);
188 timeoutExecutor.shutdown();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800189 started.set(false);
190 }
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700191 log.info("Stopped");
192 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900193
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800194 private void getTlsParameters() {
Brian O'Connor740e98c2017-06-29 17:07:17 -0700195 // default is TLS enabled unless key stores cannot be loaded
196 enableNettyTls = Boolean.parseBoolean(System.getProperty("enableNettyTLS", Boolean.toString(TLS_ENABLED)));
197
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800198 if (enableNettyTls) {
Brian O'Connor740e98c2017-06-29 17:07:17 -0700199 enableNettyTls = loadKeyStores();
200 }
201 }
202
203 private boolean loadKeyStores() {
204 // Maintain a local copy of the trust and key managers in case anything goes wrong
205 TrustManagerFactory tmf;
206 KeyManagerFactory kmf;
207 try {
208 String ksLocation = System.getProperty("javax.net.ssl.keyStore", DEFAULT_KS_FILE.toString());
209 String tsLocation = System.getProperty("javax.net.ssl.trustStore", DEFAULT_KS_FILE.toString());
210 char[] ksPwd = System.getProperty("javax.net.ssl.keyStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
211 char[] tsPwd = System.getProperty("javax.net.ssl.trustStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
212
213 tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
214 KeyStore ts = KeyStore.getInstance("JKS");
215 ts.load(new FileInputStream(tsLocation), tsPwd);
216 tmf.init(ts);
217
218 kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
219 KeyStore ks = KeyStore.getInstance("JKS");
220 ks.load(new FileInputStream(ksLocation), ksPwd);
221 kmf.init(ks, ksPwd);
222 if (log.isInfoEnabled()) {
223 logKeyStore(ks, ksLocation, ksPwd);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900224 }
Brian O'Connor740e98c2017-06-29 17:07:17 -0700225 } catch (FileNotFoundException e) {
226 log.warn("Disabling TLS for intra-cluster messaging; Could not load cluster key store: {}", e.getMessage());
227 return TLS_DISABLED;
228 } catch (Exception e) {
229 //TODO we might want to catch exceptions more specifically
230 log.error("Error loading key store; disabling TLS for intra-cluster messaging", e);
231 return TLS_DISABLED;
232 }
233 this.trustManager = tmf;
234 this.keyManager = kmf;
235 return TLS_ENABLED;
236 }
237
238 private void logKeyStore(KeyStore ks, String ksLocation, char[] ksPwd) {
239 if (log.isInfoEnabled()) {
240 log.info("Loaded cluster key store from: {}", ksLocation);
241 try {
242 for (Enumeration<String> e = ks.aliases(); e.hasMoreElements();) {
243 String alias = e.nextElement();
244 Key key = ks.getKey(alias, ksPwd);
245 Certificate[] certs = ks.getCertificateChain(alias);
246 log.debug("{} -> {}", alias, certs);
247 final byte[] encodedKey;
248 if (certs != null && certs.length > 0) {
249 encodedKey = certs[0].getEncoded();
250 } else {
251 log.info("Could not find cert chain for {}, using fingerprint of key instead...", alias);
252 encodedKey = key.getEncoded();
253 }
254 // Compute the certificate's fingerprint (use the key if certificate cannot be found)
255 MessageDigest digest = MessageDigest.getInstance("SHA1");
256 digest.update(encodedKey);
257 StringJoiner fingerprint = new StringJoiner(":");
258 for (byte b : digest.digest()) {
259 fingerprint.add(String.format("%02X", b));
260 }
261 log.info("{} -> {}", alias, fingerprint);
262 }
263 } catch (Exception e) {
264 log.warn("Unable to print contents of key store: {}", ksLocation, e);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900265 }
266 }
267 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700268
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800269 private void initEventLoopGroup() {
270 // try Epoll first and if that does work, use nio.
271 try {
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700272 clientGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollC-%d", log));
273 serverGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800274 serverChannelClass = EpollServerSocketChannel.class;
275 clientChannelClass = EpollSocketChannel.class;
276 return;
277 } catch (Throwable e) {
278 log.debug("Failed to initialize native (epoll) transport. "
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700279 + "Reason: {}. Proceeding with nio.", e.getMessage());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800280 }
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700281 clientGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioC-%d", log));
282 serverGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800283 serverChannelClass = NioServerSocketChannel.class;
284 clientChannelClass = NioSocketChannel.class;
285 }
286
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700287 /**
288 * Times out response callbacks.
289 */
290 private void timeoutAllCallbacks() {
291 // Iterate through all connections and time out callbacks.
Jordan Haltermanfe2f93d2017-12-08 23:52:54 -0800292 localClientConnection.timeoutCallbacks();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700293 for (RemoteClientConnection connection : clientConnections.values()) {
294 connection.timeoutCallbacks();
295 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700296 }
297
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800298 @Override
299 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900300 checkPermission(CLUSTER_WRITE);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700301 InternalRequest message = new InternalRequest(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700302 clockService.timeNow(),
303 messageIdGenerator.incrementAndGet(),
304 localEndpoint,
305 type,
306 payload);
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700307 return executeOnPooledConnection(ep, type, c -> c.sendAsync(message), MoreExecutors.directExecutor());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800308 }
309
310 @Override
311 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900312 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800313 return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
314 }
315
316 @Override
317 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900318 checkPermission(CLUSTER_WRITE);
Jordan Haltermanb34d30b2017-08-28 15:35:03 -0700319 long messageId = messageIdGenerator.incrementAndGet();
Jordan Haltermane3813a92017-07-29 14:10:31 -0700320 InternalRequest message = new InternalRequest(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700321 clockService.timeNow(),
322 messageId,
323 localEndpoint,
324 type,
325 payload);
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700326 return executeOnPooledConnection(ep, type, c -> c.sendAndReceive(message), executor);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700327 }
Jordan Halterman041534b2017-03-21 10:37:33 -0700328
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700329 private List<CompletableFuture<Channel>> getChannelPool(Endpoint endpoint) {
330 return channels.computeIfAbsent(endpoint, e -> {
331 List<CompletableFuture<Channel>> defaultList = new ArrayList<>(CHANNEL_POOL_SIZE);
332 for (int i = 0; i < CHANNEL_POOL_SIZE; i++) {
333 defaultList.add(null);
334 }
335 return Lists.newCopyOnWriteArrayList(defaultList);
336 });
337 }
338
339 private int getChannelOffset(String messageType) {
340 return Math.abs(messageType.hashCode() % CHANNEL_POOL_SIZE);
341 }
342
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700343 private <T> CompletableFuture<T> executeOnPooledConnection(
344 Endpoint endpoint,
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700345 String type,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700346 Function<ClientConnection, CompletableFuture<T>> callback,
347 Executor executor) {
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700348 CompletableFuture<T> future = new CompletableFuture<T>();
349 executeOnPooledConnection(endpoint, type, callback, executor, future);
350 return future;
351 }
352
353 private <T> void executeOnPooledConnection(
Jordan Haltermancd25bd72018-01-13 14:10:56 -0800354 Endpoint endpoint,
355 String type,
356 Function<ClientConnection, CompletableFuture<T>> callback,
357 Executor executor,
358 CompletableFuture<T> future) {
359
360 // If the endpoint is the local node, avoid the loopback interface and use the singleton local connection.
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700361 if (endpoint.equals(localEndpoint)) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700362 callback.apply(localClientConnection).whenComplete((result, error) -> {
Jordan Haltermanfe2f93d2017-12-08 23:52:54 -0800363 if (error == null) {
364 executor.execute(() -> future.complete(result));
365 } else {
366 executor.execute(() -> future.completeExceptionally(error));
367 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700368 });
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700369 return;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700370 }
371
Jordan Haltermancd25bd72018-01-13 14:10:56 -0800372 // Get the channel pool and the offset for this message type.
373 List<CompletableFuture<Channel>> channelPool = getChannelPool(endpoint);
374 int offset = getChannelOffset(type);
375
376 // If the channel future is completed exceptionally, open a new channel.
377 CompletableFuture<Channel> channelFuture = channelPool.get(offset);
378 if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
379 synchronized (channelPool) {
380 channelFuture = channelPool.get(offset);
381 if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
382 channelFuture = openChannel(endpoint);
383 channelPool.set(offset, channelFuture);
384 }
385 }
386 }
387
388 // Create a consumer with which to complete the send operation on a given channel.
389 final Consumer<Channel> runner = channel -> {
390 ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
391 callback.apply(connection).whenComplete((result, sendError) -> {
392 if (sendError == null) {
393 executor.execute(() -> future.complete(result));
394 } else {
395 // If an exception other than a TimeoutException occurred, close the connection and
396 // remove the channel from the pool.
397 if (!(Throwables.getRootCause(sendError) instanceof TimeoutException)) {
398 synchronized (channelPool) {
399 channelPool.set(offset, null);
400 }
401 connection.close();
402 channel.close();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700403 }
Jordan Haltermancd25bd72018-01-13 14:10:56 -0800404 executor.execute(() -> future.completeExceptionally(sendError));
405 }
406 });
407 };
408
409 // Wait for the channel future to be completed. Once it's complete, if the channel is active then
410 // attempt to send the message. Otherwise, if the channel is inactive then attempt to open a new channel.
411 final CompletableFuture<Channel> finalFuture = channelFuture;
412 finalFuture.whenComplete((channel, error) -> {
413 if (error == null) {
414 if (!channel.isActive()) {
415 synchronized (channelPool) {
416 CompletableFuture<Channel> currentFuture = channelPool.get(offset);
417 if (currentFuture == finalFuture) {
418 channelPool.set(offset, null);
419 executeOnPooledConnection(endpoint, type, callback, executor);
420 } else {
421 currentFuture.whenComplete((recursiveResult, recursiveError) -> {
422 if (recursiveError == null) {
423 runner.accept(recursiveResult);
424 } else {
425 future.completeExceptionally(recursiveError);
426 }
427 });
428 }
429 }
430 } else {
431 runner.accept(channel);
432 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700433 } else {
Jordan Haltermancd25bd72018-01-13 14:10:56 -0800434 future.completeExceptionally(error);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800435 }
Jordan Halterman041534b2017-03-21 10:37:33 -0700436 });
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800437 }
438
439 @Override
440 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900441 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700442 handlers.put(type, (message, connection) -> executor.execute(() ->
443 handler.accept(message.sender(), message.payload())));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800444 }
445
446 @Override
447 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900448 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700449 handlers.put(type, (message, connection) -> executor.execute(() -> {
Madan Jampania9e70a62016-03-02 16:28:18 -0800450 byte[] responsePayload = null;
Jordan Haltermane3813a92017-07-29 14:10:31 -0700451 InternalReply.Status status = InternalReply.Status.OK;
Madan Jampania9e70a62016-03-02 16:28:18 -0800452 try {
453 responsePayload = handler.apply(message.sender(), message.payload());
454 } catch (Exception e) {
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700455 log.debug("An error occurred in a message handler: {}", e);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700456 status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800457 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700458 connection.reply(message, status, Optional.ofNullable(responsePayload));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800459 }));
460 }
461
462 @Override
463 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900464 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700465 handlers.put(type, (message, connection) -> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800466 handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700467 InternalReply.Status status;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700468 if (error == null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700469 status = InternalReply.Status.OK;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700470 } else {
471 log.debug("An error occurred in a message handler: {}", error);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700472 status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700473 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700474 connection.reply(message, status, Optional.ofNullable(result));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800475 });
476 });
477 }
478
479 @Override
480 public void unregisterHandler(String type) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900481 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800482 handlers.remove(type);
483 }
484
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700485 private Bootstrap bootstrapClient(Endpoint endpoint) {
486 Bootstrap bootstrap = new Bootstrap();
487 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
488 bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
489 new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
490 bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
491 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
492 bootstrap.group(clientGroup);
493 // TODO: Make this faster:
494 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
495 bootstrap.channel(clientChannelClass);
496 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
497 bootstrap.remoteAddress(endpoint.host().toInetAddress(), endpoint.port());
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700498 if (enableNettyTls) {
499 bootstrap.handler(new SslClientCommunicationChannelInitializer());
500 } else {
501 bootstrap.handler(new BasicChannelInitializer());
502 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700503 return bootstrap;
504 }
505
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800506 private void startAcceptingConnections() throws InterruptedException {
507 ServerBootstrap b = new ServerBootstrap();
Jon Hall9a44d6a2017-03-02 18:14:37 -0800508 b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700509 new WriteBufferWaterMark(8 * 1024, 32 * 1024));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800510 b.option(ChannelOption.SO_RCVBUF, 1048576);
511 b.option(ChannelOption.TCP_NODELAY, true);
Yuta HIGUCHIb47c9532016-08-22 09:41:23 -0700512 b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800513 b.group(serverGroup, clientGroup);
514 b.channel(serverChannelClass);
515 if (enableNettyTls) {
516 b.childHandler(new SslServerCommunicationChannelInitializer());
517 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700518 b.childHandler(new BasicChannelInitializer());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800519 }
520 b.option(ChannelOption.SO_BACKLOG, 128);
521 b.childOption(ChannelOption.SO_KEEPALIVE, true);
522
523 // Bind and start to accept incoming connections.
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700524 b.bind(localEndpoint.port()).sync().addListener(future -> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800525 if (future.isSuccess()) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700526 log.info("{} accepting incoming connections on port {}",
527 localEndpoint.host(), localEndpoint.port());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800528 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700529 log.warn("{} failed to bind to port {} due to {}",
530 localEndpoint.host(), localEndpoint.port(), future.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800531 }
532 });
533 }
534
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700535 private CompletableFuture<Channel> openChannel(Endpoint ep) {
536 Bootstrap bootstrap = bootstrapClient(ep);
537 CompletableFuture<Channel> retFuture = new CompletableFuture<>();
538 ChannelFuture f = bootstrap.connect();
539
540 f.addListener(future -> {
541 if (future.isSuccess()) {
542 retFuture.complete(f.channel());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800543 } else {
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700544 retFuture.completeExceptionally(future.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800545 }
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700546 });
547 log.debug("Established a new connection to {}", ep);
548 return retFuture;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800549 }
550
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700551 /**
552 * Channel initializer for TLS servers.
553 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800554 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800555 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800556
557 @Override
558 protected void initChannel(SocketChannel channel) throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800559 SSLContext serverContext = SSLContext.getInstance("TLS");
Brian O'Connor740e98c2017-06-29 17:07:17 -0700560 serverContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800561
562 SSLEngine serverSslEngine = serverContext.createSSLEngine();
563
564 serverSslEngine.setNeedClientAuth(true);
565 serverSslEngine.setUseClientMode(false);
566 serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
567 serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
568 serverSslEngine.setEnableSessionCreation(true);
569
570 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
Jordan Haltermane3813a92017-07-29 14:10:31 -0700571 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700572 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800573 .addLast("handler", dispatcher);
574 }
575 }
576
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700577 /**
578 * Channel initializer for TLS clients.
579 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800580 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800581 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800582
583 @Override
584 protected void initChannel(SocketChannel channel) throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800585 SSLContext clientContext = SSLContext.getInstance("TLS");
Brian O'Connor740e98c2017-06-29 17:07:17 -0700586 clientContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800587
588 SSLEngine clientSslEngine = clientContext.createSSLEngine();
589
590 clientSslEngine.setUseClientMode(true);
591 clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
592 clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
593 clientSslEngine.setEnableSessionCreation(true);
594
595 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
Jordan Haltermane3813a92017-07-29 14:10:31 -0700596 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700597 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800598 .addLast("handler", dispatcher);
599 }
600 }
601
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700602 /**
603 * Channel initializer for basic connections.
604 */
605 private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800606 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800607
608 @Override
609 protected void initChannel(SocketChannel channel) throws Exception {
610 channel.pipeline()
Jordan Haltermane3813a92017-07-29 14:10:31 -0700611 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700612 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800613 .addLast("handler", dispatcher);
614 }
615 }
616
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700617 /**
618 * Channel inbound handler that dispatches messages to the appropriate handler.
619 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800620 @ChannelHandler.Sharable
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700621 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<Object> {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700622 // Effectively SimpleChannelInboundHandler<InternalMessage>,
623 // had to specify <Object> to avoid Class Loader not being able to find some classes.
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800624
625 @Override
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700626 protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
627 InternalMessage message = (InternalMessage) rawMessage;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800628 try {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700629 if (message.isRequest()) {
630 RemoteServerConnection connection =
631 serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700632 connection.dispatch((InternalRequest) message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700633 } else {
634 RemoteClientConnection connection =
635 clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700636 connection.dispatch((InternalReply) message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700637 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800638 } catch (RejectedExecutionException e) {
639 log.warn("Unable to dispatch message due to {}", e.getMessage());
640 }
641 }
642
643 @Override
644 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
645 log.error("Exception inside channel handling pipeline.", cause);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700646
647 RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
648 if (clientConnection != null) {
649 clientConnection.close();
650 }
651
652 RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
653 if (serverConnection != null) {
654 serverConnection.close();
655 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800656 context.close();
657 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700658
659 /**
660 * Returns true if the given message should be handled.
661 *
662 * @param msg inbound message
663 * @return true if {@code msg} is {@link InternalMessage} instance.
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700664 * @see SimpleChannelInboundHandler#acceptInboundMessage(Object)
665 */
666 @Override
667 public final boolean acceptInboundMessage(Object msg) {
668 return msg instanceof InternalMessage;
669 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800670 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700671
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700672 /**
673 * Wraps a {@link CompletableFuture} and tracks its type and creation time.
674 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800675 private final class Callback {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700676 private final String type;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800677 private final CompletableFuture<byte[]> future;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700678 private final long time = System.currentTimeMillis();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800679
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700680 Callback(String type, CompletableFuture<byte[]> future) {
681 this.type = type;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800682 this.future = future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800683 }
684
685 public void complete(byte[] value) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700686 future.complete(value);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800687 }
688
689 public void completeExceptionally(Throwable error) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700690 future.completeExceptionally(error);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800691 }
692 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800693
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700694 /**
695 * Represents the client side of a connection to a local or remote server.
696 */
697 private interface ClientConnection {
698
699 /**
700 * Sends a message to the other side of the connection.
701 *
702 * @param message the message to send
703 * @return a completable future to be completed once the message has been sent
704 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700705 CompletableFuture<Void> sendAsync(InternalRequest message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700706
707 /**
708 * Sends a message to the other side of the connection, awaiting a reply.
709 *
710 * @param message the message to send
711 * @return a completable future to be completed once a reply is received or the request times out
712 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700713 CompletableFuture<byte[]> sendAndReceive(InternalRequest message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700714
715 /**
716 * Closes the connection.
717 */
718 default void close() {
719 }
720 }
721
722 /**
723 * Represents the server side of a connection.
724 */
725 private interface ServerConnection {
726
727 /**
728 * Sends a reply to the other side of the connection.
729 *
730 * @param message the message to which to reply
Jordan Haltermanfe2f93d2017-12-08 23:52:54 -0800731 * @param status the reply status
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700732 * @param payload the response payload
733 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700734 void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700735
736 /**
737 * Closes the connection.
738 */
739 default void close() {
740 }
741 }
742
743 /**
Jordan Haltermanfe2f93d2017-12-08 23:52:54 -0800744 * Remote connection implementation.
745 */
746 private abstract class AbstractClientConnection implements ClientConnection {
747 private final Map<Long, Callback> futures = Maps.newConcurrentMap();
748 private final AtomicBoolean closed = new AtomicBoolean(false);
749 private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
750 .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
751 .build();
752
753 /**
754 * Times out callbacks for this connection.
755 */
756 protected void timeoutCallbacks() {
757 // Store the current time.
758 long currentTime = System.currentTimeMillis();
759
760 // Iterate through future callbacks and time out callbacks that have been alive
761 // longer than the current timeout according to the message type.
762 Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
763 while (iterator.hasNext()) {
764 Callback callback = iterator.next().getValue();
765 try {
766 TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
767 long currentTimeout = timeoutHistory.currentTimeout;
768 if (currentTime - callback.time > currentTimeout) {
769 iterator.remove();
770 long elapsedTime = currentTime - callback.time;
771 timeoutHistory.addReplyTime(elapsedTime);
772 callback.completeExceptionally(
773 new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
774 }
775 } catch (ExecutionException e) {
776 throw new AssertionError();
777 }
778 }
779
780 // Iterate through all timeout histories and recompute the timeout.
781 for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
782 timeoutHistory.recomputeTimeoutMillis();
783 }
784 }
785
786 protected void registerCallback(long id, String subject, CompletableFuture<byte[]> future) {
787 futures.put(id, new Callback(subject, future));
788 }
789
790 protected Callback completeCallback(long id) {
791 Callback callback = futures.remove(id);
792 if (callback != null) {
793 try {
794 TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
795 timeoutHistory.addReplyTime(System.currentTimeMillis() - callback.time);
796 } catch (ExecutionException e) {
797 throw new AssertionError();
798 }
799 }
800 return callback;
801 }
802
803 protected Callback failCallback(long id) {
804 return futures.remove(id);
805 }
806
807 @Override
808 public void close() {
809 if (closed.compareAndSet(false, true)) {
810 timeoutFuture.cancel(false);
811 for (Callback callback : futures.values()) {
812 callback.completeExceptionally(new ConnectException());
813 }
814 }
815 }
816 }
817
818 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700819 * Local connection implementation.
820 */
Jordan Haltermanfe2f93d2017-12-08 23:52:54 -0800821 private final class LocalClientConnection extends AbstractClientConnection {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700822 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700823 public CompletableFuture<Void> sendAsync(InternalRequest message) {
824 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700825 if (handler != null) {
826 handler.accept(message, localServerConnection);
827 } else {
828 log.debug("No handler for message type {} from {}", message.type(), message.sender());
829 }
830 return CompletableFuture.completedFuture(null);
831 }
832
833 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700834 public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700835 CompletableFuture<byte[]> future = new CompletableFuture<>();
Jordan Haltermanfe2f93d2017-12-08 23:52:54 -0800836 future.whenComplete((r, e) -> completeCallback(message.id()));
837 registerCallback(message.id(), message.subject(), future);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700838 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700839 if (handler != null) {
840 handler.accept(message, new LocalServerConnection(future));
841 } else {
842 log.debug("No handler for message type {} from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700843 new LocalServerConnection(future)
844 .reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700845 }
846 return future;
847 }
848 }
849
850 /**
851 * Local server connection.
852 */
853 private final class LocalServerConnection implements ServerConnection {
854 private final CompletableFuture<byte[]> future;
855
856 LocalServerConnection(CompletableFuture<byte[]> future) {
857 this.future = future;
858 }
859
860 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700861 public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700862 if (future != null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700863 if (status == InternalReply.Status.OK) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700864 future.complete(payload.orElse(EMPTY_PAYLOAD));
Jordan Haltermane3813a92017-07-29 14:10:31 -0700865 } else if (status == InternalReply.Status.ERROR_NO_HANDLER) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700866 future.completeExceptionally(new MessagingException.NoRemoteHandler());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700867 } else if (status == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700868 future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700869 } else if (status == InternalReply.Status.PROTOCOL_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700870 future.completeExceptionally(new MessagingException.ProtocolException());
871 }
872 }
873 }
874 }
875
876 /**
877 * Remote connection implementation.
878 */
Jordan Haltermanfe2f93d2017-12-08 23:52:54 -0800879 private final class RemoteClientConnection extends AbstractClientConnection {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700880 private final Channel channel;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700881
882 RemoteClientConnection(Channel channel) {
883 this.channel = channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800884 }
885
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700886 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700887 public CompletableFuture<Void> sendAsync(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700888 CompletableFuture<Void> future = new CompletableFuture<>();
889 channel.writeAndFlush(message).addListener(channelFuture -> {
890 if (!channelFuture.isSuccess()) {
891 future.completeExceptionally(channelFuture.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800892 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700893 future.complete(null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800894 }
895 });
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700896 return future;
897 }
898
899 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700900 public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700901 CompletableFuture<byte[]> future = new CompletableFuture<>();
Jordan Haltermanfe2f93d2017-12-08 23:52:54 -0800902 registerCallback(message.id(), message.subject(), future);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700903 channel.writeAndFlush(message).addListener(channelFuture -> {
904 if (!channelFuture.isSuccess()) {
Jordan Haltermanfe2f93d2017-12-08 23:52:54 -0800905 Callback callback = failCallback(message.id());
906 if (callback != null) {
907 callback.completeExceptionally(channelFuture.cause());
908 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700909 }
910 });
911 return future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800912 }
913
914 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700915 * Dispatches a message to a local handler.
916 *
917 * @param message the message to dispatch
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800918 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700919 private void dispatch(InternalReply message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700920 if (message.preamble() != preamble) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700921 log.debug("Received {} with invalid preamble", message.type());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700922 return;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800923 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700924
925 clockService.recordEventTime(message.time());
926
Jordan Haltermanfe2f93d2017-12-08 23:52:54 -0800927 Callback callback = completeCallback(message.id());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700928 if (callback != null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700929 if (message.status() == InternalReply.Status.OK) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700930 callback.complete(message.payload());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700931 } else if (message.status() == InternalReply.Status.ERROR_NO_HANDLER) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700932 callback.completeExceptionally(new MessagingException.NoRemoteHandler());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700933 } else if (message.status() == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700934 callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700935 } else if (message.status() == InternalReply.Status.PROTOCOL_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700936 callback.completeExceptionally(new MessagingException.ProtocolException());
937 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700938 } else {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700939 log.debug("Received a reply for message id:[{}] "
940 + "but was unable to locate the"
941 + " request handle", message.id());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700942 }
943 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700944 }
945
946 /**
947 * Remote server connection.
948 */
949 private final class RemoteServerConnection implements ServerConnection {
950 private final Channel channel;
951
952 RemoteServerConnection(Channel channel) {
953 this.channel = channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800954 }
955
956 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700957 * Dispatches a message to a local handler.
958 *
959 * @param message the message to dispatch
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800960 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700961 private void dispatch(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700962 if (message.preamble() != preamble) {
963 log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700964 reply(message, InternalReply.Status.PROTOCOL_EXCEPTION, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700965 return;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800966 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700967
968 clockService.recordEventTime(message.time());
969
Jordan Haltermane3813a92017-07-29 14:10:31 -0700970 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700971 if (handler != null) {
972 handler.accept(message, this);
973 } else {
974 log.debug("No handler for message type {} from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700975 reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700976 }
977 }
978
979 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700980 public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
981 InternalReply response = new InternalReply(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700982 clockService.timeNow(),
983 message.id(),
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700984 payload.orElse(EMPTY_PAYLOAD),
985 status);
986 channel.writeAndFlush(response);
987 }
988 }
989
990 /**
991 * Request-reply timeout history tracker.
992 */
993 private static final class TimeoutHistory {
994 private final DescriptiveStatistics timeoutHistory = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
995 private final AtomicLong maxReplyTime = new AtomicLong();
996 private volatile long currentTimeout = DEFAULT_TIMEOUT_MILLIS;
997
998 /**
999 * Adds a reply time to the history.
1000 *
1001 * @param replyTime the reply time to add to the history
1002 */
1003 void addReplyTime(long replyTime) {
1004 maxReplyTime.getAndAccumulate(replyTime, Math::max);
1005 }
1006
1007 /**
1008 * Computes the current timeout.
1009 */
1010 private void recomputeTimeoutMillis() {
1011 double nextTimeout = maxReplyTime.getAndSet(0) * TIMEOUT_MULTIPLIER;
1012 timeoutHistory.addValue(
1013 Math.min(Math.max(nextTimeout, MIN_TIMEOUT_MILLIS), MAX_TIMEOUT_MILLIS));
1014 if (timeoutHistory.getN() == WINDOW_SIZE) {
1015 this.currentTimeout = (long) timeoutHistory.getMax();
1016 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -08001017 }
1018 }
JunHuy Lam39eb4292015-06-26 17:24:23 +09001019}