blob: 3d4a4f71ee208309cce33d396c3cc17b5ab396cf [file] [log] [blame]
Thomas Vachuska58de4162015-09-10 16:15:33 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070016package org.onosproject.store.cluster.messaging.impl;
17
JunHuy Lam39eb4292015-06-26 17:24:23 +090018import com.google.common.base.Strings;
Madan Jampaniec1df022015-10-13 21:23:03 -070019
Aaron Kruglikov1b727382016-02-09 16:17:47 -080020import com.google.common.cache.Cache;
21import com.google.common.cache.CacheBuilder;
22import com.google.common.cache.RemovalListener;
23import com.google.common.cache.RemovalNotification;
24import com.google.common.util.concurrent.MoreExecutors;
25import io.netty.bootstrap.Bootstrap;
26import io.netty.bootstrap.ServerBootstrap;
27import io.netty.buffer.PooledByteBufAllocator;
28import io.netty.channel.Channel;
29import io.netty.channel.ChannelFuture;
30import io.netty.channel.ChannelHandler;
31import io.netty.channel.ChannelHandlerContext;
32import io.netty.channel.ChannelInitializer;
33import io.netty.channel.ChannelOption;
34import io.netty.channel.EventLoopGroup;
35import io.netty.channel.ServerChannel;
36import io.netty.channel.SimpleChannelInboundHandler;
37import io.netty.channel.epoll.EpollEventLoopGroup;
38import io.netty.channel.epoll.EpollServerSocketChannel;
39import io.netty.channel.epoll.EpollSocketChannel;
40import io.netty.channel.nio.NioEventLoopGroup;
41import io.netty.channel.socket.SocketChannel;
42import io.netty.channel.socket.nio.NioServerSocketChannel;
43import io.netty.channel.socket.nio.NioSocketChannel;
44import org.apache.commons.pool.KeyedPoolableObjectFactory;
45import org.apache.commons.pool.impl.GenericKeyedObjectPool;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070046import org.apache.felix.scr.annotations.Activate;
47import org.apache.felix.scr.annotations.Component;
48import org.apache.felix.scr.annotations.Deactivate;
49import org.apache.felix.scr.annotations.Reference;
50import org.apache.felix.scr.annotations.ReferenceCardinality;
51import org.apache.felix.scr.annotations.Service;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080052import org.onlab.netty.InternalMessage;
53import org.onlab.netty.MessageDecoder;
54import org.onlab.netty.MessageEncoder;
55import org.onlab.util.Tools;
Madan Jampaniec1df022015-10-13 21:23:03 -070056import org.onosproject.cluster.ClusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070057import org.onosproject.cluster.ControllerNode;
58import org.onosproject.store.cluster.messaging.Endpoint;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080059import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070060import org.slf4j.Logger;
61import org.slf4j.LoggerFactory;
62
Aaron Kruglikov1b727382016-02-09 16:17:47 -080063import javax.net.ssl.KeyManagerFactory;
64import javax.net.ssl.SSLContext;
65import javax.net.ssl.SSLEngine;
66import javax.net.ssl.TrustManagerFactory;
67import java.io.FileInputStream;
68import java.io.IOException;
69import java.security.KeyStore;
70import java.util.Map;
71import java.util.concurrent.CompletableFuture;
72import java.util.concurrent.ConcurrentHashMap;
73import java.util.concurrent.Executor;
74import java.util.concurrent.RejectedExecutionException;
75import java.util.concurrent.TimeUnit;
76import java.util.concurrent.TimeoutException;
77import java.util.concurrent.atomic.AtomicBoolean;
78import java.util.concurrent.atomic.AtomicLong;
79import java.util.function.BiConsumer;
80import java.util.function.BiFunction;
81import java.util.function.Consumer;
82
Madan Jampaniafeebbd2015-05-19 15:26:01 -070083/**
84 * Netty based MessagingService.
85 */
86@Component(immediate = true, enabled = true)
87@Service
Aaron Kruglikov1b727382016-02-09 16:17:47 -080088public class NettyMessagingManager implements MessagingService {
89
90 private static final short MIN_KS_LENGTH = 6;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070091
92 private final Logger log = LoggerFactory.getLogger(getClass());
93
Aaron Kruglikov1b727382016-02-09 16:17:47 -080094 private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
95
96 private Endpoint localEp;
97 private int preamble;
98 private final AtomicBoolean started = new AtomicBoolean(false);
99 private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
100 private final AtomicLong messageIdGenerator = new AtomicLong(0);
101 private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder()
102 .expireAfterWrite(10, TimeUnit.SECONDS)
103 .removalListener(new RemovalListener<Long, Callback>() {
104 @Override
105 public void onRemoval(RemovalNotification<Long, Callback> entry) {
106 if (entry.wasEvicted()) {
107 entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
108 }
109 }
110 })
111 .build();
112
113 private final GenericKeyedObjectPool<Endpoint, Connection> channels
114 = new GenericKeyedObjectPool<Endpoint, Connection>(new OnosCommunicationChannelFactory());
115
116 private EventLoopGroup serverGroup;
117 private EventLoopGroup clientGroup;
118 private Class<? extends ServerChannel> serverChannelClass;
119 private Class<? extends Channel> clientChannelClass;
120
121 protected static final boolean TLS_DISABLED = false;
122 protected boolean enableNettyTls = TLS_DISABLED;
123
124 protected String ksLocation;
125 protected String tsLocation;
126 protected char[] ksPwd;
127 protected char[] tsPwd;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900128
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700129 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampaniec1df022015-10-13 21:23:03 -0700130 protected ClusterMetadataService clusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700131
132 @Activate
133 public void activate() throws Exception {
Madan Jampaniec1df022015-10-13 21:23:03 -0700134 ControllerNode localNode = clusterMetadataService.getLocalNode();
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800135 getTlsParameters();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800136
137 if (started.get()) {
138 log.warn("Already running at local endpoint: {}", localEp);
139 return;
140 }
141 this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
142 this.localEp = new Endpoint(localNode.ip(), localNode.tcpPort());
143 channels.setLifo(true);
144 channels.setTestOnBorrow(true);
145 channels.setTestOnReturn(true);
146 channels.setMinEvictableIdleTimeMillis(60_000L);
147 channels.setTimeBetweenEvictionRunsMillis(30_000L);
148 initEventLoopGroup();
149 startAcceptingConnections();
150 started.set(true);
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700151 log.info("Started");
152 }
153
154 @Deactivate
155 public void deactivate() throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800156 if (started.get()) {
157 channels.close();
158 serverGroup.shutdownGracefully();
159 clientGroup.shutdownGracefully();
160 started.set(false);
161 }
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700162 log.info("Stopped");
163 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900164
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800165 private void getTlsParameters() {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900166 String tempString = System.getProperty("enableNettyTLS");
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800167 enableNettyTls = Strings.isNullOrEmpty(tempString) ? TLS_DISABLED : Boolean.parseBoolean(tempString);
168 log.info("enableNettyTLS = {}", enableNettyTls);
169 if (enableNettyTls) {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900170 ksLocation = System.getProperty("javax.net.ssl.keyStore");
171 if (Strings.isNullOrEmpty(ksLocation)) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800172 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900173 return;
174 }
175 tsLocation = System.getProperty("javax.net.ssl.trustStore");
176 if (Strings.isNullOrEmpty(tsLocation)) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800177 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900178 return;
179 }
180 ksPwd = System.getProperty("javax.net.ssl.keyStorePassword").toCharArray();
181 if (MIN_KS_LENGTH > ksPwd.length) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800182 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900183 return;
184 }
185 tsPwd = System.getProperty("javax.net.ssl.trustStorePassword").toCharArray();
186 if (MIN_KS_LENGTH > tsPwd.length) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800187 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900188 return;
189 }
190 }
191 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800192 private void initEventLoopGroup() {
193 // try Epoll first and if that does work, use nio.
194 try {
195 clientGroup = new EpollEventLoopGroup();
196 serverGroup = new EpollEventLoopGroup();
197 serverChannelClass = EpollServerSocketChannel.class;
198 clientChannelClass = EpollSocketChannel.class;
199 return;
200 } catch (Throwable e) {
201 log.debug("Failed to initialize native (epoll) transport. "
202 + "Reason: {}. Proceeding with nio.", e.getMessage());
203 }
204 clientGroup = new NioEventLoopGroup();
205 serverGroup = new NioEventLoopGroup();
206 serverChannelClass = NioServerSocketChannel.class;
207 clientChannelClass = NioSocketChannel.class;
208 }
209
210 @Override
211 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
212 InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
213 localEp,
214 type,
215 payload);
216 return sendAsync(ep, message);
217 }
218
219 protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
220 if (ep.equals(localEp)) {
221 try {
222 dispatchLocally(message);
223 } catch (IOException e) {
224 return Tools.exceptionalFuture(e);
225 }
226 return CompletableFuture.completedFuture(null);
227 }
228
229 CompletableFuture<Void> future = new CompletableFuture<>();
230 try {
231 Connection connection = null;
232 try {
233 connection = channels.borrowObject(ep);
234 connection.send(message, future);
235 } finally {
236 channels.returnObject(ep, connection);
237 }
238 } catch (Exception e) {
239 future.completeExceptionally(e);
240 }
241 return future;
242 }
243
244 @Override
245 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
246 return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
247 }
248
249 @Override
250 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
251 CompletableFuture<byte[]> response = new CompletableFuture<>();
252 Callback callback = new Callback(response, executor);
253 Long messageId = messageIdGenerator.incrementAndGet();
254 callbacks.put(messageId, callback);
255 InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
256 return sendAsync(ep, message).whenComplete((r, e) -> {
257 if (e != null) {
258 callbacks.invalidate(messageId);
259 }
260 }).thenCompose(v -> response);
261 }
262
263 @Override
264 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
265 handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
266 }
267
268 @Override
269 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
270 handlers.put(type, message -> executor.execute(() -> {
271 byte[] responsePayload = handler.apply(message.sender(), message.payload());
272 if (responsePayload != null) {
273 InternalMessage response = new InternalMessage(message.id(),
274 localEp,
275 REPLY_MESSAGE_TYPE,
276 responsePayload);
277 sendAsync(message.sender(), response).whenComplete((result, error) -> {
278 if (error != null) {
279 log.debug("Failed to respond", error);
280 }
281 });
282 }
283 }));
284 }
285
286 @Override
287 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
288 handlers.put(type, message -> {
289 handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
290 if (error == null) {
291 InternalMessage response = new InternalMessage(message.id(),
292 localEp,
293 REPLY_MESSAGE_TYPE,
294 result);
295 sendAsync(message.sender(), response).whenComplete((r, e) -> {
296 if (e != null) {
297 log.debug("Failed to respond", e);
298 }
299 });
300 }
301 });
302 });
303 }
304
305 @Override
306 public void unregisterHandler(String type) {
307 handlers.remove(type);
308 }
309
310 private void startAcceptingConnections() throws InterruptedException {
311 ServerBootstrap b = new ServerBootstrap();
312 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
313 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
314 b.option(ChannelOption.SO_RCVBUF, 1048576);
315 b.option(ChannelOption.TCP_NODELAY, true);
316 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
317 b.group(serverGroup, clientGroup);
318 b.channel(serverChannelClass);
319 if (enableNettyTls) {
320 b.childHandler(new SslServerCommunicationChannelInitializer());
321 } else {
322 b.childHandler(new OnosCommunicationChannelInitializer());
323 }
324 b.option(ChannelOption.SO_BACKLOG, 128);
325 b.childOption(ChannelOption.SO_KEEPALIVE, true);
326
327 // Bind and start to accept incoming connections.
328 b.bind(localEp.port()).sync().addListener(future -> {
329 if (future.isSuccess()) {
330 log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
331 } else {
332 log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
333 }
334 });
335 }
336
337 private class OnosCommunicationChannelFactory
338 implements KeyedPoolableObjectFactory<Endpoint, Connection> {
339
340 @Override
341 public void activateObject(Endpoint endpoint, Connection connection)
342 throws Exception {
343 }
344
345 @Override
346 public void destroyObject(Endpoint ep, Connection connection) throws Exception {
347 log.debug("Closing connection to {}", ep);
348 //Is this the right way to destroy?
349 connection.destroy();
350 }
351
352 @Override
353 public Connection makeObject(Endpoint ep) throws Exception {
354 Bootstrap bootstrap = new Bootstrap();
355 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
356 bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
357 bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
358 bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
359 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
360 bootstrap.group(clientGroup);
361 // TODO: Make this faster:
362 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
363 bootstrap.channel(clientChannelClass);
364 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
365 if (enableNettyTls) {
366 bootstrap.handler(new SslClientCommunicationChannelInitializer());
367 } else {
368 bootstrap.handler(new OnosCommunicationChannelInitializer());
369 }
370 // Start the client.
371 CompletableFuture<Channel> retFuture = new CompletableFuture<>();
372 ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port());
373
374 f.addListener(future -> {
375 if (future.isSuccess()) {
376 retFuture.complete(f.channel());
377 } else {
378 retFuture.completeExceptionally(future.cause());
379 }
380 });
381 log.debug("Established a new connection to {}", ep);
382 return new Connection(retFuture);
383 }
384
385 @Override
386 public void passivateObject(Endpoint ep, Connection connection)
387 throws Exception {
388 }
389
390 @Override
391 public boolean validateObject(Endpoint ep, Connection connection) {
392 return connection.validate();
393 }
394 }
395
396 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
397
398 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
399 private final ChannelHandler encoder = new MessageEncoder(preamble);
400
401 @Override
402 protected void initChannel(SocketChannel channel) throws Exception {
403 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
404 KeyStore ts = KeyStore.getInstance("JKS");
405 ts.load(new FileInputStream(tsLocation), tsPwd);
406 tmFactory.init(ts);
407
408 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
409 KeyStore ks = KeyStore.getInstance("JKS");
410 ks.load(new FileInputStream(ksLocation), ksPwd);
411 kmf.init(ks, ksPwd);
412
413 SSLContext serverContext = SSLContext.getInstance("TLS");
414 serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
415
416 SSLEngine serverSslEngine = serverContext.createSSLEngine();
417
418 serverSslEngine.setNeedClientAuth(true);
419 serverSslEngine.setUseClientMode(false);
420 serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
421 serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
422 serverSslEngine.setEnableSessionCreation(true);
423
424 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
425 .addLast("encoder", encoder)
426 .addLast("decoder", new MessageDecoder(preamble))
427 .addLast("handler", dispatcher);
428 }
429 }
430
431 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
432
433 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
434 private final ChannelHandler encoder = new MessageEncoder(preamble);
435
436 @Override
437 protected void initChannel(SocketChannel channel) throws Exception {
438 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
439 KeyStore ts = KeyStore.getInstance("JKS");
440 ts.load(new FileInputStream(tsLocation), tsPwd);
441 tmFactory.init(ts);
442
443 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
444 KeyStore ks = KeyStore.getInstance("JKS");
445 ks.load(new FileInputStream(ksLocation), ksPwd);
446 kmf.init(ks, ksPwd);
447
448 SSLContext clientContext = SSLContext.getInstance("TLS");
449 clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
450
451 SSLEngine clientSslEngine = clientContext.createSSLEngine();
452
453 clientSslEngine.setUseClientMode(true);
454 clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
455 clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
456 clientSslEngine.setEnableSessionCreation(true);
457
458 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
459 .addLast("encoder", encoder)
460 .addLast("decoder", new MessageDecoder(preamble))
461 .addLast("handler", dispatcher);
462 }
463 }
464
465 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
466
467 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
468 private final ChannelHandler encoder = new MessageEncoder(preamble);
469
470 @Override
471 protected void initChannel(SocketChannel channel) throws Exception {
472 channel.pipeline()
473 .addLast("encoder", encoder)
474 .addLast("decoder", new MessageDecoder(preamble))
475 .addLast("handler", dispatcher);
476 }
477 }
478
479 @ChannelHandler.Sharable
480 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
481
482 @Override
483 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
484 try {
485 dispatchLocally(message);
486 } catch (RejectedExecutionException e) {
487 log.warn("Unable to dispatch message due to {}", e.getMessage());
488 }
489 }
490
491 @Override
492 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
493 log.error("Exception inside channel handling pipeline.", cause);
494 context.close();
495 }
496 }
497 private void dispatchLocally(InternalMessage message) throws IOException {
498 String type = message.type();
499 if (REPLY_MESSAGE_TYPE.equals(type)) {
500 try {
501 Callback callback =
502 callbacks.getIfPresent(message.id());
503 if (callback != null) {
504 callback.complete(message.payload());
505 } else {
506 log.warn("Received a reply for message id:[{}]. "
507 + " from {}. But was unable to locate the"
508 + " request handle", message.id(), message.sender());
509 }
510 } finally {
511 callbacks.invalidate(message.id());
512 }
513 return;
514 }
515 Consumer<InternalMessage> handler = handlers.get(type);
516 if (handler != null) {
517 handler.accept(message);
518 } else {
519 log.debug("No handler registered for {}", type);
520 }
521 }
522
523 private final class Callback {
524 private final CompletableFuture<byte[]> future;
525 private final Executor executor;
526
527 public Callback(CompletableFuture<byte[]> future, Executor executor) {
528 this.future = future;
529 this.executor = executor;
530 }
531
532 public void complete(byte[] value) {
533 executor.execute(() -> future.complete(value));
534 }
535
536 public void completeExceptionally(Throwable error) {
537 executor.execute(() -> future.completeExceptionally(error));
538 }
539 }
540 private final class Connection {
541 private final CompletableFuture<Channel> internalFuture;
542
543 public Connection(CompletableFuture<Channel> internalFuture) {
544 this.internalFuture = internalFuture;
545 }
546
547 /**
548 * Sends a message out on its channel and associated the message with a
549 * completable future used for signaling.
550 * @param message the message to be sent
551 * @param future a future that is completed normally or exceptionally if
552 * message sending succeeds or fails respectively
553 */
554 public void send(Object message, CompletableFuture<Void> future) {
555 internalFuture.whenComplete((channel, throwable) -> {
556 if (throwable == null) {
557 channel.writeAndFlush(message).addListener(channelFuture -> {
558 if (!channelFuture.isSuccess()) {
559 future.completeExceptionally(channelFuture.cause());
560 } else {
561 future.complete(null);
562 }
563 });
564 } else {
565 future.completeExceptionally(throwable);
566 }
567 });
568 }
569
570 /**
571 * Destroys a channel by closing its channel (if it exists) and
572 * cancelling its future.
573 */
574 public void destroy() {
575 Channel channel = internalFuture.getNow(null);
576 if (channel != null) {
577 channel.close();
578 }
579 internalFuture.cancel(false);
580 }
581
582 /**
583 * Determines whether the connection is valid meaning it is either
584 * complete with and active channel
585 * or it has not yet completed.
586 * @return true if the channel has an active connection or has not
587 * yet completed
588 */
589 public boolean validate() {
590 if (internalFuture.isCompletedExceptionally()) {
591 return false;
592 }
593 Channel channel = internalFuture.getNow(null);
594 return channel == null || channel.isActive();
595 }
596 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900597}