blob: 710665f1e9d6756b1024ff28566acc0dc99b1ef8 [file] [log] [blame]
Thomas Vachuska58de4162015-09-10 16:15:33 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Thomas Vachuska58de4162015-09-10 16:15:33 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070016package org.onosproject.store.cluster.messaging.impl;
17
JunHuy Lam39eb4292015-06-26 17:24:23 +090018import com.google.common.base.Strings;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalListener;
22import com.google.common.cache.RemovalNotification;
23import com.google.common.util.concurrent.MoreExecutors;
Madan Jampania9e70a62016-03-02 16:28:18 -080024
Aaron Kruglikov1b727382016-02-09 16:17:47 -080025import io.netty.bootstrap.Bootstrap;
26import io.netty.bootstrap.ServerBootstrap;
27import io.netty.buffer.PooledByteBufAllocator;
28import io.netty.channel.Channel;
29import io.netty.channel.ChannelFuture;
30import io.netty.channel.ChannelHandler;
31import io.netty.channel.ChannelHandlerContext;
32import io.netty.channel.ChannelInitializer;
33import io.netty.channel.ChannelOption;
34import io.netty.channel.EventLoopGroup;
35import io.netty.channel.ServerChannel;
36import io.netty.channel.SimpleChannelInboundHandler;
37import io.netty.channel.epoll.EpollEventLoopGroup;
38import io.netty.channel.epoll.EpollServerSocketChannel;
39import io.netty.channel.epoll.EpollSocketChannel;
40import io.netty.channel.nio.NioEventLoopGroup;
41import io.netty.channel.socket.SocketChannel;
42import io.netty.channel.socket.nio.NioServerSocketChannel;
43import io.netty.channel.socket.nio.NioSocketChannel;
Madan Jampania9e70a62016-03-02 16:28:18 -080044
Aaron Kruglikov1b727382016-02-09 16:17:47 -080045import org.apache.commons.pool.KeyedPoolableObjectFactory;
46import org.apache.commons.pool.impl.GenericKeyedObjectPool;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070047import org.apache.felix.scr.annotations.Activate;
48import org.apache.felix.scr.annotations.Component;
49import org.apache.felix.scr.annotations.Deactivate;
50import org.apache.felix.scr.annotations.Reference;
51import org.apache.felix.scr.annotations.ReferenceCardinality;
52import org.apache.felix.scr.annotations.Service;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080053import org.onlab.util.Tools;
Madan Jampaniec1df022015-10-13 21:23:03 -070054import org.onosproject.cluster.ClusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070055import org.onosproject.cluster.ControllerNode;
56import org.onosproject.store.cluster.messaging.Endpoint;
Madan Jampania9e70a62016-03-02 16:28:18 -080057import org.onosproject.store.cluster.messaging.MessagingException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080058import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampania9e70a62016-03-02 16:28:18 -080059import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070060import org.slf4j.Logger;
61import org.slf4j.LoggerFactory;
62
Aaron Kruglikov1b727382016-02-09 16:17:47 -080063import javax.net.ssl.KeyManagerFactory;
64import javax.net.ssl.SSLContext;
65import javax.net.ssl.SSLEngine;
66import javax.net.ssl.TrustManagerFactory;
Madan Jampania9e70a62016-03-02 16:28:18 -080067
Aaron Kruglikov1b727382016-02-09 16:17:47 -080068import java.io.FileInputStream;
69import java.io.IOException;
70import java.security.KeyStore;
71import java.util.Map;
Madan Jampania9e70a62016-03-02 16:28:18 -080072import java.util.Optional;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080073import java.util.concurrent.CompletableFuture;
74import java.util.concurrent.ConcurrentHashMap;
75import java.util.concurrent.Executor;
76import java.util.concurrent.RejectedExecutionException;
77import java.util.concurrent.TimeUnit;
78import java.util.concurrent.TimeoutException;
79import java.util.concurrent.atomic.AtomicBoolean;
80import java.util.concurrent.atomic.AtomicLong;
81import java.util.function.BiConsumer;
82import java.util.function.BiFunction;
83import java.util.function.Consumer;
84
Heedo Kang4a47a302016-02-29 17:40:23 +090085import static org.onosproject.security.AppGuard.checkPermission;
86import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
87
Madan Jampaniafeebbd2015-05-19 15:26:01 -070088/**
89 * Netty based MessagingService.
90 */
91@Component(immediate = true, enabled = true)
92@Service
Aaron Kruglikov1b727382016-02-09 16:17:47 -080093public class NettyMessagingManager implements MessagingService {
94
Madan Jampanif5c38a72016-02-17 18:26:15 -080095 private static final int REPLY_TIME_OUT_SEC = 2;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080096 private static final short MIN_KS_LENGTH = 6;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070097
98 private final Logger log = LoggerFactory.getLogger(getClass());
99
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800100 private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
101
102 private Endpoint localEp;
103 private int preamble;
104 private final AtomicBoolean started = new AtomicBoolean(false);
105 private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
106 private final AtomicLong messageIdGenerator = new AtomicLong(0);
107 private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder()
Madan Jampanif5c38a72016-02-17 18:26:15 -0800108 .expireAfterWrite(REPLY_TIME_OUT_SEC, TimeUnit.SECONDS)
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800109 .removalListener(new RemovalListener<Long, Callback>() {
110 @Override
111 public void onRemoval(RemovalNotification<Long, Callback> entry) {
112 if (entry.wasEvicted()) {
113 entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
114 }
115 }
116 })
117 .build();
118
119 private final GenericKeyedObjectPool<Endpoint, Connection> channels
120 = new GenericKeyedObjectPool<Endpoint, Connection>(new OnosCommunicationChannelFactory());
121
122 private EventLoopGroup serverGroup;
123 private EventLoopGroup clientGroup;
124 private Class<? extends ServerChannel> serverChannelClass;
125 private Class<? extends Channel> clientChannelClass;
126
127 protected static final boolean TLS_DISABLED = false;
128 protected boolean enableNettyTls = TLS_DISABLED;
129
130 protected String ksLocation;
131 protected String tsLocation;
132 protected char[] ksPwd;
133 protected char[] tsPwd;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900134
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700135 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampaniec1df022015-10-13 21:23:03 -0700136 protected ClusterMetadataService clusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700137
138 @Activate
139 public void activate() throws Exception {
Madan Jampaniec1df022015-10-13 21:23:03 -0700140 ControllerNode localNode = clusterMetadataService.getLocalNode();
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800141 getTlsParameters();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800142
143 if (started.get()) {
144 log.warn("Already running at local endpoint: {}", localEp);
145 return;
146 }
147 this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
148 this.localEp = new Endpoint(localNode.ip(), localNode.tcpPort());
149 channels.setLifo(true);
150 channels.setTestOnBorrow(true);
151 channels.setTestOnReturn(true);
152 channels.setMinEvictableIdleTimeMillis(60_000L);
153 channels.setTimeBetweenEvictionRunsMillis(30_000L);
154 initEventLoopGroup();
155 startAcceptingConnections();
156 started.set(true);
Madan Jampanif5c38a72016-02-17 18:26:15 -0800157 serverGroup.scheduleWithFixedDelay(callbacks::cleanUp, 0, REPLY_TIME_OUT_SEC, TimeUnit.SECONDS);
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700158 log.info("Started");
159 }
160
161 @Deactivate
162 public void deactivate() throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800163 if (started.get()) {
164 channels.close();
165 serverGroup.shutdownGracefully();
166 clientGroup.shutdownGracefully();
167 started.set(false);
168 }
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700169 log.info("Stopped");
170 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900171
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800172 private void getTlsParameters() {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900173 String tempString = System.getProperty("enableNettyTLS");
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800174 enableNettyTls = Strings.isNullOrEmpty(tempString) ? TLS_DISABLED : Boolean.parseBoolean(tempString);
175 log.info("enableNettyTLS = {}", enableNettyTls);
176 if (enableNettyTls) {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900177 ksLocation = System.getProperty("javax.net.ssl.keyStore");
178 if (Strings.isNullOrEmpty(ksLocation)) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800179 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900180 return;
181 }
182 tsLocation = System.getProperty("javax.net.ssl.trustStore");
183 if (Strings.isNullOrEmpty(tsLocation)) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800184 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900185 return;
186 }
187 ksPwd = System.getProperty("javax.net.ssl.keyStorePassword").toCharArray();
188 if (MIN_KS_LENGTH > ksPwd.length) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800189 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900190 return;
191 }
192 tsPwd = System.getProperty("javax.net.ssl.trustStorePassword").toCharArray();
193 if (MIN_KS_LENGTH > tsPwd.length) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800194 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900195 return;
196 }
197 }
198 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800199 private void initEventLoopGroup() {
200 // try Epoll first and if that does work, use nio.
201 try {
202 clientGroup = new EpollEventLoopGroup();
203 serverGroup = new EpollEventLoopGroup();
204 serverChannelClass = EpollServerSocketChannel.class;
205 clientChannelClass = EpollSocketChannel.class;
206 return;
207 } catch (Throwable e) {
208 log.debug("Failed to initialize native (epoll) transport. "
209 + "Reason: {}. Proceeding with nio.", e.getMessage());
210 }
211 clientGroup = new NioEventLoopGroup();
212 serverGroup = new NioEventLoopGroup();
213 serverChannelClass = NioServerSocketChannel.class;
214 clientChannelClass = NioSocketChannel.class;
215 }
216
217 @Override
218 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900219 checkPermission(CLUSTER_WRITE);
Madan Jampanib825aeb2016-04-01 15:18:25 -0700220 InternalMessage message = new InternalMessage(preamble,
221 messageIdGenerator.incrementAndGet(),
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800222 localEp,
223 type,
224 payload);
225 return sendAsync(ep, message);
226 }
227
228 protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900229 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800230 if (ep.equals(localEp)) {
231 try {
232 dispatchLocally(message);
233 } catch (IOException e) {
234 return Tools.exceptionalFuture(e);
235 }
236 return CompletableFuture.completedFuture(null);
237 }
238
239 CompletableFuture<Void> future = new CompletableFuture<>();
240 try {
241 Connection connection = null;
242 try {
243 connection = channels.borrowObject(ep);
244 connection.send(message, future);
245 } finally {
246 channels.returnObject(ep, connection);
247 }
248 } catch (Exception e) {
249 future.completeExceptionally(e);
250 }
251 return future;
252 }
253
254 @Override
255 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900256 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800257 return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
258 }
259
260 @Override
261 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900262 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800263 CompletableFuture<byte[]> response = new CompletableFuture<>();
264 Callback callback = new Callback(response, executor);
265 Long messageId = messageIdGenerator.incrementAndGet();
266 callbacks.put(messageId, callback);
Madan Jampanib825aeb2016-04-01 15:18:25 -0700267 InternalMessage message = new InternalMessage(preamble, messageId, localEp, type, payload);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800268 return sendAsync(ep, message).whenComplete((r, e) -> {
269 if (e != null) {
270 callbacks.invalidate(messageId);
271 }
272 }).thenCompose(v -> response);
273 }
274
275 @Override
276 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900277 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800278 handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
279 }
280
281 @Override
282 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900283 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800284 handlers.put(type, message -> executor.execute(() -> {
Madan Jampania9e70a62016-03-02 16:28:18 -0800285 byte[] responsePayload = null;
286 Status status = Status.OK;
287 try {
288 responsePayload = handler.apply(message.sender(), message.payload());
289 } catch (Exception e) {
290 status = Status.ERROR_HANDLER_EXCEPTION;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800291 }
Madan Jampania9e70a62016-03-02 16:28:18 -0800292 sendReply(message, status, Optional.ofNullable(responsePayload));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800293 }));
294 }
295
296 @Override
297 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900298 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800299 handlers.put(type, message -> {
300 handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
Madan Jampania9e70a62016-03-02 16:28:18 -0800301 Status status = error == null ? Status.OK : Status.ERROR_HANDLER_EXCEPTION;
302 sendReply(message, status, Optional.ofNullable(result));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800303 });
304 });
305 }
306
307 @Override
308 public void unregisterHandler(String type) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900309 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800310 handlers.remove(type);
311 }
312
313 private void startAcceptingConnections() throws InterruptedException {
314 ServerBootstrap b = new ServerBootstrap();
315 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
316 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
317 b.option(ChannelOption.SO_RCVBUF, 1048576);
318 b.option(ChannelOption.TCP_NODELAY, true);
319 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
320 b.group(serverGroup, clientGroup);
321 b.channel(serverChannelClass);
322 if (enableNettyTls) {
323 b.childHandler(new SslServerCommunicationChannelInitializer());
324 } else {
325 b.childHandler(new OnosCommunicationChannelInitializer());
326 }
327 b.option(ChannelOption.SO_BACKLOG, 128);
328 b.childOption(ChannelOption.SO_KEEPALIVE, true);
329
330 // Bind and start to accept incoming connections.
331 b.bind(localEp.port()).sync().addListener(future -> {
332 if (future.isSuccess()) {
333 log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
334 } else {
335 log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
336 }
337 });
338 }
339
340 private class OnosCommunicationChannelFactory
341 implements KeyedPoolableObjectFactory<Endpoint, Connection> {
342
343 @Override
344 public void activateObject(Endpoint endpoint, Connection connection)
345 throws Exception {
346 }
347
348 @Override
349 public void destroyObject(Endpoint ep, Connection connection) throws Exception {
350 log.debug("Closing connection to {}", ep);
351 //Is this the right way to destroy?
352 connection.destroy();
353 }
354
355 @Override
356 public Connection makeObject(Endpoint ep) throws Exception {
357 Bootstrap bootstrap = new Bootstrap();
358 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
359 bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
360 bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
361 bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
362 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
363 bootstrap.group(clientGroup);
364 // TODO: Make this faster:
365 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
366 bootstrap.channel(clientChannelClass);
367 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
368 if (enableNettyTls) {
369 bootstrap.handler(new SslClientCommunicationChannelInitializer());
370 } else {
371 bootstrap.handler(new OnosCommunicationChannelInitializer());
372 }
373 // Start the client.
374 CompletableFuture<Channel> retFuture = new CompletableFuture<>();
375 ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port());
376
377 f.addListener(future -> {
378 if (future.isSuccess()) {
379 retFuture.complete(f.channel());
380 } else {
381 retFuture.completeExceptionally(future.cause());
382 }
383 });
384 log.debug("Established a new connection to {}", ep);
385 return new Connection(retFuture);
386 }
387
388 @Override
389 public void passivateObject(Endpoint ep, Connection connection)
390 throws Exception {
391 }
392
393 @Override
394 public boolean validateObject(Endpoint ep, Connection connection) {
395 return connection.validate();
396 }
397 }
398
399 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
400
401 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
402 private final ChannelHandler encoder = new MessageEncoder(preamble);
403
404 @Override
405 protected void initChannel(SocketChannel channel) throws Exception {
406 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
407 KeyStore ts = KeyStore.getInstance("JKS");
408 ts.load(new FileInputStream(tsLocation), tsPwd);
409 tmFactory.init(ts);
410
411 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
412 KeyStore ks = KeyStore.getInstance("JKS");
413 ks.load(new FileInputStream(ksLocation), ksPwd);
414 kmf.init(ks, ksPwd);
415
416 SSLContext serverContext = SSLContext.getInstance("TLS");
417 serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
418
419 SSLEngine serverSslEngine = serverContext.createSSLEngine();
420
421 serverSslEngine.setNeedClientAuth(true);
422 serverSslEngine.setUseClientMode(false);
423 serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
424 serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
425 serverSslEngine.setEnableSessionCreation(true);
426
427 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
428 .addLast("encoder", encoder)
Madan Jampanib825aeb2016-04-01 15:18:25 -0700429 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800430 .addLast("handler", dispatcher);
431 }
432 }
433
434 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
435
436 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
437 private final ChannelHandler encoder = new MessageEncoder(preamble);
438
439 @Override
440 protected void initChannel(SocketChannel channel) throws Exception {
441 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
442 KeyStore ts = KeyStore.getInstance("JKS");
443 ts.load(new FileInputStream(tsLocation), tsPwd);
444 tmFactory.init(ts);
445
446 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
447 KeyStore ks = KeyStore.getInstance("JKS");
448 ks.load(new FileInputStream(ksLocation), ksPwd);
449 kmf.init(ks, ksPwd);
450
451 SSLContext clientContext = SSLContext.getInstance("TLS");
452 clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
453
454 SSLEngine clientSslEngine = clientContext.createSSLEngine();
455
456 clientSslEngine.setUseClientMode(true);
457 clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
458 clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
459 clientSslEngine.setEnableSessionCreation(true);
460
461 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
462 .addLast("encoder", encoder)
Madan Jampanib825aeb2016-04-01 15:18:25 -0700463 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800464 .addLast("handler", dispatcher);
465 }
466 }
467
468 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
469
470 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
471 private final ChannelHandler encoder = new MessageEncoder(preamble);
472
473 @Override
474 protected void initChannel(SocketChannel channel) throws Exception {
475 channel.pipeline()
476 .addLast("encoder", encoder)
Madan Jampanib825aeb2016-04-01 15:18:25 -0700477 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800478 .addLast("handler", dispatcher);
479 }
480 }
481
482 @ChannelHandler.Sharable
483 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
484
485 @Override
486 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
487 try {
488 dispatchLocally(message);
489 } catch (RejectedExecutionException e) {
490 log.warn("Unable to dispatch message due to {}", e.getMessage());
491 }
492 }
493
494 @Override
495 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
496 log.error("Exception inside channel handling pipeline.", cause);
497 context.close();
498 }
499 }
500 private void dispatchLocally(InternalMessage message) throws IOException {
Madan Jampanib825aeb2016-04-01 15:18:25 -0700501 if (message.preamble() != preamble) {
502 log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
503 sendReply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
504 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800505 String type = message.type();
506 if (REPLY_MESSAGE_TYPE.equals(type)) {
507 try {
508 Callback callback =
509 callbacks.getIfPresent(message.id());
510 if (callback != null) {
Madan Jampania9e70a62016-03-02 16:28:18 -0800511 if (message.status() == Status.OK) {
512 callback.complete(message.payload());
513 } else if (message.status() == Status.ERROR_NO_HANDLER) {
514 callback.completeExceptionally(new MessagingException.NoRemoteHandler());
515 } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
516 callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
Madan Jampanib825aeb2016-04-01 15:18:25 -0700517 } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
518 callback.completeExceptionally(new MessagingException.ProcotolException());
Madan Jampania9e70a62016-03-02 16:28:18 -0800519 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800520 } else {
Madan Jampania9e70a62016-03-02 16:28:18 -0800521 log.debug("Received a reply for message id:[{}]. "
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800522 + " from {}. But was unable to locate the"
523 + " request handle", message.id(), message.sender());
524 }
525 } finally {
526 callbacks.invalidate(message.id());
527 }
528 return;
529 }
530 Consumer<InternalMessage> handler = handlers.get(type);
531 if (handler != null) {
532 handler.accept(message);
533 } else {
Madan Jampania9e70a62016-03-02 16:28:18 -0800534 log.debug("No handler for message type {}", message.type(), message.sender());
535 sendReply(message, Status.ERROR_NO_HANDLER, Optional.empty());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800536 }
537 }
538
Madan Jampania9e70a62016-03-02 16:28:18 -0800539 private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
Madan Jampanib825aeb2016-04-01 15:18:25 -0700540 InternalMessage response = new InternalMessage(preamble,
541 message.id(),
Madan Jampania9e70a62016-03-02 16:28:18 -0800542 localEp,
543 REPLY_MESSAGE_TYPE,
544 responsePayload.orElse(new byte[0]),
545 status);
546 sendAsync(message.sender(), response).whenComplete((result, error) -> {
547 if (error != null) {
548 log.debug("Failed to respond", error);
549 }
550 });
551 }
552
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800553 private final class Callback {
554 private final CompletableFuture<byte[]> future;
555 private final Executor executor;
556
557 public Callback(CompletableFuture<byte[]> future, Executor executor) {
558 this.future = future;
559 this.executor = executor;
560 }
561
562 public void complete(byte[] value) {
563 executor.execute(() -> future.complete(value));
564 }
565
566 public void completeExceptionally(Throwable error) {
567 executor.execute(() -> future.completeExceptionally(error));
568 }
569 }
570 private final class Connection {
571 private final CompletableFuture<Channel> internalFuture;
572
573 public Connection(CompletableFuture<Channel> internalFuture) {
574 this.internalFuture = internalFuture;
575 }
576
577 /**
578 * Sends a message out on its channel and associated the message with a
579 * completable future used for signaling.
580 * @param message the message to be sent
581 * @param future a future that is completed normally or exceptionally if
582 * message sending succeeds or fails respectively
583 */
584 public void send(Object message, CompletableFuture<Void> future) {
585 internalFuture.whenComplete((channel, throwable) -> {
586 if (throwable == null) {
587 channel.writeAndFlush(message).addListener(channelFuture -> {
588 if (!channelFuture.isSuccess()) {
589 future.completeExceptionally(channelFuture.cause());
590 } else {
591 future.complete(null);
592 }
593 });
594 } else {
595 future.completeExceptionally(throwable);
596 }
597 });
598 }
599
600 /**
601 * Destroys a channel by closing its channel (if it exists) and
602 * cancelling its future.
603 */
604 public void destroy() {
605 Channel channel = internalFuture.getNow(null);
606 if (channel != null) {
607 channel.close();
608 }
609 internalFuture.cancel(false);
610 }
611
612 /**
613 * Determines whether the connection is valid meaning it is either
614 * complete with and active channel
615 * or it has not yet completed.
616 * @return true if the channel has an active connection or has not
617 * yet completed
618 */
619 public boolean validate() {
620 if (internalFuture.isCompletedExceptionally()) {
621 return false;
622 }
623 Channel channel = internalFuture.getNow(null);
624 return channel == null || channel.isActive();
625 }
626 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900627}