blob: 9f09ac4369c020c110fa0043498a57ec327437fa [file] [log] [blame]
Thomas Vachuska58de4162015-09-10 16:15:33 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Thomas Vachuska58de4162015-09-10 16:15:33 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070016package org.onosproject.store.cluster.messaging.impl;
17
Aaron Kruglikov1b727382016-02-09 16:17:47 -080018import javax.net.ssl.KeyManagerFactory;
19import javax.net.ssl.SSLContext;
20import javax.net.ssl.SSLEngine;
21import javax.net.ssl.TrustManagerFactory;
Brian O'Connor740e98c2017-06-29 17:07:17 -070022import java.io.File;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080023import java.io.FileInputStream;
Brian O'Connor740e98c2017-06-29 17:07:17 -070024import java.io.FileNotFoundException;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070025import java.net.ConnectException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080026import java.security.KeyStore;
Brian O'Connor740e98c2017-06-29 17:07:17 -070027import java.security.MessageDigest;
Brian O'Connorf69e3e32018-05-10 02:25:09 -070028import java.security.PublicKey;
Brian O'Connor740e98c2017-06-29 17:07:17 -070029import java.security.cert.Certificate;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070030import java.time.Duration;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070031import java.util.ArrayList;
Brian O'Connor740e98c2017-06-29 17:07:17 -070032import java.util.Enumeration;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070033import java.util.Iterator;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070034import java.util.List;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080035import java.util.Map;
Madan Jampania9e70a62016-03-02 16:28:18 -080036import java.util.Optional;
Brian O'Connor740e98c2017-06-29 17:07:17 -070037import java.util.StringJoiner;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080038import java.util.concurrent.CompletableFuture;
39import java.util.concurrent.ConcurrentHashMap;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070040import java.util.concurrent.ExecutionException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080041import java.util.concurrent.Executor;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070042import java.util.concurrent.Executors;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080043import java.util.concurrent.RejectedExecutionException;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070044import java.util.concurrent.ScheduledExecutorService;
45import java.util.concurrent.ScheduledFuture;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080046import java.util.concurrent.TimeUnit;
47import java.util.concurrent.TimeoutException;
48import java.util.concurrent.atomic.AtomicBoolean;
49import java.util.concurrent.atomic.AtomicLong;
50import java.util.function.BiConsumer;
51import java.util.function.BiFunction;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -070052import java.util.function.Function;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080053
Jordan Halterman23e73c52018-01-13 14:10:56 -080054import com.google.common.base.Throwables;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070055import com.google.common.cache.Cache;
56import com.google.common.cache.CacheBuilder;
57import com.google.common.collect.Lists;
58import com.google.common.collect.Maps;
59import com.google.common.util.concurrent.MoreExecutors;
60import io.netty.bootstrap.Bootstrap;
61import io.netty.bootstrap.ServerBootstrap;
62import io.netty.buffer.PooledByteBufAllocator;
63import io.netty.channel.Channel;
64import io.netty.channel.ChannelFuture;
65import io.netty.channel.ChannelHandler;
66import io.netty.channel.ChannelHandlerContext;
67import io.netty.channel.ChannelInitializer;
68import io.netty.channel.ChannelOption;
69import io.netty.channel.EventLoopGroup;
70import io.netty.channel.ServerChannel;
71import io.netty.channel.SimpleChannelInboundHandler;
72import io.netty.channel.WriteBufferWaterMark;
73import io.netty.channel.epoll.EpollEventLoopGroup;
74import io.netty.channel.epoll.EpollServerSocketChannel;
75import io.netty.channel.epoll.EpollSocketChannel;
76import io.netty.channel.nio.NioEventLoopGroup;
77import io.netty.channel.socket.SocketChannel;
78import io.netty.channel.socket.nio.NioServerSocketChannel;
79import io.netty.channel.socket.nio.NioSocketChannel;
80import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
81import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
82import org.apache.felix.scr.annotations.Activate;
83import org.apache.felix.scr.annotations.Component;
84import org.apache.felix.scr.annotations.Deactivate;
85import org.apache.felix.scr.annotations.Reference;
86import org.apache.felix.scr.annotations.ReferenceCardinality;
87import org.apache.felix.scr.annotations.Service;
88import org.onosproject.cluster.ClusterMetadataService;
89import org.onosproject.cluster.ControllerNode;
90import org.onosproject.core.HybridLogicalClockService;
91import org.onosproject.store.cluster.messaging.Endpoint;
92import org.onosproject.store.cluster.messaging.MessagingException;
93import org.onosproject.store.cluster.messaging.MessagingService;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -070094import org.slf4j.Logger;
95import org.slf4j.LoggerFactory;
96
Yuta HIGUCHI90a16892016-07-20 20:36:08 -070097import static org.onlab.util.Tools.groupedThreads;
Heedo Kang4a47a302016-02-29 17:40:23 +090098import static org.onosproject.security.AppGuard.checkPermission;
99import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
100
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700101/**
102 * Netty based MessagingService.
103 */
Sho SHIMIZU5c396e32016-08-12 15:19:12 -0700104@Component(immediate = true)
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700105@Service
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800106public class NettyMessagingManager implements MessagingService {
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700107 private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1).toMillis();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700108 private static final long TIMEOUT_INTERVAL = 50;
109 private static final int WINDOW_SIZE = 100;
Jordan Haltermanef92f192017-12-21 11:59:38 -0800110 private static final int MIN_SAMPLES = 25;
111 private static final double PHI_FACTOR = 1.0 / Math.log(10.0);
112 private static final int PHI_FAILURE_THRESHOLD = 5;
Jordan Halterman111aab72018-01-12 16:28:57 -0800113 private static final long MIN_TIMEOUT_MILLIS = 100;
Jordan Haltermanef92f192017-12-21 11:59:38 -0800114 private static final long MAX_TIMEOUT_MILLIS = 15000;
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700115 private static final int CHANNEL_POOL_SIZE = 8;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700116
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700117 private static final byte[] EMPTY_PAYLOAD = new byte[0];
118
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700119 private final Logger log = LoggerFactory.getLogger(getClass());
120
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800121 private final LocalClientConnection localClientConnection = new LocalClientConnection();
122 private final LocalServerConnection localServerConnection = new LocalServerConnection(null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800123
Brian O'Connor740e98c2017-06-29 17:07:17 -0700124 //TODO CONFIG_DIR is duplicated from ConfigFileBasedClusterMetadataProvider
125 private static final String CONFIG_DIR = "../config";
126 private static final String KS_FILE_NAME = "onos.jks";
127 private static final File DEFAULT_KS_FILE = new File(CONFIG_DIR, KS_FILE_NAME);
128 private static final String DEFAULT_KS_PASSWORD = "changeit";
129
Madan Jampani05833872016-07-12 23:01:39 -0700130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected HybridLogicalClockService clockService;
132
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700133 private Endpoint localEndpoint;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800134 private int preamble;
135 private final AtomicBoolean started = new AtomicBoolean(false);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700136 private final Map<String, BiConsumer<InternalRequest, ServerConnection>> handlers = new ConcurrentHashMap<>();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700137 private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
138 private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800139 private final AtomicLong messageIdGenerator = new AtomicLong(0);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800140
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700141 private ScheduledFuture<?> timeoutFuture;
142
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700143 private final Map<Endpoint, List<CompletableFuture<Channel>>> channels = Maps.newConcurrentMap();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800144
145 private EventLoopGroup serverGroup;
146 private EventLoopGroup clientGroup;
147 private Class<? extends ServerChannel> serverChannelClass;
148 private Class<? extends Channel> clientChannelClass;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700149 private ScheduledExecutorService timeoutExecutor;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800150
Brian O'Connor740e98c2017-06-29 17:07:17 -0700151 protected static final boolean TLS_ENABLED = true;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800152 protected static final boolean TLS_DISABLED = false;
Brian O'Connor740e98c2017-06-29 17:07:17 -0700153 protected boolean enableNettyTls = TLS_ENABLED;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800154
Brian O'Connor740e98c2017-06-29 17:07:17 -0700155 protected TrustManagerFactory trustManager;
156 protected KeyManagerFactory keyManager;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900157
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700158 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampaniec1df022015-10-13 21:23:03 -0700159 protected ClusterMetadataService clusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700160
161 @Activate
Ray Milkey986a47a2018-01-25 11:38:51 -0800162 public void activate() throws InterruptedException {
Madan Jampaniec1df022015-10-13 21:23:03 -0700163 ControllerNode localNode = clusterMetadataService.getLocalNode();
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800164 getTlsParameters();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800165
166 if (started.get()) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700167 log.warn("Already running at local endpoint: {}", localEndpoint);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800168 return;
169 }
170 this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700171 this.localEndpoint = new Endpoint(localNode.ip(), localNode.tcpPort());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800172 initEventLoopGroup();
173 startAcceptingConnections();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700174 timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
175 groupedThreads("NettyMessagingEvt", "timeout", log));
176 timeoutFuture = timeoutExecutor.scheduleAtFixedRate(
177 this::timeoutAllCallbacks, TIMEOUT_INTERVAL, TIMEOUT_INTERVAL, TimeUnit.MILLISECONDS);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800178 started.set(true);
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700179 log.info("Started");
180 }
181
182 @Deactivate
Ray Milkey986a47a2018-01-25 11:38:51 -0800183 public void deactivate() {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800184 if (started.get()) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800185 serverGroup.shutdownGracefully();
186 clientGroup.shutdownGracefully();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700187 timeoutFuture.cancel(false);
188 timeoutExecutor.shutdown();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800189 started.set(false);
190 }
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700191 log.info("Stopped");
192 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900193
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800194 private void getTlsParameters() {
Brian O'Connor740e98c2017-06-29 17:07:17 -0700195 // default is TLS enabled unless key stores cannot be loaded
196 enableNettyTls = Boolean.parseBoolean(System.getProperty("enableNettyTLS", Boolean.toString(TLS_ENABLED)));
197
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800198 if (enableNettyTls) {
Brian O'Connor740e98c2017-06-29 17:07:17 -0700199 enableNettyTls = loadKeyStores();
200 }
201 }
202
203 private boolean loadKeyStores() {
204 // Maintain a local copy of the trust and key managers in case anything goes wrong
205 TrustManagerFactory tmf;
206 KeyManagerFactory kmf;
207 try {
208 String ksLocation = System.getProperty("javax.net.ssl.keyStore", DEFAULT_KS_FILE.toString());
209 String tsLocation = System.getProperty("javax.net.ssl.trustStore", DEFAULT_KS_FILE.toString());
210 char[] ksPwd = System.getProperty("javax.net.ssl.keyStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
211 char[] tsPwd = System.getProperty("javax.net.ssl.trustStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
212
213 tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
214 KeyStore ts = KeyStore.getInstance("JKS");
215 ts.load(new FileInputStream(tsLocation), tsPwd);
216 tmf.init(ts);
217
218 kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
219 KeyStore ks = KeyStore.getInstance("JKS");
220 ks.load(new FileInputStream(ksLocation), ksPwd);
221 kmf.init(ks, ksPwd);
222 if (log.isInfoEnabled()) {
223 logKeyStore(ks, ksLocation, ksPwd);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900224 }
Brian O'Connor740e98c2017-06-29 17:07:17 -0700225 } catch (FileNotFoundException e) {
226 log.warn("Disabling TLS for intra-cluster messaging; Could not load cluster key store: {}", e.getMessage());
227 return TLS_DISABLED;
228 } catch (Exception e) {
229 //TODO we might want to catch exceptions more specifically
230 log.error("Error loading key store; disabling TLS for intra-cluster messaging", e);
231 return TLS_DISABLED;
232 }
233 this.trustManager = tmf;
234 this.keyManager = kmf;
235 return TLS_ENABLED;
236 }
237
238 private void logKeyStore(KeyStore ks, String ksLocation, char[] ksPwd) {
239 if (log.isInfoEnabled()) {
240 log.info("Loaded cluster key store from: {}", ksLocation);
241 try {
242 for (Enumeration<String> e = ks.aliases(); e.hasMoreElements();) {
243 String alias = e.nextElement();
Brian O'Connorf69e3e32018-05-10 02:25:09 -0700244 Certificate cert = ks.getCertificate(alias);
245 if (cert == null) {
246 log.info("No certificate for alias {}", alias);
247 continue;
Brian O'Connor740e98c2017-06-29 17:07:17 -0700248 }
Brian O'Connorf69e3e32018-05-10 02:25:09 -0700249 PublicKey key = cert.getPublicKey();
Brian O'Connor740e98c2017-06-29 17:07:17 -0700250 // Compute the certificate's fingerprint (use the key if certificate cannot be found)
251 MessageDigest digest = MessageDigest.getInstance("SHA1");
Brian O'Connorf69e3e32018-05-10 02:25:09 -0700252 digest.update(key.getEncoded());
Brian O'Connor740e98c2017-06-29 17:07:17 -0700253 StringJoiner fingerprint = new StringJoiner(":");
254 for (byte b : digest.digest()) {
255 fingerprint.add(String.format("%02X", b));
256 }
257 log.info("{} -> {}", alias, fingerprint);
258 }
259 } catch (Exception e) {
260 log.warn("Unable to print contents of key store: {}", ksLocation, e);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900261 }
262 }
263 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700264
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800265 private void initEventLoopGroup() {
266 // try Epoll first and if that does work, use nio.
267 try {
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700268 clientGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollC-%d", log));
269 serverGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800270 serverChannelClass = EpollServerSocketChannel.class;
271 clientChannelClass = EpollSocketChannel.class;
272 return;
273 } catch (Throwable e) {
274 log.debug("Failed to initialize native (epoll) transport. "
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700275 + "Reason: {}. Proceeding with nio.", e.getMessage());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800276 }
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700277 clientGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioC-%d", log));
278 serverGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800279 serverChannelClass = NioServerSocketChannel.class;
280 clientChannelClass = NioSocketChannel.class;
281 }
282
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700283 /**
284 * Times out response callbacks.
285 */
286 private void timeoutAllCallbacks() {
287 // Iterate through all connections and time out callbacks.
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800288 localClientConnection.timeoutCallbacks();
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700289 for (RemoteClientConnection connection : clientConnections.values()) {
290 connection.timeoutCallbacks();
291 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700292 }
293
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800294 @Override
295 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900296 checkPermission(CLUSTER_WRITE);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700297 InternalRequest message = new InternalRequest(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700298 clockService.timeNow(),
299 messageIdGenerator.incrementAndGet(),
300 localEndpoint,
301 type,
302 payload);
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700303 return executeOnPooledConnection(ep, type, c -> c.sendAsync(message), MoreExecutors.directExecutor());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800304 }
305
306 @Override
307 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900308 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800309 return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
310 }
311
312 @Override
313 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900314 checkPermission(CLUSTER_WRITE);
Jordan Halterman5ceb3892017-08-28 15:35:03 -0700315 long messageId = messageIdGenerator.incrementAndGet();
Jordan Haltermane3813a92017-07-29 14:10:31 -0700316 InternalRequest message = new InternalRequest(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700317 clockService.timeNow(),
318 messageId,
319 localEndpoint,
320 type,
321 payload);
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700322 return executeOnPooledConnection(ep, type, c -> c.sendAndReceive(message), executor);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700323 }
Jordan Halterman041534b2017-03-21 10:37:33 -0700324
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700325 private List<CompletableFuture<Channel>> getChannelPool(Endpoint endpoint) {
326 return channels.computeIfAbsent(endpoint, e -> {
327 List<CompletableFuture<Channel>> defaultList = new ArrayList<>(CHANNEL_POOL_SIZE);
328 for (int i = 0; i < CHANNEL_POOL_SIZE; i++) {
329 defaultList.add(null);
330 }
331 return Lists.newCopyOnWriteArrayList(defaultList);
332 });
333 }
334
335 private int getChannelOffset(String messageType) {
336 return Math.abs(messageType.hashCode() % CHANNEL_POOL_SIZE);
337 }
338
Jordan Halterman94db1912018-02-08 14:45:20 -0800339 private CompletableFuture<Channel> getChannel(Endpoint endpoint, String messageType) {
Jordan Halterman23e73c52018-01-13 14:10:56 -0800340 List<CompletableFuture<Channel>> channelPool = getChannelPool(endpoint);
Jordan Halterman94db1912018-02-08 14:45:20 -0800341 int offset = getChannelOffset(messageType);
Jordan Halterman23e73c52018-01-13 14:10:56 -0800342
Jordan Halterman23e73c52018-01-13 14:10:56 -0800343 CompletableFuture<Channel> channelFuture = channelPool.get(offset);
344 if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
345 synchronized (channelPool) {
346 channelFuture = channelPool.get(offset);
347 if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
348 channelFuture = openChannel(endpoint);
349 channelPool.set(offset, channelFuture);
350 }
351 }
352 }
353
Jordan Halterman94db1912018-02-08 14:45:20 -0800354 CompletableFuture<Channel> future = new CompletableFuture<>();
Jordan Halterman23e73c52018-01-13 14:10:56 -0800355 final CompletableFuture<Channel> finalFuture = channelFuture;
356 finalFuture.whenComplete((channel, error) -> {
357 if (error == null) {
358 if (!channel.isActive()) {
Jordan Halterman94db1912018-02-08 14:45:20 -0800359 CompletableFuture<Channel> currentFuture;
Jordan Halterman23e73c52018-01-13 14:10:56 -0800360 synchronized (channelPool) {
361 currentFuture = channelPool.get(offset);
362 if (currentFuture == finalFuture) {
363 channelPool.set(offset, null);
364 }
365 }
Jordan Halterman94db1912018-02-08 14:45:20 -0800366
367 ClientConnection connection = clientConnections.remove(channel);
368 if (connection != null) {
369 connection.close();
370 }
371
Jordan Halterman23e73c52018-01-13 14:10:56 -0800372 if (currentFuture == finalFuture) {
Jordan Halterman94db1912018-02-08 14:45:20 -0800373 getChannel(endpoint, messageType).whenComplete((recursiveResult, recursiveError) -> {
374 if (recursiveError == null) {
375 future.complete(recursiveResult);
376 } else {
377 future.completeExceptionally(recursiveError);
378 }
379 });
Jordan Halterman23e73c52018-01-13 14:10:56 -0800380 } else {
381 currentFuture.whenComplete((recursiveResult, recursiveError) -> {
382 if (recursiveError == null) {
Jordan Halterman94db1912018-02-08 14:45:20 -0800383 future.complete(recursiveResult);
Jordan Halterman23e73c52018-01-13 14:10:56 -0800384 } else {
385 future.completeExceptionally(recursiveError);
386 }
387 });
388 }
389 } else {
Jordan Halterman94db1912018-02-08 14:45:20 -0800390 future.complete(channel);
Jordan Halterman23e73c52018-01-13 14:10:56 -0800391 }
Ray Milkey4f350762018-01-23 23:32:03 +0000392 } else {
Jordan Halterman23e73c52018-01-13 14:10:56 -0800393 future.completeExceptionally(error);
Ray Milkey4f350762018-01-23 23:32:03 +0000394 }
395 });
Jordan Halterman94db1912018-02-08 14:45:20 -0800396 return future;
397 }
398
399 private <T> CompletableFuture<T> executeOnPooledConnection(
400 Endpoint endpoint,
401 String type,
402 Function<ClientConnection, CompletableFuture<T>> callback,
403 Executor executor) {
404 CompletableFuture<T> future = new CompletableFuture<T>();
405 executeOnPooledConnection(endpoint, type, callback, executor, future);
406 return future;
407 }
408
409 private <T> void executeOnPooledConnection(
410 Endpoint endpoint,
411 String type,
412 Function<ClientConnection, CompletableFuture<T>> callback,
413 Executor executor,
414 CompletableFuture<T> future) {
415 if (endpoint.equals(localEndpoint)) {
416 callback.apply(localClientConnection).whenComplete((result, error) -> {
417 if (error == null) {
418 executor.execute(() -> future.complete(result));
419 } else {
420 executor.execute(() -> future.completeExceptionally(error));
421 }
422 });
423 return;
424 }
425
426 getChannel(endpoint, type).whenComplete((channel, channelError) -> {
427 if (channelError == null) {
428 ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
429 callback.apply(connection).whenComplete((result, sendError) -> {
430 if (sendError == null) {
431 executor.execute(() -> future.complete(result));
432 } else {
433 Throwable cause = Throwables.getRootCause(sendError);
434 if (!(cause instanceof TimeoutException) && !(cause instanceof MessagingException)) {
435 channel.close().addListener(f -> {
436 connection.close();
437 clientConnections.remove(channel);
438 });
439 }
440 executor.execute(() -> future.completeExceptionally(sendError));
441 }
442 });
443 } else {
444 executor.execute(() -> future.completeExceptionally(channelError));
445 }
446 });
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800447 }
448
449 @Override
450 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900451 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700452 handlers.put(type, (message, connection) -> executor.execute(() ->
453 handler.accept(message.sender(), message.payload())));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800454 }
455
456 @Override
457 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900458 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700459 handlers.put(type, (message, connection) -> executor.execute(() -> {
Madan Jampania9e70a62016-03-02 16:28:18 -0800460 byte[] responsePayload = null;
Jordan Haltermane3813a92017-07-29 14:10:31 -0700461 InternalReply.Status status = InternalReply.Status.OK;
Madan Jampania9e70a62016-03-02 16:28:18 -0800462 try {
463 responsePayload = handler.apply(message.sender(), message.payload());
464 } catch (Exception e) {
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700465 log.debug("An error occurred in a message handler: {}", e);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700466 status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800467 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700468 connection.reply(message, status, Optional.ofNullable(responsePayload));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800469 }));
470 }
471
472 @Override
473 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900474 checkPermission(CLUSTER_WRITE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700475 handlers.put(type, (message, connection) -> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800476 handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700477 InternalReply.Status status;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700478 if (error == null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700479 status = InternalReply.Status.OK;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700480 } else {
481 log.debug("An error occurred in a message handler: {}", error);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700482 status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700483 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700484 connection.reply(message, status, Optional.ofNullable(result));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800485 });
486 });
487 }
488
489 @Override
490 public void unregisterHandler(String type) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900491 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800492 handlers.remove(type);
493 }
494
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700495 private Bootstrap bootstrapClient(Endpoint endpoint) {
496 Bootstrap bootstrap = new Bootstrap();
497 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
498 bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
499 new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
500 bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
501 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
502 bootstrap.group(clientGroup);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700503 bootstrap.channel(clientChannelClass);
504 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
505 bootstrap.remoteAddress(endpoint.host().toInetAddress(), endpoint.port());
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700506 if (enableNettyTls) {
507 bootstrap.handler(new SslClientCommunicationChannelInitializer());
508 } else {
509 bootstrap.handler(new BasicChannelInitializer());
510 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700511 return bootstrap;
512 }
513
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800514 private void startAcceptingConnections() throws InterruptedException {
515 ServerBootstrap b = new ServerBootstrap();
Jon Hall9a44d6a2017-03-02 18:14:37 -0800516 b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700517 new WriteBufferWaterMark(8 * 1024, 32 * 1024));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800518 b.option(ChannelOption.SO_RCVBUF, 1048576);
Jordan Halterman153dbd52017-12-21 11:08:19 -0800519 b.childOption(ChannelOption.SO_KEEPALIVE, true);
520 b.childOption(ChannelOption.TCP_NODELAY, true);
Yuta HIGUCHIb47c9532016-08-22 09:41:23 -0700521 b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800522 b.group(serverGroup, clientGroup);
523 b.channel(serverChannelClass);
524 if (enableNettyTls) {
525 b.childHandler(new SslServerCommunicationChannelInitializer());
526 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700527 b.childHandler(new BasicChannelInitializer());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800528 }
529 b.option(ChannelOption.SO_BACKLOG, 128);
530 b.childOption(ChannelOption.SO_KEEPALIVE, true);
531
532 // Bind and start to accept incoming connections.
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700533 b.bind(localEndpoint.port()).sync().addListener(future -> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800534 if (future.isSuccess()) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700535 log.info("{} accepting incoming connections on port {}",
536 localEndpoint.host(), localEndpoint.port());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800537 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700538 log.warn("{} failed to bind to port {} due to {}",
539 localEndpoint.host(), localEndpoint.port(), future.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800540 }
541 });
542 }
543
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700544 private CompletableFuture<Channel> openChannel(Endpoint ep) {
545 Bootstrap bootstrap = bootstrapClient(ep);
546 CompletableFuture<Channel> retFuture = new CompletableFuture<>();
547 ChannelFuture f = bootstrap.connect();
548
549 f.addListener(future -> {
550 if (future.isSuccess()) {
551 retFuture.complete(f.channel());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800552 } else {
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700553 retFuture.completeExceptionally(future.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800554 }
Jordan Halterman66e6e3b2017-07-10 11:26:34 -0700555 });
556 log.debug("Established a new connection to {}", ep);
557 return retFuture;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800558 }
559
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700560 /**
561 * Channel initializer for TLS servers.
562 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800563 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800564 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800565
566 @Override
567 protected void initChannel(SocketChannel channel) throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800568 SSLContext serverContext = SSLContext.getInstance("TLS");
Brian O'Connor740e98c2017-06-29 17:07:17 -0700569 serverContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800570
571 SSLEngine serverSslEngine = serverContext.createSSLEngine();
572
573 serverSslEngine.setNeedClientAuth(true);
574 serverSslEngine.setUseClientMode(false);
575 serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
576 serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
577 serverSslEngine.setEnableSessionCreation(true);
578
579 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
Jordan Haltermane3813a92017-07-29 14:10:31 -0700580 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700581 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800582 .addLast("handler", dispatcher);
583 }
584 }
585
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700586 /**
587 * Channel initializer for TLS clients.
588 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800589 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800590 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800591
592 @Override
593 protected void initChannel(SocketChannel channel) throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800594 SSLContext clientContext = SSLContext.getInstance("TLS");
Brian O'Connor740e98c2017-06-29 17:07:17 -0700595 clientContext.init(keyManager.getKeyManagers(), trustManager.getTrustManagers(), null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800596
597 SSLEngine clientSslEngine = clientContext.createSSLEngine();
598
599 clientSslEngine.setUseClientMode(true);
600 clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
601 clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
602 clientSslEngine.setEnableSessionCreation(true);
603
604 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
Jordan Haltermane3813a92017-07-29 14:10:31 -0700605 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700606 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800607 .addLast("handler", dispatcher);
608 }
609 }
610
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700611 /**
612 * Channel initializer for basic connections.
613 */
614 private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800615 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800616
617 @Override
618 protected void initChannel(SocketChannel channel) throws Exception {
619 channel.pipeline()
Jordan Haltermane3813a92017-07-29 14:10:31 -0700620 .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
Madan Jampanib825aeb2016-04-01 15:18:25 -0700621 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800622 .addLast("handler", dispatcher);
623 }
624 }
625
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700626 /**
627 * Channel inbound handler that dispatches messages to the appropriate handler.
628 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800629 @ChannelHandler.Sharable
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700630 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<Object> {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700631 // Effectively SimpleChannelInboundHandler<InternalMessage>,
632 // had to specify <Object> to avoid Class Loader not being able to find some classes.
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800633
634 @Override
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700635 protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
636 InternalMessage message = (InternalMessage) rawMessage;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800637 try {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700638 if (message.isRequest()) {
639 RemoteServerConnection connection =
640 serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700641 connection.dispatch((InternalRequest) message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700642 } else {
643 RemoteClientConnection connection =
644 clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700645 connection.dispatch((InternalReply) message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700646 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800647 } catch (RejectedExecutionException e) {
648 log.warn("Unable to dispatch message due to {}", e.getMessage());
649 }
650 }
651
652 @Override
653 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
654 log.error("Exception inside channel handling pipeline.", cause);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700655
656 RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
657 if (clientConnection != null) {
658 clientConnection.close();
659 }
660
661 RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
662 if (serverConnection != null) {
663 serverConnection.close();
664 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800665 context.close();
666 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700667
Jordan Halterman23e73c52018-01-13 14:10:56 -0800668 @Override
669 public void channelInactive(ChannelHandlerContext context) throws Exception {
670 RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
671 if (clientConnection != null) {
672 clientConnection.close();
673 }
674
675 RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
676 if (serverConnection != null) {
677 serverConnection.close();
678 }
679 context.close();
680 }
681
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700682 /**
683 * Returns true if the given message should be handled.
684 *
685 * @param msg inbound message
686 * @return true if {@code msg} is {@link InternalMessage} instance.
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700687 * @see SimpleChannelInboundHandler#acceptInboundMessage(Object)
688 */
689 @Override
690 public final boolean acceptInboundMessage(Object msg) {
691 return msg instanceof InternalMessage;
692 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800693 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700694
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700695 /**
696 * Wraps a {@link CompletableFuture} and tracks its type and creation time.
697 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800698 private final class Callback {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700699 private final String type;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800700 private final CompletableFuture<byte[]> future;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700701 private final long time = System.currentTimeMillis();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800702
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700703 Callback(String type, CompletableFuture<byte[]> future) {
704 this.type = type;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800705 this.future = future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800706 }
707
708 public void complete(byte[] value) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700709 future.complete(value);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800710 }
711
712 public void completeExceptionally(Throwable error) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700713 future.completeExceptionally(error);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800714 }
715 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800716
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700717 /**
718 * Represents the client side of a connection to a local or remote server.
719 */
720 private interface ClientConnection {
721
722 /**
723 * Sends a message to the other side of the connection.
724 *
725 * @param message the message to send
726 * @return a completable future to be completed once the message has been sent
727 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700728 CompletableFuture<Void> sendAsync(InternalRequest message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700729
730 /**
731 * Sends a message to the other side of the connection, awaiting a reply.
732 *
733 * @param message the message to send
734 * @return a completable future to be completed once a reply is received or the request times out
735 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700736 CompletableFuture<byte[]> sendAndReceive(InternalRequest message);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700737
738 /**
739 * Closes the connection.
740 */
741 default void close() {
742 }
743 }
744
745 /**
746 * Represents the server side of a connection.
747 */
748 private interface ServerConnection {
749
750 /**
751 * Sends a reply to the other side of the connection.
752 *
753 * @param message the message to which to reply
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800754 * @param status the reply status
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700755 * @param payload the response payload
756 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700757 void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700758
759 /**
760 * Closes the connection.
761 */
762 default void close() {
763 }
764 }
765
766 /**
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800767 * Remote connection implementation.
768 */
769 private abstract class AbstractClientConnection implements ClientConnection {
770 private final Map<Long, Callback> futures = Maps.newConcurrentMap();
771 private final AtomicBoolean closed = new AtomicBoolean(false);
Jordan Haltermanef92f192017-12-21 11:59:38 -0800772 private final Cache<String, RequestMonitor> requestMonitors = CacheBuilder.newBuilder()
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800773 .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
774 .build();
775
776 /**
777 * Times out callbacks for this connection.
778 */
Jordan Haltermanef92f192017-12-21 11:59:38 -0800779 void timeoutCallbacks() {
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800780 // Store the current time.
781 long currentTime = System.currentTimeMillis();
782
783 // Iterate through future callbacks and time out callbacks that have been alive
784 // longer than the current timeout according to the message type.
785 Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
786 while (iterator.hasNext()) {
787 Callback callback = iterator.next().getValue();
788 try {
Jordan Haltermanef92f192017-12-21 11:59:38 -0800789 RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
790 long elapsedTime = currentTime - callback.time;
Jordan Halterman111aab72018-01-12 16:28:57 -0800791 if (elapsedTime > MAX_TIMEOUT_MILLIS ||
792 (elapsedTime > MIN_TIMEOUT_MILLIS && requestMonitor.isTimedOut(elapsedTime))) {
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800793 iterator.remove();
Jordan Haltermanef92f192017-12-21 11:59:38 -0800794 requestMonitor.addReplyTime(elapsedTime);
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800795 callback.completeExceptionally(
796 new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
797 }
798 } catch (ExecutionException e) {
799 throw new AssertionError();
800 }
801 }
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800802 }
803
804 protected void registerCallback(long id, String subject, CompletableFuture<byte[]> future) {
805 futures.put(id, new Callback(subject, future));
806 }
807
808 protected Callback completeCallback(long id) {
809 Callback callback = futures.remove(id);
810 if (callback != null) {
811 try {
Jordan Haltermanef92f192017-12-21 11:59:38 -0800812 RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
813 requestMonitor.addReplyTime(System.currentTimeMillis() - callback.time);
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800814 } catch (ExecutionException e) {
815 throw new AssertionError();
816 }
817 }
818 return callback;
819 }
820
821 protected Callback failCallback(long id) {
822 return futures.remove(id);
823 }
824
825 @Override
826 public void close() {
827 if (closed.compareAndSet(false, true)) {
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800828 for (Callback callback : futures.values()) {
829 callback.completeExceptionally(new ConnectException());
830 }
831 }
832 }
833 }
834
835 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700836 * Local connection implementation.
837 */
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800838 private final class LocalClientConnection extends AbstractClientConnection {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700839 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700840 public CompletableFuture<Void> sendAsync(InternalRequest message) {
841 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700842 if (handler != null) {
843 handler.accept(message, localServerConnection);
844 } else {
845 log.debug("No handler for message type {} from {}", message.type(), message.sender());
846 }
847 return CompletableFuture.completedFuture(null);
848 }
849
850 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700851 public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700852 CompletableFuture<byte[]> future = new CompletableFuture<>();
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800853 future.whenComplete((r, e) -> completeCallback(message.id()));
854 registerCallback(message.id(), message.subject(), future);
Jordan Haltermane3813a92017-07-29 14:10:31 -0700855 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700856 if (handler != null) {
857 handler.accept(message, new LocalServerConnection(future));
858 } else {
859 log.debug("No handler for message type {} from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700860 new LocalServerConnection(future)
861 .reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700862 }
863 return future;
864 }
865 }
866
867 /**
868 * Local server connection.
869 */
870 private final class LocalServerConnection implements ServerConnection {
871 private final CompletableFuture<byte[]> future;
872
873 LocalServerConnection(CompletableFuture<byte[]> future) {
874 this.future = future;
875 }
876
877 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700878 public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700879 if (future != null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700880 if (status == InternalReply.Status.OK) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700881 future.complete(payload.orElse(EMPTY_PAYLOAD));
Jordan Haltermane3813a92017-07-29 14:10:31 -0700882 } else if (status == InternalReply.Status.ERROR_NO_HANDLER) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700883 future.completeExceptionally(new MessagingException.NoRemoteHandler());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700884 } else if (status == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700885 future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700886 } else if (status == InternalReply.Status.PROTOCOL_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700887 future.completeExceptionally(new MessagingException.ProtocolException());
888 }
889 }
890 }
891 }
892
893 /**
894 * Remote connection implementation.
895 */
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800896 private final class RemoteClientConnection extends AbstractClientConnection {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700897 private final Channel channel;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700898
899 RemoteClientConnection(Channel channel) {
900 this.channel = channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800901 }
902
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700903 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700904 public CompletableFuture<Void> sendAsync(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700905 CompletableFuture<Void> future = new CompletableFuture<>();
906 channel.writeAndFlush(message).addListener(channelFuture -> {
907 if (!channelFuture.isSuccess()) {
908 future.completeExceptionally(channelFuture.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800909 } else {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700910 future.complete(null);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800911 }
912 });
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700913 return future;
914 }
915
916 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700917 public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700918 CompletableFuture<byte[]> future = new CompletableFuture<>();
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800919 registerCallback(message.id(), message.subject(), future);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700920 channel.writeAndFlush(message).addListener(channelFuture -> {
921 if (!channelFuture.isSuccess()) {
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800922 Callback callback = failCallback(message.id());
923 if (callback != null) {
924 callback.completeExceptionally(channelFuture.cause());
925 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700926 }
927 });
928 return future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800929 }
930
931 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700932 * Dispatches a message to a local handler.
933 *
934 * @param message the message to dispatch
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800935 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700936 private void dispatch(InternalReply message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700937 if (message.preamble() != preamble) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700938 log.debug("Received {} with invalid preamble", message.type());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700939 return;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800940 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700941
942 clockService.recordEventTime(message.time());
943
Jordan Halterman1cf233c2017-12-08 23:52:54 -0800944 Callback callback = completeCallback(message.id());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700945 if (callback != null) {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700946 if (message.status() == InternalReply.Status.OK) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700947 callback.complete(message.payload());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700948 } else if (message.status() == InternalReply.Status.ERROR_NO_HANDLER) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700949 callback.completeExceptionally(new MessagingException.NoRemoteHandler());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700950 } else if (message.status() == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700951 callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700952 } else if (message.status() == InternalReply.Status.PROTOCOL_EXCEPTION) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700953 callback.completeExceptionally(new MessagingException.ProtocolException());
954 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700955 } else {
Jordan Haltermane3813a92017-07-29 14:10:31 -0700956 log.debug("Received a reply for message id:[{}] "
957 + "but was unable to locate the"
958 + " request handle", message.id());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700959 }
960 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700961 }
962
963 /**
964 * Remote server connection.
965 */
966 private final class RemoteServerConnection implements ServerConnection {
967 private final Channel channel;
968
969 RemoteServerConnection(Channel channel) {
970 this.channel = channel;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800971 }
972
973 /**
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700974 * Dispatches a message to a local handler.
975 *
976 * @param message the message to dispatch
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800977 */
Jordan Haltermane3813a92017-07-29 14:10:31 -0700978 private void dispatch(InternalRequest message) {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700979 if (message.preamble() != preamble) {
980 log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700981 reply(message, InternalReply.Status.PROTOCOL_EXCEPTION, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700982 return;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800983 }
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700984
985 clockService.recordEventTime(message.time());
986
Jordan Haltermane3813a92017-07-29 14:10:31 -0700987 BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700988 if (handler != null) {
989 handler.accept(message, this);
990 } else {
991 log.debug("No handler for message type {} from {}", message.type(), message.sender());
Jordan Haltermane3813a92017-07-29 14:10:31 -0700992 reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700993 }
994 }
995
996 @Override
Jordan Haltermane3813a92017-07-29 14:10:31 -0700997 public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
998 InternalReply response = new InternalReply(preamble,
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700999 clockService.timeNow(),
1000 message.id(),
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001001 payload.orElse(EMPTY_PAYLOAD),
1002 status);
1003 channel.writeAndFlush(response);
1004 }
1005 }
1006
1007 /**
1008 * Request-reply timeout history tracker.
1009 */
Jordan Haltermanef92f192017-12-21 11:59:38 -08001010 private static final class RequestMonitor {
1011 private final DescriptiveStatistics samples = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001012
1013 /**
1014 * Adds a reply time to the history.
1015 *
1016 * @param replyTime the reply time to add to the history
1017 */
1018 void addReplyTime(long replyTime) {
Jordan Haltermanef92f192017-12-21 11:59:38 -08001019 samples.addValue(replyTime);
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001020 }
1021
1022 /**
Jordan Haltermanef92f192017-12-21 11:59:38 -08001023 * Returns a boolean indicating whether the given request should be timed out according to the elapsed time.
1024 *
1025 * @param elapsedTime the elapsed request time
1026 * @return indicates whether the request should be timed out
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001027 */
Jordan Haltermanef92f192017-12-21 11:59:38 -08001028 boolean isTimedOut(long elapsedTime) {
1029 return phi(elapsedTime) >= PHI_FAILURE_THRESHOLD;
1030 }
1031
1032 /**
1033 * Compute phi for the specified node id.
1034 *
1035 * @param elapsedTime the duration since the request was sent
1036 * @return phi value
1037 */
1038 private double phi(long elapsedTime) {
1039 if (samples.getN() < MIN_SAMPLES) {
1040 return 0.0;
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -07001041 }
Jordan Haltermanef92f192017-12-21 11:59:38 -08001042 return computePhi(samples, elapsedTime);
1043 }
1044
1045 /**
1046 * Computes the phi value from the given samples.
1047 *
1048 * @param samples the samples from which to compute phi
1049 * @param elapsedTime the duration since the request was sent
1050 * @return phi
1051 */
1052 private double computePhi(DescriptiveStatistics samples, long elapsedTime) {
1053 return samples.getN() > 0 ? PHI_FACTOR * elapsedTime / samples.getMean() : 100;
Aaron Kruglikov1b727382016-02-09 16:17:47 -08001054 }
1055 }
JunHuy Lam39eb4292015-06-26 17:24:23 +09001056}