blob: 2f883e1e517ed81a8f451af6af8ef2459d928206 [file] [log] [blame]
Thomas Vachuska58de4162015-09-10 16:15:33 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070016package org.onosproject.store.cluster.messaging.impl;
17
JunHuy Lam39eb4292015-06-26 17:24:23 +090018import com.google.common.base.Strings;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalListener;
22import com.google.common.cache.RemovalNotification;
23import com.google.common.util.concurrent.MoreExecutors;
Madan Jampania9e70a62016-03-02 16:28:18 -080024
Aaron Kruglikov1b727382016-02-09 16:17:47 -080025import io.netty.bootstrap.Bootstrap;
26import io.netty.bootstrap.ServerBootstrap;
27import io.netty.buffer.PooledByteBufAllocator;
28import io.netty.channel.Channel;
29import io.netty.channel.ChannelFuture;
30import io.netty.channel.ChannelHandler;
31import io.netty.channel.ChannelHandlerContext;
32import io.netty.channel.ChannelInitializer;
33import io.netty.channel.ChannelOption;
34import io.netty.channel.EventLoopGroup;
35import io.netty.channel.ServerChannel;
36import io.netty.channel.SimpleChannelInboundHandler;
37import io.netty.channel.epoll.EpollEventLoopGroup;
38import io.netty.channel.epoll.EpollServerSocketChannel;
39import io.netty.channel.epoll.EpollSocketChannel;
40import io.netty.channel.nio.NioEventLoopGroup;
41import io.netty.channel.socket.SocketChannel;
42import io.netty.channel.socket.nio.NioServerSocketChannel;
43import io.netty.channel.socket.nio.NioSocketChannel;
Madan Jampania9e70a62016-03-02 16:28:18 -080044
Aaron Kruglikov1b727382016-02-09 16:17:47 -080045import org.apache.commons.pool.KeyedPoolableObjectFactory;
46import org.apache.commons.pool.impl.GenericKeyedObjectPool;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070047import org.apache.felix.scr.annotations.Activate;
48import org.apache.felix.scr.annotations.Component;
49import org.apache.felix.scr.annotations.Deactivate;
50import org.apache.felix.scr.annotations.Reference;
51import org.apache.felix.scr.annotations.ReferenceCardinality;
52import org.apache.felix.scr.annotations.Service;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080053import org.onlab.util.Tools;
Madan Jampaniec1df022015-10-13 21:23:03 -070054import org.onosproject.cluster.ClusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070055import org.onosproject.cluster.ControllerNode;
56import org.onosproject.store.cluster.messaging.Endpoint;
Madan Jampania9e70a62016-03-02 16:28:18 -080057import org.onosproject.store.cluster.messaging.MessagingException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080058import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampania9e70a62016-03-02 16:28:18 -080059import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070060import org.slf4j.Logger;
61import org.slf4j.LoggerFactory;
62
Aaron Kruglikov1b727382016-02-09 16:17:47 -080063import javax.net.ssl.KeyManagerFactory;
64import javax.net.ssl.SSLContext;
65import javax.net.ssl.SSLEngine;
66import javax.net.ssl.TrustManagerFactory;
Madan Jampania9e70a62016-03-02 16:28:18 -080067
Aaron Kruglikov1b727382016-02-09 16:17:47 -080068import java.io.FileInputStream;
69import java.io.IOException;
70import java.security.KeyStore;
71import java.util.Map;
Madan Jampania9e70a62016-03-02 16:28:18 -080072import java.util.Optional;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080073import java.util.concurrent.CompletableFuture;
74import java.util.concurrent.ConcurrentHashMap;
75import java.util.concurrent.Executor;
76import java.util.concurrent.RejectedExecutionException;
77import java.util.concurrent.TimeUnit;
78import java.util.concurrent.TimeoutException;
79import java.util.concurrent.atomic.AtomicBoolean;
80import java.util.concurrent.atomic.AtomicLong;
81import java.util.function.BiConsumer;
82import java.util.function.BiFunction;
83import java.util.function.Consumer;
84
Madan Jampaniafeebbd2015-05-19 15:26:01 -070085/**
86 * Netty based MessagingService.
87 */
88@Component(immediate = true, enabled = true)
89@Service
Aaron Kruglikov1b727382016-02-09 16:17:47 -080090public class NettyMessagingManager implements MessagingService {
91
Madan Jampanif5c38a72016-02-17 18:26:15 -080092 private static final int REPLY_TIME_OUT_SEC = 2;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080093 private static final short MIN_KS_LENGTH = 6;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070094
95 private final Logger log = LoggerFactory.getLogger(getClass());
96
Aaron Kruglikov1b727382016-02-09 16:17:47 -080097 private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
98
99 private Endpoint localEp;
100 private int preamble;
101 private final AtomicBoolean started = new AtomicBoolean(false);
102 private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
103 private final AtomicLong messageIdGenerator = new AtomicLong(0);
104 private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder()
Madan Jampanif5c38a72016-02-17 18:26:15 -0800105 .expireAfterWrite(REPLY_TIME_OUT_SEC, TimeUnit.SECONDS)
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800106 .removalListener(new RemovalListener<Long, Callback>() {
107 @Override
108 public void onRemoval(RemovalNotification<Long, Callback> entry) {
109 if (entry.wasEvicted()) {
110 entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
111 }
112 }
113 })
114 .build();
115
116 private final GenericKeyedObjectPool<Endpoint, Connection> channels
117 = new GenericKeyedObjectPool<Endpoint, Connection>(new OnosCommunicationChannelFactory());
118
119 private EventLoopGroup serverGroup;
120 private EventLoopGroup clientGroup;
121 private Class<? extends ServerChannel> serverChannelClass;
122 private Class<? extends Channel> clientChannelClass;
123
124 protected static final boolean TLS_DISABLED = false;
125 protected boolean enableNettyTls = TLS_DISABLED;
126
127 protected String ksLocation;
128 protected String tsLocation;
129 protected char[] ksPwd;
130 protected char[] tsPwd;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900131
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampaniec1df022015-10-13 21:23:03 -0700133 protected ClusterMetadataService clusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700134
135 @Activate
136 public void activate() throws Exception {
Madan Jampaniec1df022015-10-13 21:23:03 -0700137 ControllerNode localNode = clusterMetadataService.getLocalNode();
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800138 getTlsParameters();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800139
140 if (started.get()) {
141 log.warn("Already running at local endpoint: {}", localEp);
142 return;
143 }
144 this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
145 this.localEp = new Endpoint(localNode.ip(), localNode.tcpPort());
146 channels.setLifo(true);
147 channels.setTestOnBorrow(true);
148 channels.setTestOnReturn(true);
149 channels.setMinEvictableIdleTimeMillis(60_000L);
150 channels.setTimeBetweenEvictionRunsMillis(30_000L);
151 initEventLoopGroup();
152 startAcceptingConnections();
153 started.set(true);
Madan Jampanif5c38a72016-02-17 18:26:15 -0800154 serverGroup.scheduleWithFixedDelay(callbacks::cleanUp, 0, REPLY_TIME_OUT_SEC, TimeUnit.SECONDS);
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700155 log.info("Started");
156 }
157
158 @Deactivate
159 public void deactivate() throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800160 if (started.get()) {
161 channels.close();
162 serverGroup.shutdownGracefully();
163 clientGroup.shutdownGracefully();
164 started.set(false);
165 }
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700166 log.info("Stopped");
167 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900168
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800169 private void getTlsParameters() {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900170 String tempString = System.getProperty("enableNettyTLS");
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800171 enableNettyTls = Strings.isNullOrEmpty(tempString) ? TLS_DISABLED : Boolean.parseBoolean(tempString);
172 log.info("enableNettyTLS = {}", enableNettyTls);
173 if (enableNettyTls) {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900174 ksLocation = System.getProperty("javax.net.ssl.keyStore");
175 if (Strings.isNullOrEmpty(ksLocation)) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800176 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900177 return;
178 }
179 tsLocation = System.getProperty("javax.net.ssl.trustStore");
180 if (Strings.isNullOrEmpty(tsLocation)) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800181 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900182 return;
183 }
184 ksPwd = System.getProperty("javax.net.ssl.keyStorePassword").toCharArray();
185 if (MIN_KS_LENGTH > ksPwd.length) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800186 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900187 return;
188 }
189 tsPwd = System.getProperty("javax.net.ssl.trustStorePassword").toCharArray();
190 if (MIN_KS_LENGTH > tsPwd.length) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800191 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900192 return;
193 }
194 }
195 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800196 private void initEventLoopGroup() {
197 // try Epoll first and if that does work, use nio.
198 try {
199 clientGroup = new EpollEventLoopGroup();
200 serverGroup = new EpollEventLoopGroup();
201 serverChannelClass = EpollServerSocketChannel.class;
202 clientChannelClass = EpollSocketChannel.class;
203 return;
204 } catch (Throwable e) {
205 log.debug("Failed to initialize native (epoll) transport. "
206 + "Reason: {}. Proceeding with nio.", e.getMessage());
207 }
208 clientGroup = new NioEventLoopGroup();
209 serverGroup = new NioEventLoopGroup();
210 serverChannelClass = NioServerSocketChannel.class;
211 clientChannelClass = NioSocketChannel.class;
212 }
213
214 @Override
215 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
216 InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
217 localEp,
218 type,
219 payload);
220 return sendAsync(ep, message);
221 }
222
223 protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
224 if (ep.equals(localEp)) {
225 try {
226 dispatchLocally(message);
227 } catch (IOException e) {
228 return Tools.exceptionalFuture(e);
229 }
230 return CompletableFuture.completedFuture(null);
231 }
232
233 CompletableFuture<Void> future = new CompletableFuture<>();
234 try {
235 Connection connection = null;
236 try {
237 connection = channels.borrowObject(ep);
238 connection.send(message, future);
239 } finally {
240 channels.returnObject(ep, connection);
241 }
242 } catch (Exception e) {
243 future.completeExceptionally(e);
244 }
245 return future;
246 }
247
248 @Override
249 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
250 return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
251 }
252
253 @Override
254 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
255 CompletableFuture<byte[]> response = new CompletableFuture<>();
256 Callback callback = new Callback(response, executor);
257 Long messageId = messageIdGenerator.incrementAndGet();
258 callbacks.put(messageId, callback);
259 InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
260 return sendAsync(ep, message).whenComplete((r, e) -> {
261 if (e != null) {
262 callbacks.invalidate(messageId);
263 }
264 }).thenCompose(v -> response);
265 }
266
267 @Override
268 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
269 handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
270 }
271
272 @Override
273 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
274 handlers.put(type, message -> executor.execute(() -> {
Madan Jampania9e70a62016-03-02 16:28:18 -0800275 byte[] responsePayload = null;
276 Status status = Status.OK;
277 try {
278 responsePayload = handler.apply(message.sender(), message.payload());
279 } catch (Exception e) {
280 status = Status.ERROR_HANDLER_EXCEPTION;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800281 }
Madan Jampania9e70a62016-03-02 16:28:18 -0800282 sendReply(message, status, Optional.ofNullable(responsePayload));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800283 }));
284 }
285
286 @Override
287 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
288 handlers.put(type, message -> {
289 handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
Madan Jampania9e70a62016-03-02 16:28:18 -0800290 Status status = error == null ? Status.OK : Status.ERROR_HANDLER_EXCEPTION;
291 sendReply(message, status, Optional.ofNullable(result));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800292 });
293 });
294 }
295
296 @Override
297 public void unregisterHandler(String type) {
298 handlers.remove(type);
299 }
300
301 private void startAcceptingConnections() throws InterruptedException {
302 ServerBootstrap b = new ServerBootstrap();
303 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
304 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
305 b.option(ChannelOption.SO_RCVBUF, 1048576);
306 b.option(ChannelOption.TCP_NODELAY, true);
307 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
308 b.group(serverGroup, clientGroup);
309 b.channel(serverChannelClass);
310 if (enableNettyTls) {
311 b.childHandler(new SslServerCommunicationChannelInitializer());
312 } else {
313 b.childHandler(new OnosCommunicationChannelInitializer());
314 }
315 b.option(ChannelOption.SO_BACKLOG, 128);
316 b.childOption(ChannelOption.SO_KEEPALIVE, true);
317
318 // Bind and start to accept incoming connections.
319 b.bind(localEp.port()).sync().addListener(future -> {
320 if (future.isSuccess()) {
321 log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
322 } else {
323 log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
324 }
325 });
326 }
327
328 private class OnosCommunicationChannelFactory
329 implements KeyedPoolableObjectFactory<Endpoint, Connection> {
330
331 @Override
332 public void activateObject(Endpoint endpoint, Connection connection)
333 throws Exception {
334 }
335
336 @Override
337 public void destroyObject(Endpoint ep, Connection connection) throws Exception {
338 log.debug("Closing connection to {}", ep);
339 //Is this the right way to destroy?
340 connection.destroy();
341 }
342
343 @Override
344 public Connection makeObject(Endpoint ep) throws Exception {
345 Bootstrap bootstrap = new Bootstrap();
346 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
347 bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
348 bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
349 bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
350 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
351 bootstrap.group(clientGroup);
352 // TODO: Make this faster:
353 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
354 bootstrap.channel(clientChannelClass);
355 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
356 if (enableNettyTls) {
357 bootstrap.handler(new SslClientCommunicationChannelInitializer());
358 } else {
359 bootstrap.handler(new OnosCommunicationChannelInitializer());
360 }
361 // Start the client.
362 CompletableFuture<Channel> retFuture = new CompletableFuture<>();
363 ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port());
364
365 f.addListener(future -> {
366 if (future.isSuccess()) {
367 retFuture.complete(f.channel());
368 } else {
369 retFuture.completeExceptionally(future.cause());
370 }
371 });
372 log.debug("Established a new connection to {}", ep);
373 return new Connection(retFuture);
374 }
375
376 @Override
377 public void passivateObject(Endpoint ep, Connection connection)
378 throws Exception {
379 }
380
381 @Override
382 public boolean validateObject(Endpoint ep, Connection connection) {
383 return connection.validate();
384 }
385 }
386
387 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
388
389 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
390 private final ChannelHandler encoder = new MessageEncoder(preamble);
391
392 @Override
393 protected void initChannel(SocketChannel channel) throws Exception {
394 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
395 KeyStore ts = KeyStore.getInstance("JKS");
396 ts.load(new FileInputStream(tsLocation), tsPwd);
397 tmFactory.init(ts);
398
399 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
400 KeyStore ks = KeyStore.getInstance("JKS");
401 ks.load(new FileInputStream(ksLocation), ksPwd);
402 kmf.init(ks, ksPwd);
403
404 SSLContext serverContext = SSLContext.getInstance("TLS");
405 serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
406
407 SSLEngine serverSslEngine = serverContext.createSSLEngine();
408
409 serverSslEngine.setNeedClientAuth(true);
410 serverSslEngine.setUseClientMode(false);
411 serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
412 serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
413 serverSslEngine.setEnableSessionCreation(true);
414
415 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
416 .addLast("encoder", encoder)
417 .addLast("decoder", new MessageDecoder(preamble))
418 .addLast("handler", dispatcher);
419 }
420 }
421
422 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
423
424 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
425 private final ChannelHandler encoder = new MessageEncoder(preamble);
426
427 @Override
428 protected void initChannel(SocketChannel channel) throws Exception {
429 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
430 KeyStore ts = KeyStore.getInstance("JKS");
431 ts.load(new FileInputStream(tsLocation), tsPwd);
432 tmFactory.init(ts);
433
434 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
435 KeyStore ks = KeyStore.getInstance("JKS");
436 ks.load(new FileInputStream(ksLocation), ksPwd);
437 kmf.init(ks, ksPwd);
438
439 SSLContext clientContext = SSLContext.getInstance("TLS");
440 clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
441
442 SSLEngine clientSslEngine = clientContext.createSSLEngine();
443
444 clientSslEngine.setUseClientMode(true);
445 clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
446 clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
447 clientSslEngine.setEnableSessionCreation(true);
448
449 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
450 .addLast("encoder", encoder)
451 .addLast("decoder", new MessageDecoder(preamble))
452 .addLast("handler", dispatcher);
453 }
454 }
455
456 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
457
458 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
459 private final ChannelHandler encoder = new MessageEncoder(preamble);
460
461 @Override
462 protected void initChannel(SocketChannel channel) throws Exception {
463 channel.pipeline()
464 .addLast("encoder", encoder)
465 .addLast("decoder", new MessageDecoder(preamble))
466 .addLast("handler", dispatcher);
467 }
468 }
469
470 @ChannelHandler.Sharable
471 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
472
473 @Override
474 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
475 try {
476 dispatchLocally(message);
477 } catch (RejectedExecutionException e) {
478 log.warn("Unable to dispatch message due to {}", e.getMessage());
479 }
480 }
481
482 @Override
483 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
484 log.error("Exception inside channel handling pipeline.", cause);
485 context.close();
486 }
487 }
488 private void dispatchLocally(InternalMessage message) throws IOException {
489 String type = message.type();
490 if (REPLY_MESSAGE_TYPE.equals(type)) {
491 try {
492 Callback callback =
493 callbacks.getIfPresent(message.id());
494 if (callback != null) {
Madan Jampania9e70a62016-03-02 16:28:18 -0800495 if (message.status() == Status.OK) {
496 callback.complete(message.payload());
497 } else if (message.status() == Status.ERROR_NO_HANDLER) {
498 callback.completeExceptionally(new MessagingException.NoRemoteHandler());
499 } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
500 callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
501 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800502 } else {
Madan Jampania9e70a62016-03-02 16:28:18 -0800503 log.debug("Received a reply for message id:[{}]. "
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800504 + " from {}. But was unable to locate the"
505 + " request handle", message.id(), message.sender());
506 }
507 } finally {
508 callbacks.invalidate(message.id());
509 }
510 return;
511 }
512 Consumer<InternalMessage> handler = handlers.get(type);
513 if (handler != null) {
514 handler.accept(message);
515 } else {
Madan Jampania9e70a62016-03-02 16:28:18 -0800516 log.debug("No handler for message type {}", message.type(), message.sender());
517 sendReply(message, Status.ERROR_NO_HANDLER, Optional.empty());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800518 }
519 }
520
Madan Jampania9e70a62016-03-02 16:28:18 -0800521 private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
522 InternalMessage response = new InternalMessage(message.id(),
523 localEp,
524 REPLY_MESSAGE_TYPE,
525 responsePayload.orElse(new byte[0]),
526 status);
527 sendAsync(message.sender(), response).whenComplete((result, error) -> {
528 if (error != null) {
529 log.debug("Failed to respond", error);
530 }
531 });
532 }
533
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800534 private final class Callback {
535 private final CompletableFuture<byte[]> future;
536 private final Executor executor;
537
538 public Callback(CompletableFuture<byte[]> future, Executor executor) {
539 this.future = future;
540 this.executor = executor;
541 }
542
543 public void complete(byte[] value) {
544 executor.execute(() -> future.complete(value));
545 }
546
547 public void completeExceptionally(Throwable error) {
548 executor.execute(() -> future.completeExceptionally(error));
549 }
550 }
551 private final class Connection {
552 private final CompletableFuture<Channel> internalFuture;
553
554 public Connection(CompletableFuture<Channel> internalFuture) {
555 this.internalFuture = internalFuture;
556 }
557
558 /**
559 * Sends a message out on its channel and associated the message with a
560 * completable future used for signaling.
561 * @param message the message to be sent
562 * @param future a future that is completed normally or exceptionally if
563 * message sending succeeds or fails respectively
564 */
565 public void send(Object message, CompletableFuture<Void> future) {
566 internalFuture.whenComplete((channel, throwable) -> {
567 if (throwable == null) {
568 channel.writeAndFlush(message).addListener(channelFuture -> {
569 if (!channelFuture.isSuccess()) {
570 future.completeExceptionally(channelFuture.cause());
571 } else {
572 future.complete(null);
573 }
574 });
575 } else {
576 future.completeExceptionally(throwable);
577 }
578 });
579 }
580
581 /**
582 * Destroys a channel by closing its channel (if it exists) and
583 * cancelling its future.
584 */
585 public void destroy() {
586 Channel channel = internalFuture.getNow(null);
587 if (channel != null) {
588 channel.close();
589 }
590 internalFuture.cancel(false);
591 }
592
593 /**
594 * Determines whether the connection is valid meaning it is either
595 * complete with and active channel
596 * or it has not yet completed.
597 * @return true if the channel has an active connection or has not
598 * yet completed
599 */
600 public boolean validate() {
601 if (internalFuture.isCompletedExceptionally()) {
602 return false;
603 }
604 Channel channel = internalFuture.getNow(null);
605 return channel == null || channel.isActive();
606 }
607 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900608}