blob: c98c72c967507ed66a28b444249a9bc64fac4399 [file] [log] [blame]
Thomas Vachuska58de4162015-09-10 16:15:33 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Thomas Vachuska58de4162015-09-10 16:15:33 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070016package org.onosproject.store.cluster.messaging.impl;
17
Aaron Kruglikov1b727382016-02-09 16:17:47 -080018import javax.net.ssl.KeyManagerFactory;
19import javax.net.ssl.SSLContext;
20import javax.net.ssl.SSLEngine;
21import javax.net.ssl.TrustManagerFactory;
Brian O'Connor740e98c2017-06-29 17:07:17 -070022import java.io.File;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080023import java.io.FileInputStream;
Brian O'Connor740e98c2017-06-29 17:07:17 -070024import java.io.FileNotFoundException;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070025import java.net.ConnectException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080026import java.security.KeyStore;
Brian O'Connor740e98c2017-06-29 17:07:17 -070027import java.security.MessageDigest;
Brian O'Connorf69e3e32018-05-10 02:25:09 -070028import java.security.PublicKey;
Brian O'Connor740e98c2017-06-29 17:07:17 -070029import java.security.cert.Certificate;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070030import java.time.Duration;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070031import java.util.ArrayList;
Brian O'Connor740e98c2017-06-29 17:07:17 -070032import java.util.Enumeration;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070033import java.util.Iterator;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070034import java.util.List;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080035import java.util.Map;
Madan Jampania9e70a62016-03-02 16:28:18 -080036import java.util.Optional;
Brian O'Connor740e98c2017-06-29 17:07:17 -070037import java.util.StringJoiner;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080038import java.util.concurrent.CompletableFuture;
39import java.util.concurrent.ConcurrentHashMap;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070040import java.util.concurrent.ExecutionException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080041import java.util.concurrent.Executor;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070042import java.util.concurrent.Executors;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080043import java.util.concurrent.RejectedExecutionException;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070044import java.util.concurrent.ScheduledExecutorService;
45import java.util.concurrent.ScheduledFuture;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080046import java.util.concurrent.TimeUnit;
47import java.util.concurrent.TimeoutException;
48import java.util.concurrent.atomic.AtomicBoolean;
49import java.util.concurrent.atomic.AtomicLong;
50import java.util.function.BiConsumer;
51import java.util.function.BiFunction;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070052import java.util.function.Function;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080053
Jordan Halterman23e73c52018-01-13 14:10:56 -080054import com.google.common.base.Throwables;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070055import com.google.common.cache.Cache;
56import com.google.common.cache.CacheBuilder;
57import com.google.common.collect.Lists;
58import com.google.common.collect.Maps;
59import com.google.common.util.concurrent.MoreExecutors;
60import io.netty.bootstrap.Bootstrap;
61import io.netty.bootstrap.ServerBootstrap;
62import io.netty.buffer.PooledByteBufAllocator;
63import io.netty.channel.Channel;
64import io.netty.channel.ChannelFuture;
65import io.netty.channel.ChannelHandler;
66import io.netty.channel.ChannelHandlerContext;
67import io.netty.channel.ChannelInitializer;
68import io.netty.channel.ChannelOption;
69import io.netty.channel.EventLoopGroup;
70import io.netty.channel.ServerChannel;
71import io.netty.channel.SimpleChannelInboundHandler;
72import io.netty.channel.WriteBufferWaterMark;
73import io.netty.channel.epoll.EpollEventLoopGroup;
74import io.netty.channel.epoll.EpollServerSocketChannel;
75import io.netty.channel.epoll.EpollSocketChannel;
76import io.netty.channel.nio.NioEventLoopGroup;
77import io.netty.channel.socket.SocketChannel;
78import io.netty.channel.socket.nio.NioServerSocketChannel;
79import io.netty.channel.socket.nio.NioSocketChannel;
80import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
81import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
82import org.apache.felix.scr.annotations.Activate;
83import org.apache.felix.scr.annotations.Component;
84import org.apache.felix.scr.annotations.Deactivate;
85import org.apache.felix.scr.annotations.Reference;
86import org.apache.felix.scr.annotations.ReferenceCardinality;
87import org.apache.felix.scr.annotations.Service;
88import org.onosproject.cluster.ClusterMetadataService;
89import org.onosproject.cluster.ControllerNode;
90import org.onosproject.core.HybridLogicalClockService;
91import org.onosproject.store.cluster.messaging.Endpoint;
92import org.onosproject.store.cluster.messaging.MessagingException;
93import org.onosproject.store.cluster.messaging.MessagingService;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070094import org.slf4j.Logger;
95import org.slf4j.LoggerFactory;
96
Yuta HIGUCHI90a16892016-07-20 20:36:08 -070097import static org.onlab.util.Tools.groupedThreads;
Heedo Kang4a47a302016-02-29 17:40:23 +090098import static org.onosproject.security.AppGuard.checkPermission;
99import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
100
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700101/**
102 * Netty based MessagingService.
103 */
Jordan Halterman00e92da2018-05-22 23:05:52 -0700104@Component(enabled = false)
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700105@Service
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800106public class NettyMessagingManager implements MessagingService {
// NOTE(review): HISTORY_EXPIRE_MILLIS, the WINDOW_* constants, MIN_SAMPLES, MIN_STANDARD_DEVIATION,
// PHI_FAILURE_THRESHOLD and MIN/MAX_TIMEOUT_MILLIS are not referenced in the visible part of this
// file; presumably they tune per-connection adaptive reply timeouts — confirm against their users.
private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1).toMillis();
// Period, in milliseconds, of the timeoutAllCallbacks() task scheduled in activate().
private static final long TIMEOUT_INTERVAL = 50;
private static final int WINDOW_SIZE = 60;
private static final int WINDOW_UPDATE_SAMPLE_SIZE = 100;
private static final long WINDOW_UPDATE_MILLIS = 10000;
private static final int MIN_SAMPLES = 25;
private static final int MIN_STANDARD_DEVIATION = 100;
private static final int PHI_FAILURE_THRESHOLD = 12;
private static final long MIN_TIMEOUT_MILLIS = 100;
private static final long MAX_TIMEOUT_MILLIS = 5000;
// Number of channel slots kept per remote endpoint; a message type hashes onto one slot.
private static final int CHANNEL_POOL_SIZE = 8;

private static final byte[] EMPTY_PAYLOAD = new byte[0];

private final Logger log = LoggerFactory.getLogger(getClass());

// Client-side connection used when the destination endpoint is this node's own endpoint.
private final LocalClientConnection localClientConnection = new LocalClientConnection();
private final LocalServerConnection localServerConnection = new LocalServerConnection(null);

//TODO CONFIG_DIR is duplicated from ConfigFileBasedClusterMetadataProvider
private static final String CONFIG_DIR = "../config";
private static final String KS_FILE_NAME = "onos.jks";
// Key/trust store used when the javax.net.ssl.keyStore / trustStore system properties are unset.
private static final File DEFAULT_KS_FILE = new File(CONFIG_DIR, KS_FILE_NAME);
// Store password used when the javax.net.ssl.*Password system properties are unset.
private static final String DEFAULT_KS_PASSWORD = "changeit";

@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HybridLogicalClockService clockService;

// Address/port this node listens on; assigned in activate() from the local controller node.
private Endpoint localEndpoint;
// Hash code of the cluster name (set in activate()); passed to every InternalRequest and MessageEncoder.
private int preamble;
private final AtomicBoolean started = new AtomicBoolean(false);
// Registered message handlers, keyed by message type.
private final Map<String, BiConsumer<InternalRequest, ServerConnection>> handlers = new ConcurrentHashMap<>();
// Client connection state per outbound channel.
private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
// Source of per-request message ids.
private final AtomicLong messageIdGenerator = new AtomicLong(0);

// Handle for the periodic timeoutAllCallbacks() task; cancelled in deactivate().
private ScheduledFuture<?> timeoutFuture;

// Per-endpoint channel pools of CHANNEL_POOL_SIZE slots; a null slot means no channel has been opened.
private final Map<Endpoint, List<CompletableFuture<Channel>>> channels = Maps.newConcurrentMap();

private EventLoopGroup serverGroup;
private EventLoopGroup clientGroup;
// Transport classes matching the chosen event loop groups (epoll or NIO); see initEventLoopGroup().
private Class<? extends ServerChannel> serverChannelClass;
private Class<? extends Channel> clientChannelClass;
private ScheduledExecutorService timeoutExecutor;

protected static final boolean TLS_ENABLED = true;
protected static final boolean TLS_DISABLED = false;
// Whether intra-cluster traffic uses TLS; recomputed by getTlsParameters() during activate().
protected boolean enableNettyTls = TLS_ENABLED;

// Populated by loadKeyStores() only when both stores load successfully.
protected TrustManagerFactory trustManager;
protected KeyManagerFactory keyManager;

@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataService clusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700162
163 @Activate
Ray Milkey986a47a2018-01-25 11:38:51 -0800164 public void activate() throws InterruptedException {
Madan Jampaniec1df022015-10-13 21:23:03 -0700165 ControllerNode localNode = clusterMetadataService.getLocalNode();
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800166 getTlsParameters();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800167
168 if (started.get()) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700169 log.warn("Already running at local endpoint: {}", localEndpoint);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800170 return;
171 }
172 this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700173 this.localEndpoint = new Endpoint(localNode.ip(), localNode.tcpPort());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800174 initEventLoopGroup();
175 startAcceptingConnections();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700176 timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
177 groupedThreads("NettyMessagingEvt", "timeout", log));
178 timeoutFuture = timeoutExecutor.scheduleAtFixedRate(
179 this::timeoutAllCallbacks, TIMEOUT_INTERVAL, TIMEOUT_INTERVAL, TimeUnit.MILLISECONDS);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800180 started.set(true);
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700181 log.info("Started");
182 }
183
184 @Deactivate
Ray Milkey986a47a2018-01-25 11:38:51 -0800185 public void deactivate() {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800186 if (started.get()) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800187 serverGroup.shutdownGracefully();
188 clientGroup.shutdownGracefully();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700189 timeoutFuture.cancel(false);
190 timeoutExecutor.shutdown();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800191 started.set(false);
192 }
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700193 log.info("Stopped");
194 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900195
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800196 private void getTlsParameters() {
Brian O'Connor740e98c2017-06-29 17:07:17 -0700197 // default is TLS enabled unless key stores cannot be loaded
198 enableNettyTls = Boolean.parseBoolean(System.getProperty("enableNettyTLS", Boolean.toString(TLS_ENABLED)));
199
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800200 if (enableNettyTls) {
Brian O'Connor740e98c2017-06-29 17:07:17 -0700201 enableNettyTls = loadKeyStores();
202 }
203 }
204
205 private boolean loadKeyStores() {
206 // Maintain a local copy of the trust and key managers in case anything goes wrong
207 TrustManagerFactory tmf;
208 KeyManagerFactory kmf;
209 try {
210 String ksLocation = System.getProperty("javax.net.ssl.keyStore", DEFAULT_KS_FILE.toString());
211 String tsLocation = System.getProperty("javax.net.ssl.trustStore", DEFAULT_KS_FILE.toString());
212 char[] ksPwd = System.getProperty("javax.net.ssl.keyStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
213 char[] tsPwd = System.getProperty("javax.net.ssl.trustStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
214
215 tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
216 KeyStore ts = KeyStore.getInstance("JKS");
217 ts.load(new FileInputStream(tsLocation), tsPwd);
218 tmf.init(ts);
219
220 kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
221 KeyStore ks = KeyStore.getInstance("JKS");
222 ks.load(new FileInputStream(ksLocation), ksPwd);
223 kmf.init(ks, ksPwd);
224 if (log.isInfoEnabled()) {
225 logKeyStore(ks, ksLocation, ksPwd);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900226 }
Brian O'Connor740e98c2017-06-29 17:07:17 -0700227 } catch (FileNotFoundException e) {
228 log.warn("Disabling TLS for intra-cluster messaging; Could not load cluster key store: {}", e.getMessage());
229 return TLS_DISABLED;
230 } catch (Exception e) {
231 //TODO we might want to catch exceptions more specifically
232 log.error("Error loading key store; disabling TLS for intra-cluster messaging", e);
233 return TLS_DISABLED;
234 }
235 this.trustManager = tmf;
236 this.keyManager = kmf;
237 return TLS_ENABLED;
238 }
239
240 private void logKeyStore(KeyStore ks, String ksLocation, char[] ksPwd) {
241 if (log.isInfoEnabled()) {
242 log.info("Loaded cluster key store from: {}", ksLocation);
243 try {
244 for (Enumeration<String> e = ks.aliases(); e.hasMoreElements();) {
245 String alias = e.nextElement();
Brian O'Connorf69e3e32018-05-10 02:25:09 -0700246 Certificate cert = ks.getCertificate(alias);
247 if (cert == null) {
248 log.info("No certificate for alias {}", alias);
249 continue;
Brian O'Connor740e98c2017-06-29 17:07:17 -0700250 }
Brian O'Connorf69e3e32018-05-10 02:25:09 -0700251 PublicKey key = cert.getPublicKey();
Brian O'Connor740e98c2017-06-29 17:07:17 -0700252 // Compute the certificate's fingerprint (use the key if certificate cannot be found)
253 MessageDigest digest = MessageDigest.getInstance("SHA1");
Brian O'Connorf69e3e32018-05-10 02:25:09 -0700254 digest.update(key.getEncoded());
Brian O'Connor740e98c2017-06-29 17:07:17 -0700255 StringJoiner fingerprint = new StringJoiner(":");
256 for (byte b : digest.digest()) {
257 fingerprint.add(String.format("%02X", b));
258 }
259 log.info("{} -> {}", alias, fingerprint);
260 }
261 } catch (Exception e) {
262 log.warn("Unable to print contents of key store: {}", ksLocation, e);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900263 }
264 }
265 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700266
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800267 private void initEventLoopGroup() {
268 // try Epoll first and if that does work, use nio.
269 try {
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700270 clientGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollC-%d", log));
271 serverGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800272 serverChannelClass = EpollServerSocketChannel.class;
273 clientChannelClass = EpollSocketChannel.class;
274 return;
275 } catch (Throwable e) {
276 log.debug("Failed to initialize native (epoll) transport. "
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700277 + "Reason: {}. Proceeding with nio.", e.getMessage());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800278 }
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700279 clientGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioC-%d", log));
280 serverGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800281 serverChannelClass = NioServerSocketChannel.class;
282 clientChannelClass = NioSocketChannel.class;
283 }
284
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700285 /**
286 * Times out response callbacks.
287 */
288 private void timeoutAllCallbacks() {
289 // Iterate through all connections and time out callbacks.
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800290 localClientConnection.timeoutCallbacks();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700291 for (RemoteClientConnection connection : clientConnections.values()) {
292 connection.timeoutCallbacks();
293 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700294 }
295
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800296 @Override
297 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900298 checkPermission(CLUSTER_WRITE);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700299 InternalRequest message = new InternalRequest(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700300 clockService.timeNow(),
301 messageIdGenerator.incrementAndGet(),
302 localEndpoint,
303 type,
304 payload);
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700305 return executeOnPooledConnection(ep, type, c -> c.sendAsync(message), MoreExecutors.directExecutor());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800306 }
307
308 @Override
309 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900310 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800311 return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
312 }
313
314 @Override
315 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900316 checkPermission(CLUSTER_WRITE);
Jordan Halterman5ceb3892017-08-28 15:35:03 -0700317 long messageId = messageIdGenerator.incrementAndGet();
Jordan Haltermane3813a92017-07-29 14:10:31 -0700318 InternalRequest message = new InternalRequest(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700319 clockService.timeNow(),
320 messageId,
321 localEndpoint,
322 type,
323 payload);
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700324 return executeOnPooledConnection(ep, type, c -> c.sendAndReceive(message), executor);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700325 }
Jordan Halterman041534b2017-03-21 10:37:33 -0700326
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700327 private List<CompletableFuture<Channel>> getChannelPool(Endpoint endpoint) {
328 return channels.computeIfAbsent(endpoint, e -> {
329 List<CompletableFuture<Channel>> defaultList = new ArrayList<>(CHANNEL_POOL_SIZE);
330 for (int i = 0; i < CHANNEL_POOL_SIZE; i++) {
331 defaultList.add(null);
332 }
333 return Lists.newCopyOnWriteArrayList(defaultList);
334 });
335 }
336
337 private int getChannelOffset(String messageType) {
338 return Math.abs(messageType.hashCode() % CHANNEL_POOL_SIZE);
339 }
340
Jordan Halterman94db1912018-02-08 14:45:20 -0800341 private CompletableFuture<Channel> getChannel(Endpoint endpoint, String messageType) {
Jordan Halterman23e73c52018-01-13 14:10:56 -0800342 List<CompletableFuture<Channel>> channelPool = getChannelPool(endpoint);
Jordan Halterman94db1912018-02-08 14:45:20 -0800343 int offset = getChannelOffset(messageType);
Jordan Halterman23e73c52018-01-13 14:10:56 -0800344
Jordan Halterman23e73c52018-01-13 14:10:56 -0800345 CompletableFuture<Channel> channelFuture = channelPool.get(offset);
346 if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
347 synchronized (channelPool) {
348 channelFuture = channelPool.get(offset);
349 if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
350 channelFuture = openChannel(endpoint);
351 channelPool.set(offset, channelFuture);
352 }
353 }
354 }
355
Jordan Halterman94db1912018-02-08 14:45:20 -0800356 CompletableFuture<Channel> future = new CompletableFuture<>();
Jordan Halterman23e73c52018-01-13 14:10:56 -0800357 final CompletableFuture<Channel> finalFuture = channelFuture;
358 finalFuture.whenComplete((channel, error) -> {
359 if (error == null) {
360 if (!channel.isActive()) {
Jordan Halterman94db1912018-02-08 14:45:20 -0800361 CompletableFuture<Channel> currentFuture;
Jordan Halterman23e73c52018-01-13 14:10:56 -0800362 synchronized (channelPool) {
363 currentFuture = channelPool.get(offset);
364 if (currentFuture == finalFuture) {
365 channelPool.set(offset, null);
366 }
367 }
Jordan Halterman94db1912018-02-08 14:45:20 -0800368
369 ClientConnection connection = clientConnections.remove(channel);
370 if (connection != null) {
371 connection.close();
372 }
373
Jordan Halterman23e73c52018-01-13 14:10:56 -0800374 if (currentFuture == finalFuture) {
Jordan Halterman94db1912018-02-08 14:45:20 -0800375 getChannel(endpoint, messageType).whenComplete((recursiveResult, recursiveError) -> {
376 if (recursiveError == null) {
377 future.complete(recursiveResult);
378 } else {
379 future.completeExceptionally(recursiveError);
380 }
381 });
Jordan Halterman23e73c52018-01-13 14:10:56 -0800382 } else {
383 currentFuture.whenComplete((recursiveResult, recursiveError) -> {
384 if (recursiveError == null) {
Jordan Halterman94db1912018-02-08 14:45:20 -0800385 future.complete(recursiveResult);
Jordan Halterman23e73c52018-01-13 14:10:56 -0800386 } else {
387 future.completeExceptionally(recursiveError);
388 }
389 });
390 }
391 } else {
Jordan Halterman94db1912018-02-08 14:45:20 -0800392 future.complete(channel);
Jordan Halterman23e73c52018-01-13 14:10:56 -0800393 }
Ray Milkey4f350762018-01-23 23:32:03 +0000394 } else {
Jordan Halterman23e73c52018-01-13 14:10:56 -0800395 future.completeExceptionally(error);
Ray Milkey4f350762018-01-23 23:32:03 +0000396 }
397 });
Jordan Halterman94db1912018-02-08 14:45:20 -0800398 return future;
399 }
400
401 private <T> CompletableFuture<T> executeOnPooledConnection(
402 Endpoint endpoint,
403 String type,
404 Function<ClientConnection, CompletableFuture<T>> callback,
405 Executor executor) {
406 CompletableFuture<T> future = new CompletableFuture<T>();
407 executeOnPooledConnection(endpoint, type, callback, executor, future);
408 return future;
409 }
410
411 private <T> void executeOnPooledConnection(
412 Endpoint endpoint,
413 String type,
414 Function<ClientConnection, CompletableFuture<T>> callback,
415 Executor executor,
416 CompletableFuture<T> future) {
417 if (endpoint.equals(localEndpoint)) {
418 callback.apply(localClientConnection).whenComplete((result, error) -> {
419 if (error == null) {
420 executor.execute(() -> future.complete(result));
421 } else {
422 executor.execute(() -> future.completeExceptionally(error));
423 }
424 });
425 return;
426 }
427
428 getChannel(endpoint, type).whenComplete((channel, channelError) -> {
429 if (channelError == null) {
430 ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
431 callback.apply(connection).whenComplete((result, sendError) -> {
432 if (sendError == null) {
433 executor.execute(() -> future.complete(result));
434 } else {
435 Throwable cause = Throwables.getRootCause(sendError);
436 if (!(cause instanceof TimeoutException) && !(cause instanceof MessagingException)) {
437 channel.close().addListener(f -> {
438 connection.close();
439 clientConnections.remove(channel);
440 });
441 }
442 executor.execute(() -> future.completeExceptionally(sendError));
443 }
444 });
445 } else {
446 executor.execute(() -> future.completeExceptionally(channelError));
447 }
448 });
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800449 }
450
451 @Override
452 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900453 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700454 handlers.put(type, (message, connection) -> executor.execute(() ->
455 handler.accept(message.sender(), message.payload())));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800456 }
457
458 @Override
459 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900460 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700461 handlers.put(type, (message, connection) -> executor.execute(() -> {
Madan Jampania9e70a62016-03-02 16:28:18 -0800462 byte[] responsePayload = null;
Jordan Haltermane3813a92017-07-29 14:10:31 -0700463 InternalReply.Status status = InternalReply.Status.OK;
Madan Jampania9e70a62016-03-02 16:28:18 -0800464 try {
465 responsePayload = handler.apply(message.sender(), message.payload());
466 } catch (Exception e) {
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700467 log.debug("An error occurred in a message handler: {}", e);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700468 status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800469 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700470 connection.reply(message, status, Optional.ofNullable(responsePayload));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800471 }));
472 }
473
474 @Override
475 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900476 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700477 handlers.put(type, (message, connection) -> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800478 handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700479 InternalReply.Status status;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700480 if (error == null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700481 status = InternalReply.Status.OK;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700482 } else {
483 log.debug("An error occurred in a message handler: {}", error);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700484 status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700485 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700486 connection.reply(message, status, Optional.ofNullable(result));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800487 });
488 });
489 }
490
491 @Override
492 public void unregisterHandler(String type) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900493 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800494 handlers.remove(type);
495 }
496
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700497 private Bootstrap bootstrapClient(Endpoint endpoint) {
498 Bootstrap bootstrap = new Bootstrap();
499 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
500 bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
501 new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
502 bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
503 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
504 bootstrap.group(clientGroup);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700505 bootstrap.channel(clientChannelClass);
506 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
507 bootstrap.remoteAddress(endpoint.host().toInetAddress(), endpoint.port());
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700508 if (enableNettyTls) {
509 bootstrap.handler(new SslClientCommunicationChannelInitializer());
510 } else {
511 bootstrap.handler(new BasicChannelInitializer());
512 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700513 return bootstrap;
514 }
515
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800516 private void startAcceptingConnections() throws InterruptedException {
517 ServerBootstrap b = new ServerBootstrap();
Jon Hall9a44d6a2017-03-02 18:14:37 -0800518 b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700519 new WriteBufferWaterMark(8 * 1024, 32 * 1024));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800520 b.option(ChannelOption.SO_RCVBUF, 1048576);
Jordan Halterman153dbd52017-12-21 11:08:19 -0800521 b.childOption(ChannelOption.SO_KEEPALIVE, true);
522 b.childOption(ChannelOption.TCP_NODELAY, true);
Yuta HIGUCHIb47c9532016-08-22 09:41:23 -0700523 b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800524 b.group(serverGroup, clientGroup);
525 b.channel(serverChannelClass);
526 if (enableNettyTls) {
527 b.childHandler(new SslServerCommunicationChannelInitializer());
528 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700529 b.childHandler(new BasicChannelInitializer());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800530 }
531 b.option(ChannelOption.SO_BACKLOG, 128);
532 b.childOption(ChannelOption.SO_KEEPALIVE, true);
533
534 // Bind and start to accept incoming connections.
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700535 b.bind(localEndpoint.port()).sync().addListener(future -> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800536 if (future.isSuccess()) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700537 log.info("{} accepting incoming connections on port {}",
538 localEndpoint.host(), localEndpoint.port());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800539 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700540 log.warn("{} failed to bind to port {} due to {}",
541 localEndpoint.host(), localEndpoint.port(), future.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800542 }
543 });
544 }
545
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700546 private CompletableFuture<Channel> openChannel(Endpoint ep) {
547 Bootstrap bootstrap = bootstrapClient(ep);
548 CompletableFuture<Channel> retFuture = new CompletableFuture<>();
549 ChannelFuture f = bootstrap.connect();
550
551 f.addListener(future -> {
552 if (future.isSuccess()) {
553 retFuture.complete(f.channel());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800554 } else {
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700555 retFuture.completeExceptionally(future.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800556 }
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700557 });
558 log.debug("Established a new connection to {}", ep);
559 return retFuture;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800560 }
561
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700562 /**
563 * Channel initializer for TLS servers.
564 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800565 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800566 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800567
568 @Override
569 protected void initChannel(SocketChannel channel) throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800570 SSLContext serverContext = SSLContext.getInstance("TLS");
Brian O'Connor740e98c2017-06-29 17:07:17 -0700571 serverContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800572
573 SSLEngine serverSslEngine = serverContext.createSSLEngine();
574
575 serverSslEngine.setNeedClientAuth(true);
576 serverSslEngine.setUseClientMode(false);
577 serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
578 serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
579 serverSslEngine.setEnableSessionCreation(true);
580
581 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
Jordan Haltermane3813a92017-07-29 14:10:31 -0700582 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700583 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800584 .addLast("handler", dispatcher);
585 }
586 }
587
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700588 /**
589 * Channel initializer for TLS clients.
590 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800591 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800592 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800593
594 @Override
595 protected void initChannel(SocketChannel channel) throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800596 SSLContext clientContext = SSLContext.getInstance("TLS");
Brian O'Connor740e98c2017-06-29 17:07:17 -0700597 clientContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800598
599 SSLEngine clientSslEngine = clientContext.createSSLEngine();
600
601 clientSslEngine.setUseClientMode(true);
602 clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
603 clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
604 clientSslEngine.setEnableSessionCreation(true);
605
606 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
Jordan Haltermane3813a92017-07-29 14:10:31 -0700607 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700608 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800609 .addLast("handler", dispatcher);
610 }
611 }
612
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700613 /**
614 * Channel initializer for basic connections.
615 */
616 private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800617 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800618
619 @Override
620 protected void initChannel(SocketChannel channel) throws Exception {
621 channel.pipeline()
Jordan Haltermane3813a92017-07-29 14:10:31 -0700622 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700623 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800624 .addLast("handler", dispatcher);
625 }
626 }
627
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700628 /**
629 * Channel inbound handler that dispatches messages to the appropriate handler.
630 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800631 @ChannelHandler.Sharable
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700632 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<Object> {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700633 // Effectively SimpleChannelInboundHandler<InternalMessage>,
634 // had to specify <Object> to avoid Class Loader not being able to find some classes.
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800635
636 @Override
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700637 protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
638 InternalMessage message = (InternalMessage) rawMessage;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800639 try {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700640 if (message.isRequest()) {
641 RemoteServerConnection connection =
642 serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700643 connection.dispatch((InternalRequest) message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700644 } else {
645 RemoteClientConnection connection =
646 clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700647 connection.dispatch((InternalReply) message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700648 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800649 } catch (RejectedExecutionException e) {
650 log.warn("Unable to dispatch message due to {}", e.getMessage());
651 }
652 }
653
654 @Override
655 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
656 log.error("Exception inside channel handling pipeline.", cause);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700657
658 RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
659 if (clientConnection != null) {
660 clientConnection.close();
661 }
662
663 RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
664 if (serverConnection != null) {
665 serverConnection.close();
666 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800667 context.close();
668 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700669
Jordan Halterman23e73c52018-01-13 14:10:56 -0800670 @Override
671 public void channelInactive(ChannelHandlerContext context) throws Exception {
672 RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
673 if (clientConnection != null) {
674 clientConnection.close();
675 }
676
677 RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
678 if (serverConnection != null) {
679 serverConnection.close();
680 }
681 context.close();
682 }
683
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700684 /**
685 * Returns true if the given message should be handled.
686 *
687 * @param msg inbound message
688 * @return true if {@code msg} is {@link InternalMessage} instance.
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700689 * @see SimpleChannelInboundHandler#acceptInboundMessage(Object)
690 */
691 @Override
692 public final boolean acceptInboundMessage(Object msg) {
693 return msg instanceof InternalMessage;
694 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800695 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700696
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700697 /**
698 * Wraps a {@link CompletableFuture} and tracks its type and creation time.
699 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800700 private final class Callback {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700701 private final String type;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800702 private final CompletableFuture<byte[]> future;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700703 private final long time = System.currentTimeMillis();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800704
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700705 Callback(String type, CompletableFuture<byte[]> future) {
706 this.type = type;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800707 this.future = future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800708 }
709
710 public void complete(byte[] value) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700711 future.complete(value);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800712 }
713
714 public void completeExceptionally(Throwable error) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700715 future.completeExceptionally(error);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800716 }
717 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800718
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700719 /**
720 * Represents the client side of a connection to a local or remote server.
721 */
722 private interface ClientConnection {
723
724 /**
725 * Sends a message to the other side of the connection.
726 *
727 * @param message the message to send
728 * @return a completable future to be completed once the message has been sent
729 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700730 CompletableFuture<Void> sendAsync(InternalRequest message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700731
732 /**
733 * Sends a message to the other side of the connection, awaiting a reply.
734 *
735 * @param message the message to send
736 * @return a completable future to be completed once a reply is received or the request times out
737 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700738 CompletableFuture<byte[]> sendAndReceive(InternalRequest message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700739
740 /**
741 * Closes the connection.
742 */
743 default void close() {
744 }
745 }
746
747 /**
748 * Represents the server side of a connection.
749 */
750 private interface ServerConnection {
751
752 /**
753 * Sends a reply to the other side of the connection.
754 *
755 * @param message the message to which to reply
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800756 * @param status the reply status
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700757 * @param payload the response payload
758 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700759 void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700760
761 /**
762 * Closes the connection.
763 */
764 default void close() {
765 }
766 }
767
768 /**
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800769 * Remote connection implementation.
770 */
771 private abstract class AbstractClientConnection implements ClientConnection {
772 private final Map<Long, Callback> futures = Maps.newConcurrentMap();
773 private final AtomicBoolean closed = new AtomicBoolean(false);
Jordan Haltermanef92f192017-12-21 11:59:38 -0800774 private final Cache<String, RequestMonitor> requestMonitors = CacheBuilder.newBuilder()
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800775 .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
776 .build();
777
778 /**
779 * Times out callbacks for this connection.
780 */
Jordan Haltermanef92f192017-12-21 11:59:38 -0800781 void timeoutCallbacks() {
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800782 // Store the current time.
783 long currentTime = System.currentTimeMillis();
784
785 // Iterate through future callbacks and time out callbacks that have been alive
786 // longer than the current timeout according to the message type.
787 Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
788 while (iterator.hasNext()) {
789 Callback callback = iterator.next().getValue();
790 try {
Jordan Haltermanef92f192017-12-21 11:59:38 -0800791 RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
792 long elapsedTime = currentTime - callback.time;
Jordan Halterman111aab72018-01-12 16:28:57 -0800793 if (elapsedTime > MAX_TIMEOUT_MILLIS ||
794 (elapsedTime > MIN_TIMEOUT_MILLIS && requestMonitor.isTimedOut(elapsedTime))) {
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800795 iterator.remove();
Jordan Haltermanef92f192017-12-21 11:59:38 -0800796 requestMonitor.addReplyTime(elapsedTime);
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800797 callback.completeExceptionally(
798 new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
799 }
800 } catch (ExecutionException e) {
801 throw new AssertionError();
802 }
803 }
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800804 }
805
806 protected void registerCallback(long id, String subject, CompletableFuture<byte[]> future) {
807 futures.put(id, new Callback(subject, future));
808 }
809
810 protected Callback completeCallback(long id) {
811 Callback callback = futures.remove(id);
812 if (callback != null) {
813 try {
Jordan Haltermanef92f192017-12-21 11:59:38 -0800814 RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
815 requestMonitor.addReplyTime(System.currentTimeMillis() - callback.time);
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800816 } catch (ExecutionException e) {
817 throw new AssertionError();
818 }
819 }
820 return callback;
821 }
822
823 protected Callback failCallback(long id) {
824 return futures.remove(id);
825 }
826
827 @Override
828 public void close() {
829 if (closed.compareAndSet(false, true)) {
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800830 for (Callback callback : futures.values()) {
831 callback.completeExceptionally(new ConnectException());
832 }
833 }
834 }
835 }
836
837 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700838 * Local connection implementation.
839 */
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800840 private final class LocalClientConnection extends AbstractClientConnection {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700841 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700842 public CompletableFuture<Void> sendAsync(InternalRequest message) {
843 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700844 if (handler != null) {
845 handler.accept(message, localServerConnection);
846 } else {
847 log.debug("No handler for message type {} from {}", message.type(), message.sender());
848 }
849 return CompletableFuture.completedFuture(null);
850 }
851
852 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700853 public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700854 CompletableFuture<byte[]> future = new CompletableFuture<>();
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800855 future.whenComplete((r, e) -> completeCallback(message.id()));
856 registerCallback(message.id(), message.subject(), future);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700857 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700858 if (handler != null) {
859 handler.accept(message, new LocalServerConnection(future));
860 } else {
861 log.debug("No handler for message type {} from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700862 new LocalServerConnection(future)
863 .reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700864 }
865 return future;
866 }
867 }
868
869 /**
870 * Local server connection.
871 */
872 private final class LocalServerConnection implements ServerConnection {
873 private final CompletableFuture<byte[]> future;
874
875 LocalServerConnection(CompletableFuture<byte[]> future) {
876 this.future = future;
877 }
878
879 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700880 public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700881 if (future != null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700882 if (status == InternalReply.Status.OK) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700883 future.complete(payload.orElse(EMPTY_PAYLOAD));
Jordan Haltermane3813a92017-07-29 14:10:31 -0700884 } else if (status == InternalReply.Status.ERROR_NO_HANDLER) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700885 future.completeExceptionally(new MessagingException.NoRemoteHandler());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700886 } else if (status == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700887 future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700888 } else if (status == InternalReply.Status.PROTOCOL_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700889 future.completeExceptionally(new MessagingException.ProtocolException());
890 }
891 }
892 }
893 }
894
895 /**
896 * Remote connection implementation.
897 */
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800898 private final class RemoteClientConnection extends AbstractClientConnection {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700899 private final Channel channel;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700900
901 RemoteClientConnection(Channel channel) {
902 this.channel = channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800903 }
904
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700905 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700906 public CompletableFuture<Void> sendAsync(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700907 CompletableFuture<Void> future = new CompletableFuture<>();
908 channel.writeAndFlush(message).addListener(channelFuture -> {
909 if (!channelFuture.isSuccess()) {
910 future.completeExceptionally(channelFuture.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800911 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700912 future.complete(null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800913 }
914 });
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700915 return future;
916 }
917
918 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700919 public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700920 CompletableFuture<byte[]> future = new CompletableFuture<>();
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800921 registerCallback(message.id(), message.subject(), future);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700922 channel.writeAndFlush(message).addListener(channelFuture -> {
923 if (!channelFuture.isSuccess()) {
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800924 Callback callback = failCallback(message.id());
925 if (callback != null) {
926 callback.completeExceptionally(channelFuture.cause());
927 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700928 }
929 });
930 return future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800931 }
932
933 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700934 * Dispatches a message to a local handler.
935 *
936 * @param message the message to dispatch
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800937 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700938 private void dispatch(InternalReply message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700939 if (message.preamble() != preamble) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700940 log.debug("Received {} with invalid preamble", message.type());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700941 return;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800942 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700943
944 clockService.recordEventTime(message.time());
945
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800946 Callback callback = completeCallback(message.id());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700947 if (callback != null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700948 if (message.status() == InternalReply.Status.OK) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700949 callback.complete(message.payload());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700950 } else if (message.status() == InternalReply.Status.ERROR_NO_HANDLER) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700951 callback.completeExceptionally(new MessagingException.NoRemoteHandler());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700952 } else if (message.status() == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700953 callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700954 } else if (message.status() == InternalReply.Status.PROTOCOL_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700955 callback.completeExceptionally(new MessagingException.ProtocolException());
956 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700957 } else {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700958 log.debug("Received a reply for message id:[{}] "
959 + "but was unable to locate the"
960 + " request handle", message.id());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700961 }
962 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700963 }
964
965 /**
966 * Remote server connection.
967 */
968 private final class RemoteServerConnection implements ServerConnection {
969 private final Channel channel;
970
971 RemoteServerConnection(Channel channel) {
972 this.channel = channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800973 }
974
975 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700976 * Dispatches a message to a local handler.
977 *
978 * @param message the message to dispatch
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800979 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700980 private void dispatch(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700981 if (message.preamble() != preamble) {
982 log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700983 reply(message, InternalReply.Status.PROTOCOL_EXCEPTION, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700984 return;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800985 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700986
987 clockService.recordEventTime(message.time());
988
Jordan Haltermane3813a92017-07-29 14:10:31 -0700989 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700990 if (handler != null) {
991 handler.accept(message, this);
992 } else {
993 log.debug("No handler for message type {} from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700994 reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700995 }
996 }
997
998 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700999 public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
1000 InternalReply response = new InternalReply(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001001 clockService.timeNow(),
1002 message.id(),
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001003 payload.orElse(EMPTY_PAYLOAD),
1004 status);
1005 channel.writeAndFlush(response);
1006 }
1007 }
1008
1009 /**
1010 * Request-reply timeout history tracker.
1011 */
Jordan Haltermanef92f192017-12-21 11:59:38 -08001012 private static final class RequestMonitor {
1013 private final DescriptiveStatistics samples = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
Jordan Halterman05299082018-05-14 14:45:20 -07001014 private final AtomicLong max = new AtomicLong();
1015 private volatile int replyCount;
1016 private volatile long lastUpdate = System.currentTimeMillis();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001017
1018 /**
1019 * Adds a reply time to the history.
1020 *
1021 * @param replyTime the reply time to add to the history
1022 */
1023 void addReplyTime(long replyTime) {
Jordan Halterman05299082018-05-14 14:45:20 -07001024 max.accumulateAndGet(replyTime, Math::max);
1025
1026 // If at least WINDOW_UPDATE_SAMPLE_SIZE response times have been recorded, and at least
1027 // WINDOW_UPDATE_MILLIS have passed since the last update, record the maximum response time in the samples.
1028 int replyCount = ++this.replyCount;
1029 if (replyCount >= WINDOW_UPDATE_SAMPLE_SIZE
1030 && System.currentTimeMillis() - lastUpdate > WINDOW_UPDATE_MILLIS) {
1031 synchronized (this) {
1032 if (System.currentTimeMillis() - lastUpdate > WINDOW_UPDATE_MILLIS) {
1033 long lastMax = max.get();
1034 if (lastMax > 0) {
1035 samples.addValue(lastMax);
1036 lastUpdate = System.currentTimeMillis();
1037 this.replyCount = 0;
1038 max.set(0);
1039 }
1040 }
1041 }
1042 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001043 }
1044
1045 /**
Jordan Haltermanef92f192017-12-21 11:59:38 -08001046 * Returns a boolean indicating whether the given request should be timed out according to the elapsed time.
1047 *
1048 * @param elapsedTime the elapsed request time
1049 * @return indicates whether the request should be timed out
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001050 */
Jordan Haltermanef92f192017-12-21 11:59:38 -08001051 boolean isTimedOut(long elapsedTime) {
Jordan Halterman05299082018-05-14 14:45:20 -07001052 return samples.getN() == WINDOW_SIZE && phi(elapsedTime) >= PHI_FAILURE_THRESHOLD;
Jordan Haltermanef92f192017-12-21 11:59:38 -08001053 }
1054
1055 /**
1056 * Compute phi for the specified node id.
1057 *
1058 * @param elapsedTime the duration since the request was sent
1059 * @return phi value
1060 */
1061 private double phi(long elapsedTime) {
1062 if (samples.getN() < MIN_SAMPLES) {
1063 return 0.0;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001064 }
Jordan Haltermanef92f192017-12-21 11:59:38 -08001065 return computePhi(samples, elapsedTime);
1066 }
1067
1068 /**
1069 * Computes the phi value from the given samples.
1070 *
1071 * @param samples the samples from which to compute phi
1072 * @param elapsedTime the duration since the request was sent
1073 * @return phi
1074 */
1075 private double computePhi(DescriptiveStatistics samples, long elapsedTime) {
Jordan Halterman05299082018-05-14 14:45:20 -07001076 double meanMillis = samples.getMean();
1077 double y = (elapsedTime - meanMillis) / Math.max(samples.getStandardDeviation(), MIN_STANDARD_DEVIATION);
1078 double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
1079 if (elapsedTime > meanMillis) {
1080 return -Math.log10(e / (1.0 + e));
1081 } else {
1082 return -Math.log10(1.0 - 1.0 / (1.0 + e));
1083 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -08001084 }
1085 }
JunHuy Lam39eb4292015-06-26 17:24:23 +09001086}