blob: d7e478b5511dfbd3215f839761d2be328b897d02 [file] [log] [blame]
Thomas Vachuska58de4162015-09-10 16:15:33 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Thomas Vachuska58de4162015-09-10 16:15:33 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070016package org.onosproject.store.cluster.messaging.impl;
17
Aaron Kruglikov1b727382016-02-09 16:17:47 -080018import javax.net.ssl.KeyManagerFactory;
19import javax.net.ssl.SSLContext;
20import javax.net.ssl.SSLEngine;
21import javax.net.ssl.TrustManagerFactory;
Brian O'Connor740e98c2017-06-29 17:07:17 -070022import java.io.File;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080023import java.io.FileInputStream;
Brian O'Connor740e98c2017-06-29 17:07:17 -070024import java.io.FileNotFoundException;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070025import java.net.ConnectException;
Brian O'Connor740e98c2017-06-29 17:07:17 -070026import java.security.Key;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080027import java.security.KeyStore;
Brian O'Connor740e98c2017-06-29 17:07:17 -070028import java.security.MessageDigest;
29import java.security.cert.Certificate;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070030import java.time.Duration;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070031import java.util.ArrayList;
Brian O'Connor740e98c2017-06-29 17:07:17 -070032import java.util.Enumeration;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070033import java.util.Iterator;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070034import java.util.List;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080035import java.util.Map;
Madan Jampania9e70a62016-03-02 16:28:18 -080036import java.util.Optional;
Brian O'Connor740e98c2017-06-29 17:07:17 -070037import java.util.StringJoiner;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080038import java.util.concurrent.CompletableFuture;
39import java.util.concurrent.ConcurrentHashMap;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070040import java.util.concurrent.ExecutionException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080041import java.util.concurrent.Executor;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070042import java.util.concurrent.Executors;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080043import java.util.concurrent.RejectedExecutionException;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070044import java.util.concurrent.ScheduledExecutorService;
45import java.util.concurrent.ScheduledFuture;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080046import java.util.concurrent.TimeUnit;
47import java.util.concurrent.TimeoutException;
48import java.util.concurrent.atomic.AtomicBoolean;
49import java.util.concurrent.atomic.AtomicLong;
50import java.util.function.BiConsumer;
51import java.util.function.BiFunction;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070052import java.util.function.Function;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080053
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070054import com.google.common.cache.Cache;
55import com.google.common.cache.CacheBuilder;
56import com.google.common.collect.Lists;
57import com.google.common.collect.Maps;
58import com.google.common.util.concurrent.MoreExecutors;
59import io.netty.bootstrap.Bootstrap;
60import io.netty.bootstrap.ServerBootstrap;
61import io.netty.buffer.PooledByteBufAllocator;
62import io.netty.channel.Channel;
63import io.netty.channel.ChannelFuture;
64import io.netty.channel.ChannelHandler;
65import io.netty.channel.ChannelHandlerContext;
66import io.netty.channel.ChannelInitializer;
67import io.netty.channel.ChannelOption;
68import io.netty.channel.EventLoopGroup;
69import io.netty.channel.ServerChannel;
70import io.netty.channel.SimpleChannelInboundHandler;
71import io.netty.channel.WriteBufferWaterMark;
72import io.netty.channel.epoll.EpollEventLoopGroup;
73import io.netty.channel.epoll.EpollServerSocketChannel;
74import io.netty.channel.epoll.EpollSocketChannel;
75import io.netty.channel.nio.NioEventLoopGroup;
76import io.netty.channel.socket.SocketChannel;
77import io.netty.channel.socket.nio.NioServerSocketChannel;
78import io.netty.channel.socket.nio.NioSocketChannel;
79import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
80import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
81import org.apache.felix.scr.annotations.Activate;
82import org.apache.felix.scr.annotations.Component;
83import org.apache.felix.scr.annotations.Deactivate;
84import org.apache.felix.scr.annotations.Reference;
85import org.apache.felix.scr.annotations.ReferenceCardinality;
86import org.apache.felix.scr.annotations.Service;
87import org.onosproject.cluster.ClusterMetadataService;
88import org.onosproject.cluster.ControllerNode;
89import org.onosproject.core.HybridLogicalClockService;
90import org.onosproject.store.cluster.messaging.Endpoint;
91import org.onosproject.store.cluster.messaging.MessagingException;
92import org.onosproject.store.cluster.messaging.MessagingService;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070093import org.slf4j.Logger;
94import org.slf4j.LoggerFactory;
95
Yuta HIGUCHI90a16892016-07-20 20:36:08 -070096import static org.onlab.util.Tools.groupedThreads;
Heedo Kang4a47a302016-02-29 17:40:23 +090097import static org.onosproject.security.AppGuard.checkPermission;
98import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
99
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700100/**
101 * Netty based MessagingService.
102 */
Sho SHIMIZU5c396e32016-08-12 15:19:12 -0700103@Component(immediate = true)
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700104@Service
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800105public class NettyMessagingManager implements MessagingService {
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700106 private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1).toMillis();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700107 private static final long TIMEOUT_INTERVAL = 50;
108 private static final int WINDOW_SIZE = 100;
Jordan Haltermanef92f192017-12-21 11:59:38 -0800109 private static final int MIN_SAMPLES = 25;
110 private static final double PHI_FACTOR = 1.0 / Math.log(10.0);
111 private static final int PHI_FAILURE_THRESHOLD = 5;
Jordan Halterman111aab72018-01-12 16:28:57 -0800112 private static final long MIN_TIMEOUT_MILLIS = 100;
Jordan Haltermanef92f192017-12-21 11:59:38 -0800113 private static final long MAX_TIMEOUT_MILLIS = 15000;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700114 private static final int CHANNEL_POOL_SIZE = 8;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700115
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700116 private static final byte[] EMPTY_PAYLOAD = new byte[0];
117
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700118 private final Logger log = LoggerFactory.getLogger(getClass());
119
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800120 private final LocalClientConnection localClientConnection = new LocalClientConnection();
121 private final LocalServerConnection localServerConnection = new LocalServerConnection(null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800122
Brian O'Connor740e98c2017-06-29 17:07:17 -0700123 //TODO CONFIG_DIR is duplicated from ConfigFileBasedClusterMetadataProvider
124 private static final String CONFIG_DIR = "../config";
125 private static final String KS_FILE_NAME = "onos.jks";
126 private static final File DEFAULT_KS_FILE = new File(CONFIG_DIR, KS_FILE_NAME);
127 private static final String DEFAULT_KS_PASSWORD = "changeit";
128
Madan Jampani05833872016-07-12 23:01:39 -0700129 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
130 protected HybridLogicalClockService clockService;
131
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700132 private Endpoint localEndpoint;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800133 private int preamble;
134 private final AtomicBoolean started = new AtomicBoolean(false);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700135 private final Map<String, BiConsumer<InternalRequest, ServerConnection>> handlers = new ConcurrentHashMap<>();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700136 private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
137 private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800138 private final AtomicLong messageIdGenerator = new AtomicLong(0);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800139
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700140 private ScheduledFuture<?> timeoutFuture;
141
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700142 private final Map<Endpoint, List<CompletableFuture<Channel>>> channels = Maps.newConcurrentMap();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800143
144 private EventLoopGroup serverGroup;
145 private EventLoopGroup clientGroup;
146 private Class<? extends ServerChannel> serverChannelClass;
147 private Class<? extends Channel> clientChannelClass;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700148 private ScheduledExecutorService timeoutExecutor;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800149
Brian O'Connor740e98c2017-06-29 17:07:17 -0700150 protected static final boolean TLS_ENABLED = true;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800151 protected static final boolean TLS_DISABLED = false;
Brian O'Connor740e98c2017-06-29 17:07:17 -0700152 protected boolean enableNettyTls = TLS_ENABLED;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800153
Brian O'Connor740e98c2017-06-29 17:07:17 -0700154 protected TrustManagerFactory trustManager;
155 protected KeyManagerFactory keyManager;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900156
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700157 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampaniec1df022015-10-13 21:23:03 -0700158 protected ClusterMetadataService clusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700159
160 @Activate
Ray Milkey986a47a2018-01-25 11:38:51 -0800161 public void activate() throws InterruptedException {
Madan Jampaniec1df022015-10-13 21:23:03 -0700162 ControllerNode localNode = clusterMetadataService.getLocalNode();
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800163 getTlsParameters();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800164
165 if (started.get()) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700166 log.warn("Already running at local endpoint: {}", localEndpoint);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800167 return;
168 }
169 this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700170 this.localEndpoint = new Endpoint(localNode.ip(), localNode.tcpPort());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800171 initEventLoopGroup();
172 startAcceptingConnections();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700173 timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
174 groupedThreads("NettyMessagingEvt", "timeout", log));
175 timeoutFuture = timeoutExecutor.scheduleAtFixedRate(
176 this::timeoutAllCallbacks, TIMEOUT_INTERVAL, TIMEOUT_INTERVAL, TimeUnit.MILLISECONDS);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800177 started.set(true);
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700178 log.info("Started");
179 }
180
181 @Deactivate
Ray Milkey986a47a2018-01-25 11:38:51 -0800182 public void deactivate() {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800183 if (started.get()) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800184 serverGroup.shutdownGracefully();
185 clientGroup.shutdownGracefully();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700186 timeoutFuture.cancel(false);
187 timeoutExecutor.shutdown();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800188 started.set(false);
189 }
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700190 log.info("Stopped");
191 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900192
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800193 private void getTlsParameters() {
Brian O'Connor740e98c2017-06-29 17:07:17 -0700194 // default is TLS enabled unless key stores cannot be loaded
195 enableNettyTls = Boolean.parseBoolean(System.getProperty("enableNettyTLS", Boolean.toString(TLS_ENABLED)));
196
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800197 if (enableNettyTls) {
Brian O'Connor740e98c2017-06-29 17:07:17 -0700198 enableNettyTls = loadKeyStores();
199 }
200 }
201
202 private boolean loadKeyStores() {
203 // Maintain a local copy of the trust and key managers in case anything goes wrong
204 TrustManagerFactory tmf;
205 KeyManagerFactory kmf;
206 try {
207 String ksLocation = System.getProperty("javax.net.ssl.keyStore", DEFAULT_KS_FILE.toString());
208 String tsLocation = System.getProperty("javax.net.ssl.trustStore", DEFAULT_KS_FILE.toString());
209 char[] ksPwd = System.getProperty("javax.net.ssl.keyStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
210 char[] tsPwd = System.getProperty("javax.net.ssl.trustStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
211
212 tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
213 KeyStore ts = KeyStore.getInstance("JKS");
214 ts.load(new FileInputStream(tsLocation), tsPwd);
215 tmf.init(ts);
216
217 kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
218 KeyStore ks = KeyStore.getInstance("JKS");
219 ks.load(new FileInputStream(ksLocation), ksPwd);
220 kmf.init(ks, ksPwd);
221 if (log.isInfoEnabled()) {
222 logKeyStore(ks, ksLocation, ksPwd);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900223 }
Brian O'Connor740e98c2017-06-29 17:07:17 -0700224 } catch (FileNotFoundException e) {
225 log.warn("Disabling TLS for intra-cluster messaging; Could not load cluster key store: {}", e.getMessage());
226 return TLS_DISABLED;
227 } catch (Exception e) {
228 //TODO we might want to catch exceptions more specifically
229 log.error("Error loading key store; disabling TLS for intra-cluster messaging", e);
230 return TLS_DISABLED;
231 }
232 this.trustManager = tmf;
233 this.keyManager = kmf;
234 return TLS_ENABLED;
235 }
236
237 private void logKeyStore(KeyStore ks, String ksLocation, char[] ksPwd) {
238 if (log.isInfoEnabled()) {
239 log.info("Loaded cluster key store from: {}", ksLocation);
240 try {
241 for (Enumeration<String> e = ks.aliases(); e.hasMoreElements();) {
242 String alias = e.nextElement();
243 Key key = ks.getKey(alias, ksPwd);
244 Certificate[] certs = ks.getCertificateChain(alias);
245 log.debug("{} -> {}", alias, certs);
246 final byte[] encodedKey;
247 if (certs != null && certs.length > 0) {
248 encodedKey = certs[0].getEncoded();
249 } else {
250 log.info("Could not find cert chain for {}, using fingerprint of key instead...", alias);
251 encodedKey = key.getEncoded();
252 }
253 // Compute the certificate's fingerprint (use the key if certificate cannot be found)
254 MessageDigest digest = MessageDigest.getInstance("SHA1");
255 digest.update(encodedKey);
256 StringJoiner fingerprint = new StringJoiner(":");
257 for (byte b : digest.digest()) {
258 fingerprint.add(String.format("%02X", b));
259 }
260 log.info("{} -> {}", alias, fingerprint);
261 }
262 } catch (Exception e) {
263 log.warn("Unable to print contents of key store: {}", ksLocation, e);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900264 }
265 }
266 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700267
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800268 private void initEventLoopGroup() {
269 // try Epoll first and if that does work, use nio.
270 try {
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700271 clientGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollC-%d", log));
272 serverGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800273 serverChannelClass = EpollServerSocketChannel.class;
274 clientChannelClass = EpollSocketChannel.class;
275 return;
276 } catch (Throwable e) {
277 log.debug("Failed to initialize native (epoll) transport. "
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700278 + "Reason: {}. Proceeding with nio.", e.getMessage());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800279 }
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700280 clientGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioC-%d", log));
281 serverGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800282 serverChannelClass = NioServerSocketChannel.class;
283 clientChannelClass = NioSocketChannel.class;
284 }
285
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700286 /**
287 * Times out response callbacks.
288 */
289 private void timeoutAllCallbacks() {
290 // Iterate through all connections and time out callbacks.
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800291 localClientConnection.timeoutCallbacks();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700292 for (RemoteClientConnection connection : clientConnections.values()) {
293 connection.timeoutCallbacks();
294 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700295 }
296
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800297 @Override
298 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900299 checkPermission(CLUSTER_WRITE);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700300 InternalRequest message = new InternalRequest(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700301 clockService.timeNow(),
302 messageIdGenerator.incrementAndGet(),
303 localEndpoint,
304 type,
305 payload);
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700306 return executeOnPooledConnection(ep, type, c -> c.sendAsync(message), MoreExecutors.directExecutor());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800307 }
308
309 @Override
310 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900311 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800312 return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
313 }
314
315 @Override
316 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900317 checkPermission(CLUSTER_WRITE);
Jordan Halterman5ceb3892017-08-28 15:35:03 -0700318 long messageId = messageIdGenerator.incrementAndGet();
Jordan Haltermane3813a92017-07-29 14:10:31 -0700319 InternalRequest message = new InternalRequest(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700320 clockService.timeNow(),
321 messageId,
322 localEndpoint,
323 type,
324 payload);
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700325 return executeOnPooledConnection(ep, type, c -> c.sendAndReceive(message), executor);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700326 }
Jordan Halterman041534b2017-03-21 10:37:33 -0700327
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700328 private List<CompletableFuture<Channel>> getChannelPool(Endpoint endpoint) {
329 return channels.computeIfAbsent(endpoint, e -> {
330 List<CompletableFuture<Channel>> defaultList = new ArrayList<>(CHANNEL_POOL_SIZE);
331 for (int i = 0; i < CHANNEL_POOL_SIZE; i++) {
332 defaultList.add(null);
333 }
334 return Lists.newCopyOnWriteArrayList(defaultList);
335 });
336 }
337
338 private int getChannelOffset(String messageType) {
339 return Math.abs(messageType.hashCode() % CHANNEL_POOL_SIZE);
340 }
341
Ray Milkey4f350762018-01-23 23:32:03 +0000342 private CompletableFuture<Channel> getChannel(Endpoint endpoint, String messageType) {
Jordan Halterman83b1d932018-01-13 14:10:56 -0800343 List<CompletableFuture<Channel>> channelPool = getChannelPool(endpoint);
Ray Milkey4f350762018-01-23 23:32:03 +0000344 int offset = getChannelOffset(messageType);
Jordan Halterman83b1d932018-01-13 14:10:56 -0800345
Jordan Halterman83b1d932018-01-13 14:10:56 -0800346 CompletableFuture<Channel> channelFuture = channelPool.get(offset);
347 if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
348 synchronized (channelPool) {
349 channelFuture = channelPool.get(offset);
350 if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
351 channelFuture = openChannel(endpoint);
352 channelPool.set(offset, channelFuture);
353 }
354 }
355 }
356
Ray Milkey4f350762018-01-23 23:32:03 +0000357 CompletableFuture<Channel> future = new CompletableFuture<>();
Jordan Halterman83b1d932018-01-13 14:10:56 -0800358 final CompletableFuture<Channel> finalFuture = channelFuture;
359 finalFuture.whenComplete((channel, error) -> {
360 if (error == null) {
361 if (!channel.isActive()) {
362 synchronized (channelPool) {
363 CompletableFuture<Channel> currentFuture = channelPool.get(offset);
364 if (currentFuture == finalFuture) {
365 channelPool.set(offset, null);
Ray Milkey4f350762018-01-23 23:32:03 +0000366 getChannel(endpoint, messageType).whenComplete((recursiveResult, recursiveError) -> {
367 if (recursiveError == null) {
368 future.complete(recursiveResult);
369 } else {
370 future.completeExceptionally(recursiveError);
371 }
372 });
Jordan Halterman83b1d932018-01-13 14:10:56 -0800373 } else {
374 currentFuture.whenComplete((recursiveResult, recursiveError) -> {
375 if (recursiveError == null) {
Ray Milkey4f350762018-01-23 23:32:03 +0000376 future.complete(recursiveResult);
Jordan Halterman83b1d932018-01-13 14:10:56 -0800377 } else {
378 future.completeExceptionally(recursiveError);
379 }
380 });
381 }
382 }
383 } else {
Ray Milkey4f350762018-01-23 23:32:03 +0000384 future.complete(channel);
Jordan Halterman83b1d932018-01-13 14:10:56 -0800385 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700386 } else {
Jordan Halterman83b1d932018-01-13 14:10:56 -0800387 future.completeExceptionally(error);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800388 }
Jordan Halterman041534b2017-03-21 10:37:33 -0700389 });
Ray Milkey4f350762018-01-23 23:32:03 +0000390 return future;
391 }
392
393 private <T> CompletableFuture<T> executeOnPooledConnection(
394 Endpoint endpoint,
395 String type,
396 Function<ClientConnection, CompletableFuture<T>> callback,
397 Executor executor) {
398 CompletableFuture<T> future = new CompletableFuture<T>();
399 executeOnPooledConnection(endpoint, type, callback, executor, future);
400 return future;
401 }
402
403 private <T> void executeOnPooledConnection(
404 Endpoint endpoint,
405 String type,
406 Function<ClientConnection, CompletableFuture<T>> callback,
407 Executor executor,
408 CompletableFuture<T> future) {
409 if (endpoint.equals(localEndpoint)) {
410 callback.apply(localClientConnection).whenComplete((result, error) -> {
411 if (error == null) {
412 executor.execute(() -> future.complete(result));
413 } else {
414 executor.execute(() -> future.completeExceptionally(error));
415 }
416 });
417 return;
418 }
419
420 getChannel(endpoint, type).whenComplete((channel, channelError) -> {
421 if (channelError == null) {
422 ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
423 callback.apply(connection).whenComplete((result, sendError) -> {
424 if (sendError == null) {
425 executor.execute(() -> future.complete(result));
426 } else {
427 executor.execute(() -> future.completeExceptionally(sendError));
428 }
429 });
430 } else {
431 executor.execute(() -> future.completeExceptionally(channelError));
432 }
433 });
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800434 }
435
436 @Override
437 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900438 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700439 handlers.put(type, (message, connection) -> executor.execute(() ->
440 handler.accept(message.sender(), message.payload())));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800441 }
442
443 @Override
444 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900445 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700446 handlers.put(type, (message, connection) -> executor.execute(() -> {
Madan Jampania9e70a62016-03-02 16:28:18 -0800447 byte[] responsePayload = null;
Jordan Haltermane3813a92017-07-29 14:10:31 -0700448 InternalReply.Status status = InternalReply.Status.OK;
Madan Jampania9e70a62016-03-02 16:28:18 -0800449 try {
450 responsePayload = handler.apply(message.sender(), message.payload());
451 } catch (Exception e) {
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700452 log.debug("An error occurred in a message handler: {}", e);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700453 status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800454 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700455 connection.reply(message, status, Optional.ofNullable(responsePayload));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800456 }));
457 }
458
459 @Override
460 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900461 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700462 handlers.put(type, (message, connection) -> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800463 handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700464 InternalReply.Status status;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700465 if (error == null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700466 status = InternalReply.Status.OK;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700467 } else {
468 log.debug("An error occurred in a message handler: {}", error);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700469 status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700470 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700471 connection.reply(message, status, Optional.ofNullable(result));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800472 });
473 });
474 }
475
476 @Override
477 public void unregisterHandler(String type) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900478 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800479 handlers.remove(type);
480 }
481
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700482 private Bootstrap bootstrapClient(Endpoint endpoint) {
483 Bootstrap bootstrap = new Bootstrap();
484 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
485 bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
486 new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
487 bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
488 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
489 bootstrap.group(clientGroup);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700490 bootstrap.channel(clientChannelClass);
491 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
492 bootstrap.remoteAddress(endpoint.host().toInetAddress(), endpoint.port());
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700493 if (enableNettyTls) {
494 bootstrap.handler(new SslClientCommunicationChannelInitializer());
495 } else {
496 bootstrap.handler(new BasicChannelInitializer());
497 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700498 return bootstrap;
499 }
500
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800501 private void startAcceptingConnections() throws InterruptedException {
502 ServerBootstrap b = new ServerBootstrap();
Jon Hall9a44d6a2017-03-02 18:14:37 -0800503 b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700504 new WriteBufferWaterMark(8 * 1024, 32 * 1024));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800505 b.option(ChannelOption.SO_RCVBUF, 1048576);
Jordan Halterman153dbd52017-12-21 11:08:19 -0800506 b.childOption(ChannelOption.SO_KEEPALIVE, true);
507 b.childOption(ChannelOption.TCP_NODELAY, true);
Yuta HIGUCHIb47c9532016-08-22 09:41:23 -0700508 b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800509 b.group(serverGroup, clientGroup);
510 b.channel(serverChannelClass);
511 if (enableNettyTls) {
512 b.childHandler(new SslServerCommunicationChannelInitializer());
513 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700514 b.childHandler(new BasicChannelInitializer());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800515 }
516 b.option(ChannelOption.SO_BACKLOG, 128);
517 b.childOption(ChannelOption.SO_KEEPALIVE, true);
518
519 // Bind and start to accept incoming connections.
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700520 b.bind(localEndpoint.port()).sync().addListener(future -> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800521 if (future.isSuccess()) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700522 log.info("{} accepting incoming connections on port {}",
523 localEndpoint.host(), localEndpoint.port());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800524 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700525 log.warn("{} failed to bind to port {} due to {}",
526 localEndpoint.host(), localEndpoint.port(), future.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800527 }
528 });
529 }
530
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700531 private CompletableFuture<Channel> openChannel(Endpoint ep) {
532 Bootstrap bootstrap = bootstrapClient(ep);
533 CompletableFuture<Channel> retFuture = new CompletableFuture<>();
534 ChannelFuture f = bootstrap.connect();
535
536 f.addListener(future -> {
537 if (future.isSuccess()) {
538 retFuture.complete(f.channel());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800539 } else {
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700540 retFuture.completeExceptionally(future.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800541 }
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700542 });
543 log.debug("Established a new connection to {}", ep);
544 return retFuture;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800545 }
546
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700547 /**
548 * Channel initializer for TLS servers.
549 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800550 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800551 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800552
553 @Override
554 protected void initChannel(SocketChannel channel) throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800555 SSLContext serverContext = SSLContext.getInstance("TLS");
Brian O'Connor740e98c2017-06-29 17:07:17 -0700556 serverContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800557
558 SSLEngine serverSslEngine = serverContext.createSSLEngine();
559
560 serverSslEngine.setNeedClientAuth(true);
561 serverSslEngine.setUseClientMode(false);
562 serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
563 serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
564 serverSslEngine.setEnableSessionCreation(true);
565
566 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
Jordan Haltermane3813a92017-07-29 14:10:31 -0700567 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700568 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800569 .addLast("handler", dispatcher);
570 }
571 }
572
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700573 /**
574 * Channel initializer for TLS clients.
575 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800576 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800577 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800578
579 @Override
580 protected void initChannel(SocketChannel channel) throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800581 SSLContext clientContext = SSLContext.getInstance("TLS");
Brian O'Connor740e98c2017-06-29 17:07:17 -0700582 clientContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800583
584 SSLEngine clientSslEngine = clientContext.createSSLEngine();
585
586 clientSslEngine.setUseClientMode(true);
587 clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
588 clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
589 clientSslEngine.setEnableSessionCreation(true);
590
591 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
Jordan Haltermane3813a92017-07-29 14:10:31 -0700592 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700593 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800594 .addLast("handler", dispatcher);
595 }
596 }
597
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700598 /**
599 * Channel initializer for basic connections.
600 */
601 private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800602 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800603
604 @Override
605 protected void initChannel(SocketChannel channel) throws Exception {
606 channel.pipeline()
Jordan Haltermane3813a92017-07-29 14:10:31 -0700607 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700608 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800609 .addLast("handler", dispatcher);
610 }
611 }
612
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700613 /**
614 * Channel inbound handler that dispatches messages to the appropriate handler.
615 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800616 @ChannelHandler.Sharable
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700617 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<Object> {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700618 // Effectively SimpleChannelInboundHandler<InternalMessage>,
619 // had to specify <Object> to avoid Class Loader not being able to find some classes.
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800620
621 @Override
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700622 protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
623 InternalMessage message = (InternalMessage) rawMessage;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800624 try {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700625 if (message.isRequest()) {
626 RemoteServerConnection connection =
627 serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700628 connection.dispatch((InternalRequest) message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700629 } else {
630 RemoteClientConnection connection =
631 clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700632 connection.dispatch((InternalReply) message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700633 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800634 } catch (RejectedExecutionException e) {
635 log.warn("Unable to dispatch message due to {}", e.getMessage());
636 }
637 }
638
639 @Override
640 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
641 log.error("Exception inside channel handling pipeline.", cause);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700642
643 RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
644 if (clientConnection != null) {
645 clientConnection.close();
646 }
647
648 RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
649 if (serverConnection != null) {
650 serverConnection.close();
651 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800652 context.close();
653 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700654
655 /**
656 * Returns true if the given message should be handled.
657 *
658 * @param msg inbound message
659 * @return true if {@code msg} is {@link InternalMessage} instance.
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700660 * @see SimpleChannelInboundHandler#acceptInboundMessage(Object)
661 */
662 @Override
663 public final boolean acceptInboundMessage(Object msg) {
664 return msg instanceof InternalMessage;
665 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800666 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700667
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700668 /**
669 * Wraps a {@link CompletableFuture} and tracks its type and creation time.
670 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800671 private final class Callback {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700672 private final String type;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800673 private final CompletableFuture<byte[]> future;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700674 private final long time = System.currentTimeMillis();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800675
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700676 Callback(String type, CompletableFuture<byte[]> future) {
677 this.type = type;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800678 this.future = future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800679 }
680
681 public void complete(byte[] value) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700682 future.complete(value);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800683 }
684
685 public void completeExceptionally(Throwable error) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700686 future.completeExceptionally(error);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800687 }
688 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800689
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700690 /**
691 * Represents the client side of a connection to a local or remote server.
692 */
693 private interface ClientConnection {
694
695 /**
696 * Sends a message to the other side of the connection.
697 *
698 * @param message the message to send
699 * @return a completable future to be completed once the message has been sent
700 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700701 CompletableFuture<Void> sendAsync(InternalRequest message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700702
703 /**
704 * Sends a message to the other side of the connection, awaiting a reply.
705 *
706 * @param message the message to send
707 * @return a completable future to be completed once a reply is received or the request times out
708 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700709 CompletableFuture<byte[]> sendAndReceive(InternalRequest message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700710
711 /**
712 * Closes the connection.
713 */
714 default void close() {
715 }
716 }
717
718 /**
719 * Represents the server side of a connection.
720 */
721 private interface ServerConnection {
722
723 /**
724 * Sends a reply to the other side of the connection.
725 *
726 * @param message the message to which to reply
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800727 * @param status the reply status
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700728 * @param payload the response payload
729 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700730 void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700731
732 /**
733 * Closes the connection.
734 */
735 default void close() {
736 }
737 }
738
739 /**
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800740 * Remote connection implementation.
741 */
742 private abstract class AbstractClientConnection implements ClientConnection {
743 private final Map<Long, Callback> futures = Maps.newConcurrentMap();
744 private final AtomicBoolean closed = new AtomicBoolean(false);
Jordan Haltermanef92f192017-12-21 11:59:38 -0800745 private final Cache<String, RequestMonitor> requestMonitors = CacheBuilder.newBuilder()
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800746 .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
747 .build();
748
749 /**
750 * Times out callbacks for this connection.
751 */
Jordan Haltermanef92f192017-12-21 11:59:38 -0800752 void timeoutCallbacks() {
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800753 // Store the current time.
754 long currentTime = System.currentTimeMillis();
755
756 // Iterate through future callbacks and time out callbacks that have been alive
757 // longer than the current timeout according to the message type.
758 Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
759 while (iterator.hasNext()) {
760 Callback callback = iterator.next().getValue();
761 try {
Jordan Haltermanef92f192017-12-21 11:59:38 -0800762 RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
763 long elapsedTime = currentTime - callback.time;
Jordan Halterman111aab72018-01-12 16:28:57 -0800764 if (elapsedTime > MAX_TIMEOUT_MILLIS ||
765 (elapsedTime > MIN_TIMEOUT_MILLIS && requestMonitor.isTimedOut(elapsedTime))) {
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800766 iterator.remove();
Jordan Haltermanef92f192017-12-21 11:59:38 -0800767 requestMonitor.addReplyTime(elapsedTime);
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800768 callback.completeExceptionally(
769 new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
770 }
771 } catch (ExecutionException e) {
772 throw new AssertionError();
773 }
774 }
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800775 }
776
777 protected void registerCallback(long id, String subject, CompletableFuture<byte[]> future) {
778 futures.put(id, new Callback(subject, future));
779 }
780
781 protected Callback completeCallback(long id) {
782 Callback callback = futures.remove(id);
783 if (callback != null) {
784 try {
Jordan Haltermanef92f192017-12-21 11:59:38 -0800785 RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
786 requestMonitor.addReplyTime(System.currentTimeMillis() - callback.time);
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800787 } catch (ExecutionException e) {
788 throw new AssertionError();
789 }
790 }
791 return callback;
792 }
793
794 protected Callback failCallback(long id) {
795 return futures.remove(id);
796 }
797
798 @Override
799 public void close() {
800 if (closed.compareAndSet(false, true)) {
801 timeoutFuture.cancel(false);
802 for (Callback callback : futures.values()) {
803 callback.completeExceptionally(new ConnectException());
804 }
805 }
806 }
807 }
808
809 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700810 * Local connection implementation.
811 */
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800812 private final class LocalClientConnection extends AbstractClientConnection {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700813 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700814 public CompletableFuture<Void> sendAsync(InternalRequest message) {
815 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700816 if (handler != null) {
817 handler.accept(message, localServerConnection);
818 } else {
819 log.debug("No handler for message type {} from {}", message.type(), message.sender());
820 }
821 return CompletableFuture.completedFuture(null);
822 }
823
824 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700825 public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700826 CompletableFuture<byte[]> future = new CompletableFuture<>();
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800827 future.whenComplete((r, e) -> completeCallback(message.id()));
828 registerCallback(message.id(), message.subject(), future);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700829 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700830 if (handler != null) {
831 handler.accept(message, new LocalServerConnection(future));
832 } else {
833 log.debug("No handler for message type {} from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700834 new LocalServerConnection(future)
835 .reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700836 }
837 return future;
838 }
839 }
840
841 /**
842 * Local server connection.
843 */
844 private final class LocalServerConnection implements ServerConnection {
845 private final CompletableFuture<byte[]> future;
846
847 LocalServerConnection(CompletableFuture<byte[]> future) {
848 this.future = future;
849 }
850
851 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700852 public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700853 if (future != null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700854 if (status == InternalReply.Status.OK) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700855 future.complete(payload.orElse(EMPTY_PAYLOAD));
Jordan Haltermane3813a92017-07-29 14:10:31 -0700856 } else if (status == InternalReply.Status.ERROR_NO_HANDLER) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700857 future.completeExceptionally(new MessagingException.NoRemoteHandler());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700858 } else if (status == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700859 future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700860 } else if (status == InternalReply.Status.PROTOCOL_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700861 future.completeExceptionally(new MessagingException.ProtocolException());
862 }
863 }
864 }
865 }
866
867 /**
868 * Remote connection implementation.
869 */
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800870 private final class RemoteClientConnection extends AbstractClientConnection {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700871 private final Channel channel;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700872
873 RemoteClientConnection(Channel channel) {
874 this.channel = channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800875 }
876
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700877 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700878 public CompletableFuture<Void> sendAsync(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700879 CompletableFuture<Void> future = new CompletableFuture<>();
880 channel.writeAndFlush(message).addListener(channelFuture -> {
881 if (!channelFuture.isSuccess()) {
882 future.completeExceptionally(channelFuture.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800883 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700884 future.complete(null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800885 }
886 });
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700887 return future;
888 }
889
890 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700891 public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700892 CompletableFuture<byte[]> future = new CompletableFuture<>();
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800893 registerCallback(message.id(), message.subject(), future);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700894 channel.writeAndFlush(message).addListener(channelFuture -> {
895 if (!channelFuture.isSuccess()) {
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800896 Callback callback = failCallback(message.id());
897 if (callback != null) {
898 callback.completeExceptionally(channelFuture.cause());
899 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700900 }
901 });
902 return future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800903 }
904
905 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700906 * Dispatches a message to a local handler.
907 *
908 * @param message the message to dispatch
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800909 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700910 private void dispatch(InternalReply message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700911 if (message.preamble() != preamble) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700912 log.debug("Received {} with invalid preamble", message.type());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700913 return;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800914 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700915
916 clockService.recordEventTime(message.time());
917
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800918 Callback callback = completeCallback(message.id());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700919 if (callback != null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700920 if (message.status() == InternalReply.Status.OK) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700921 callback.complete(message.payload());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700922 } else if (message.status() == InternalReply.Status.ERROR_NO_HANDLER) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700923 callback.completeExceptionally(new MessagingException.NoRemoteHandler());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700924 } else if (message.status() == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700925 callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700926 } else if (message.status() == InternalReply.Status.PROTOCOL_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700927 callback.completeExceptionally(new MessagingException.ProtocolException());
928 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700929 } else {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700930 log.debug("Received a reply for message id:[{}] "
931 + "but was unable to locate the"
932 + " request handle", message.id());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700933 }
934 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700935 }
936
937 /**
938 * Remote server connection.
939 */
940 private final class RemoteServerConnection implements ServerConnection {
941 private final Channel channel;
942
943 RemoteServerConnection(Channel channel) {
944 this.channel = channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800945 }
946
947 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700948 * Dispatches a message to a local handler.
949 *
950 * @param message the message to dispatch
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800951 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700952 private void dispatch(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700953 if (message.preamble() != preamble) {
954 log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700955 reply(message, InternalReply.Status.PROTOCOL_EXCEPTION, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700956 return;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800957 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700958
959 clockService.recordEventTime(message.time());
960
Jordan Haltermane3813a92017-07-29 14:10:31 -0700961 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700962 if (handler != null) {
963 handler.accept(message, this);
964 } else {
965 log.debug("No handler for message type {} from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700966 reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700967 }
968 }
969
970 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700971 public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
972 InternalReply response = new InternalReply(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700973 clockService.timeNow(),
974 message.id(),
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700975 payload.orElse(EMPTY_PAYLOAD),
976 status);
977 channel.writeAndFlush(response);
978 }
979 }
980
981 /**
982 * Request-reply timeout history tracker.
983 */
Jordan Haltermanef92f192017-12-21 11:59:38 -0800984 private static final class RequestMonitor {
985 private final DescriptiveStatistics samples = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700986
987 /**
988 * Adds a reply time to the history.
989 *
990 * @param replyTime the reply time to add to the history
991 */
992 void addReplyTime(long replyTime) {
Jordan Haltermanef92f192017-12-21 11:59:38 -0800993 samples.addValue(replyTime);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700994 }
995
996 /**
Jordan Haltermanef92f192017-12-21 11:59:38 -0800997 * Returns a boolean indicating whether the given request should be timed out according to the elapsed time.
998 *
999 * @param elapsedTime the elapsed request time
1000 * @return indicates whether the request should be timed out
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001001 */
Jordan Haltermanef92f192017-12-21 11:59:38 -08001002 boolean isTimedOut(long elapsedTime) {
1003 return phi(elapsedTime) >= PHI_FAILURE_THRESHOLD;
1004 }
1005
1006 /**
1007 * Compute phi for the specified node id.
1008 *
1009 * @param elapsedTime the duration since the request was sent
1010 * @return phi value
1011 */
1012 private double phi(long elapsedTime) {
1013 if (samples.getN() < MIN_SAMPLES) {
1014 return 0.0;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001015 }
Jordan Haltermanef92f192017-12-21 11:59:38 -08001016 return computePhi(samples, elapsedTime);
1017 }
1018
1019 /**
1020 * Computes the phi value from the given samples.
1021 *
1022 * @param samples the samples from which to compute phi
1023 * @param elapsedTime the duration since the request was sent
1024 * @return phi
1025 */
1026 private double computePhi(DescriptiveStatistics samples, long elapsedTime) {
1027 return samples.getN() > 0 ? PHI_FACTOR * elapsedTime / samples.getMean() : 100;
Aaron Kruglikov1b727382016-02-09 16:17:47 -08001028 }
1029 }
JunHuy Lam39eb4292015-06-26 17:24:23 +09001030}