blob: 1cd7ca7b1c837ea02c36ba7925d43c414f979056 [file] [log] [blame]
/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Madan Jampaniab6d3112014-10-02 16:30:14 -070016package org.onlab.netty;
17
Jonathan Hartd9df7bd2015-11-10 17:10:25 -080018import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.RemovalListener;
21import com.google.common.cache.RemovalNotification;
Madan Jampaniab6d3112014-10-02 16:30:14 -070022import io.netty.bootstrap.Bootstrap;
23import io.netty.bootstrap.ServerBootstrap;
24import io.netty.buffer.PooledByteBufAllocator;
25import io.netty.channel.Channel;
26import io.netty.channel.ChannelFuture;
Madan Jampaniddf76222014-10-04 23:48:44 -070027import io.netty.channel.ChannelHandler;
Madan Jampaniab6d3112014-10-02 16:30:14 -070028import io.netty.channel.ChannelHandlerContext;
29import io.netty.channel.ChannelInitializer;
30import io.netty.channel.ChannelOption;
31import io.netty.channel.EventLoopGroup;
Madan Jampani824a7c12014-10-21 09:46:15 -070032import io.netty.channel.ServerChannel;
Madan Jampaniab6d3112014-10-02 16:30:14 -070033import io.netty.channel.SimpleChannelInboundHandler;
Madan Jampani824a7c12014-10-21 09:46:15 -070034import io.netty.channel.epoll.EpollEventLoopGroup;
35import io.netty.channel.epoll.EpollServerSocketChannel;
36import io.netty.channel.epoll.EpollSocketChannel;
Madan Jampaniab6d3112014-10-02 16:30:14 -070037import io.netty.channel.nio.NioEventLoopGroup;
38import io.netty.channel.socket.SocketChannel;
39import io.netty.channel.socket.nio.NioServerSocketChannel;
40import io.netty.channel.socket.nio.NioSocketChannel;
Jonathan Hartd9df7bd2015-11-10 17:10:25 -080041import org.apache.commons.pool.KeyedPoolableObjectFactory;
42import org.apache.commons.pool.impl.GenericKeyedObjectPool;
43import org.onosproject.store.cluster.messaging.Endpoint;
44import org.onosproject.store.cluster.messaging.MessagingService;
45import org.slf4j.Logger;
46import org.slf4j.LoggerFactory;
Madan Jampaniab6d3112014-10-02 16:30:14 -070047
Jonathan Hartd9df7bd2015-11-10 17:10:25 -080048import javax.net.ssl.KeyManagerFactory;
49import javax.net.ssl.SSLContext;
50import javax.net.ssl.SSLEngine;
51import javax.net.ssl.TrustManagerFactory;
JunHuy Lam39eb4292015-06-26 17:24:23 +090052import java.io.FileInputStream;
Madan Jampani348a9fe2014-11-09 01:37:51 -080053import java.io.IOException;
JunHuy Lam39eb4292015-06-26 17:24:23 +090054import java.security.KeyStore;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070055import java.util.Map;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070056import java.util.concurrent.CompletableFuture;
Madan Jampani348a9fe2014-11-09 01:37:51 -080057import java.util.concurrent.ConcurrentHashMap;
Madan Jampaniec5ae342015-04-13 15:43:10 -070058import java.util.concurrent.Executor;
Thomas Vachuskafc52fec2015-05-18 19:13:56 -070059import java.util.concurrent.RejectedExecutionException;
Madan Jampani348a9fe2014-11-09 01:37:51 -080060import java.util.concurrent.TimeUnit;
61import java.util.concurrent.TimeoutException;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070062import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani348a9fe2014-11-09 01:37:51 -080063import java.util.concurrent.atomic.AtomicLong;
Madan Jampanic26eede2015-04-16 11:42:16 -070064import java.util.function.Consumer;
65import java.util.function.Function;
Madan Jampani348a9fe2014-11-09 01:37:51 -080066
/**
 * Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070070public class NettyMessaging implements MessagingService {
Madan Jampaniab6d3112014-10-02 16:30:14 -070071
72 private final Logger log = LoggerFactory.getLogger(getClass());
73
Madan Jampanic26eede2015-04-16 11:42:16 -070074 private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
75
Madan Jampaniafeebbd2015-05-19 15:26:01 -070076 private Endpoint localEp;
77 private final AtomicBoolean started = new AtomicBoolean(false);
78 private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
Madan Jampani24f9efb2014-10-24 18:56:23 -070079 private final AtomicLong messageIdGenerator = new AtomicLong(0);
Madan Jampani2bfa94c2015-04-11 05:03:49 -070080 private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
Madan Jampani5f9ec9a2014-10-29 13:45:52 -070081 .expireAfterWrite(10, TimeUnit.SECONDS)
Madan Jampani2bfa94c2015-04-11 05:03:49 -070082 .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
Madan Jampani5f9ec9a2014-10-29 13:45:52 -070083 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -070084 public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
Yuta HIGUCHIc611d922015-02-10 16:07:38 -080085 if (entry.wasEvicted()) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -070086 entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
Yuta HIGUCHIc611d922015-02-10 16:07:38 -080087 }
Madan Jampani5f9ec9a2014-10-29 13:45:52 -070088 }
89 })
Madan Jampaniddf76222014-10-04 23:48:44 -070090 .build();
Madan Jampani2e5f87b2015-02-22 10:37:15 -080091
Madan Jampaniddf76222014-10-04 23:48:44 -070092 private final GenericKeyedObjectPool<Endpoint, Channel> channels
93 = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
Madan Jampaniab6d3112014-10-02 16:30:14 -070094
Madan Jampani824a7c12014-10-21 09:46:15 -070095 private EventLoopGroup serverGroup;
96 private EventLoopGroup clientGroup;
97 private Class<? extends ServerChannel> serverChannelClass;
98 private Class<? extends Channel> clientChannelClass;
99
JunHuy Lam39eb4292015-06-26 17:24:23 +0900100 protected static final boolean TLS_DISABLED = false;
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800101 protected boolean enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900102
103 protected String ksLocation;
104 protected String tsLocation;
105 protected char[] ksPwd;
106 protected char[] tsPwd;
107
Madan Jampani5e83f332014-10-20 15:35:09 -0700108 private void initEventLoopGroup() {
Madan Jampani824a7c12014-10-21 09:46:15 -0700109 // try Epoll first and if that does work, use nio.
Madan Jampani824a7c12014-10-21 09:46:15 -0700110 try {
Madan Jampani99e9fe22014-10-21 13:47:12 -0700111 clientGroup = new EpollEventLoopGroup();
112 serverGroup = new EpollEventLoopGroup();
113 serverChannelClass = EpollServerSocketChannel.class;
114 clientChannelClass = EpollSocketChannel.class;
115 return;
Thomas Vachuska2c2b7682015-05-05 11:59:21 -0700116 } catch (Throwable e) {
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700117 log.debug("Failed to initialize native (epoll) transport. "
118 + "Reason: {}. Proceeding with nio.", e.getMessage());
Madan Jampani824a7c12014-10-21 09:46:15 -0700119 }
120 clientGroup = new NioEventLoopGroup();
121 serverGroup = new NioEventLoopGroup();
122 serverChannelClass = NioServerSocketChannel.class;
123 clientChannelClass = NioSocketChannel.class;
Madan Jampani5e83f332014-10-20 15:35:09 -0700124 }
125
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700126 public void start(Endpoint localEp) throws Exception {
127 if (started.get()) {
128 log.warn("Already running at local endpoint: {}", localEp);
129 return;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700130 }
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700131 this.localEp = localEp;
Madan Jampanicca4bdb2015-05-22 13:09:33 -0700132 channels.setLifo(true);
Madan Jampani86ed0552014-10-03 16:45:42 -0700133 channels.setTestOnBorrow(true);
134 channels.setTestOnReturn(true);
Madan Jampanicca4bdb2015-05-22 13:09:33 -0700135 channels.setMinEvictableIdleTimeMillis(60_000L);
136 channels.setTimeBetweenEvictionRunsMillis(30_000L);
Madan Jampani5e83f332014-10-20 15:35:09 -0700137 initEventLoopGroup();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700138 startAcceptingConnections();
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700139 started.set(true);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700140 }
141
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700142 public void stop() throws Exception {
143 if (started.get()) {
144 channels.close();
145 serverGroup.shutdownGracefully();
146 clientGroup.shutdownGracefully();
147 started.set(false);
148 }
Madan Jampani87100932014-10-21 16:46:12 -0700149 }
150
Madan Jampaniab6d3112014-10-02 16:30:14 -0700151 @Override
Madan Jampani175e8fd2015-05-20 14:10:45 -0700152 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700153 InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
154 localEp,
155 type,
156 payload);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700157 return sendAsync(ep, message);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700158 }
159
Madan Jampani175e8fd2015-05-20 14:10:45 -0700160 protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
161 CompletableFuture<Void> future = new CompletableFuture<>();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700162 try {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700163 if (ep.equals(localEp)) {
164 dispatchLocally(message);
165 future.complete(null);
166 } else {
167 Channel channel = null;
168 try {
169 channel = channels.borrowObject(ep);
170 channel.writeAndFlush(message).addListener(channelFuture -> {
171 if (!channelFuture.isSuccess()) {
172 future.completeExceptionally(channelFuture.cause());
173 } else {
174 future.complete(null);
175 }
176 });
177 } finally {
178 channels.returnObject(ep, channel);
179 }
Madan Jampani86ed0552014-10-03 16:45:42 -0700180 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700181 } catch (Exception e) {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700182 future.completeExceptionally(e);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700183 }
Madan Jampani175e8fd2015-05-20 14:10:45 -0700184 return future;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700185 }
186
187 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700188 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
189 CompletableFuture<byte[]> response = new CompletableFuture<>();
Madan Jampani24f9efb2014-10-24 18:56:23 -0700190 Long messageId = messageIdGenerator.incrementAndGet();
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700191 responseFutures.put(messageId, response);
Madan Jampanic26eede2015-04-16 11:42:16 -0700192 InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
Madan Jampani15cd0b82014-10-28 08:40:23 -0700193 try {
194 sendAsync(ep, message);
Madan Jampani5f9ec9a2014-10-29 13:45:52 -0700195 } catch (Exception e) {
Madan Jampani15cd0b82014-10-28 08:40:23 -0700196 responseFutures.invalidate(messageId);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700197 response.completeExceptionally(e);
Madan Jampani15cd0b82014-10-28 08:40:23 -0700198 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700199 return response;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700200 }
201
202 @Override
Madan Jampanic26eede2015-04-16 11:42:16 -0700203 public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
204 handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
Madan Jampaniab6d3112014-10-02 16:30:14 -0700205 }
206
Yuta HIGUCHIe5ca93b2014-10-23 09:49:00 -0700207 @Override
Madan Jampanic26eede2015-04-16 11:42:16 -0700208 public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
209 handlers.put(type, message -> executor.execute(() -> {
210 byte[] responsePayload = handler.apply(message.payload());
211 if (responsePayload != null) {
212 InternalMessage response = new InternalMessage(message.id(),
213 localEp,
214 REPLY_MESSAGE_TYPE,
215 responsePayload);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700216 sendAsync(message.sender(), response).whenComplete((result, error) -> {
217 if (error != null) {
218 log.debug("Failed to respond", error);
219 }
220 });
Madan Jampani2af244a2015-02-22 13:12:01 -0800221 }
Madan Jampanic26eede2015-04-16 11:42:16 -0700222 }));
Madan Jampani2af244a2015-02-22 13:12:01 -0800223 }
224
225 @Override
Madan Jampani27b69c62015-05-15 15:49:02 -0700226 public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
227 handlers.put(type, message -> {
228 handler.apply(message.payload()).whenComplete((result, error) -> {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700229 if (error == null) {
230 InternalMessage response = new InternalMessage(message.id(),
JunHuy Lam39eb4292015-06-26 17:24:23 +0900231 localEp,
232 REPLY_MESSAGE_TYPE,
233 result);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700234 sendAsync(message.sender(), response).whenComplete((r, e) -> {
235 if (e != null) {
236 log.debug("Failed to respond", e);
237 }
238 });
Madan Jampani27b69c62015-05-15 15:49:02 -0700239 }
Madan Jampani27b69c62015-05-15 15:49:02 -0700240 });
241 });
242 }
243
244 @Override
Madan Jampaniab6d3112014-10-02 16:30:14 -0700245 public void unregisterHandler(String type) {
Madan Jampani49115e92015-03-14 10:43:33 -0700246 handlers.remove(type);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700247 }
248
Madan Jampaniab6d3112014-10-02 16:30:14 -0700249 private void startAcceptingConnections() throws InterruptedException {
250 ServerBootstrap b = new ServerBootstrap();
Madan Jampani86ed0552014-10-03 16:45:42 -0700251 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
Madan Jampaniddf76222014-10-04 23:48:44 -0700252 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
Madan Jampanic26eede2015-04-16 11:42:16 -0700253 b.option(ChannelOption.SO_RCVBUF, 1048576);
254 b.option(ChannelOption.TCP_NODELAY, true);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700255 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900256 b.group(serverGroup, clientGroup);
257 b.channel(serverChannelClass);
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800258 if (enableNettyTls) {
259 b.childHandler(new SslServerCommunicationChannelInitializer());
JunHuy Lam39eb4292015-06-26 17:24:23 +0900260 } else {
261 b.childHandler(new OnosCommunicationChannelInitializer());
262 }
263 b.option(ChannelOption.SO_BACKLOG, 128);
264 b.childOption(ChannelOption.SO_KEEPALIVE, true);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700265
266 // Bind and start to accept incoming connections.
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700267 b.bind(localEp.port()).sync().addListener(future -> {
268 if (future.isSuccess()) {
269 log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
270 } else {
271 log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
272 }
273 });
Madan Jampaniab6d3112014-10-02 16:30:14 -0700274 }
275
276 private class OnosCommunicationChannelFactory
277 implements KeyedPoolableObjectFactory<Endpoint, Channel> {
278
279 @Override
280 public void activateObject(Endpoint endpoint, Channel channel)
281 throws Exception {
282 }
283
284 @Override
285 public void destroyObject(Endpoint ep, Channel channel) throws Exception {
Madan Jampani8a0569e2015-05-26 12:06:00 -0700286 log.debug("Closing connection to {}", ep);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700287 channel.close();
288 }
289
290 @Override
291 public Channel makeObject(Endpoint ep) throws Exception {
Madan Jampaniddf76222014-10-04 23:48:44 -0700292 Bootstrap bootstrap = new Bootstrap();
293 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Madan Jampanic26eede2015-04-16 11:42:16 -0700294 bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
295 bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
296 bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
Madan Jampani824a7c12014-10-21 09:46:15 -0700297 bootstrap.group(clientGroup);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700298 // TODO: Make this faster:
299 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
Madan Jampani824a7c12014-10-21 09:46:15 -0700300 bootstrap.channel(clientChannelClass);
Madan Jampaniddf76222014-10-04 23:48:44 -0700301 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800302 if (enableNettyTls) {
303 bootstrap.handler(new SslClientCommunicationChannelInitializer());
JunHuy Lam39eb4292015-06-26 17:24:23 +0900304 } else {
305 bootstrap.handler(new OnosCommunicationChannelInitializer());
306 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700307 // Start the client.
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800308 ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
Madan Jampani8a0569e2015-05-26 12:06:00 -0700309 log.debug("Established a new connection to {}", ep);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700310 return f.channel();
311 }
312
313 @Override
314 public void passivateObject(Endpoint ep, Channel channel)
315 throws Exception {
316 }
317
318 @Override
319 public boolean validateObject(Endpoint ep, Channel channel) {
320 return channel.isOpen();
321 }
322 }
323
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800324 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900325
326 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
327 private final ChannelHandler encoder = new MessageEncoder();
328
329 @Override
330 protected void initChannel(SocketChannel channel) throws Exception {
331 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
332 KeyStore ts = KeyStore.getInstance("JKS");
333 ts.load(new FileInputStream(tsLocation), tsPwd);
334 tmFactory.init(ts);
335
336 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
337 KeyStore ks = KeyStore.getInstance("JKS");
338 ks.load(new FileInputStream(ksLocation), ksPwd);
339 kmf.init(ks, ksPwd);
340
341 SSLContext serverContext = SSLContext.getInstance("TLS");
342 serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
343
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800344 SSLEngine serverSslEngine = serverContext.createSSLEngine();
JunHuy Lam39eb4292015-06-26 17:24:23 +0900345
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800346 serverSslEngine.setNeedClientAuth(true);
347 serverSslEngine.setUseClientMode(false);
348 serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
349 serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
350 serverSslEngine.setEnableSessionCreation(true);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900351
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800352 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
JunHuy Lam39eb4292015-06-26 17:24:23 +0900353 .addLast("encoder", encoder)
354 .addLast("decoder", new MessageDecoder())
355 .addLast("handler", dispatcher);
356 }
357
358 }
359
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800360 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900361
362 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
363 private final ChannelHandler encoder = new MessageEncoder();
364
365 @Override
366 protected void initChannel(SocketChannel channel) throws Exception {
367 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
368 KeyStore ts = KeyStore.getInstance("JKS");
369 ts.load(new FileInputStream(tsLocation), tsPwd);
370 tmFactory.init(ts);
371
372 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
373 KeyStore ks = KeyStore.getInstance("JKS");
374 ks.load(new FileInputStream(ksLocation), ksPwd);
375 kmf.init(ks, ksPwd);
376
377 SSLContext clientContext = SSLContext.getInstance("TLS");
378 clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
379
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800380 SSLEngine clientSslEngine = clientContext.createSSLEngine();
JunHuy Lam39eb4292015-06-26 17:24:23 +0900381
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800382 clientSslEngine.setUseClientMode(true);
383 clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
384 clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
385 clientSslEngine.setEnableSessionCreation(true);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900386
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800387 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
JunHuy Lam39eb4292015-06-26 17:24:23 +0900388 .addLast("encoder", encoder)
389 .addLast("decoder", new MessageDecoder())
390 .addLast("handler", dispatcher);
391 }
392
393 }
394
Madan Jampaniab6d3112014-10-02 16:30:14 -0700395 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
396
Madan Jampaniddf76222014-10-04 23:48:44 -0700397 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Madan Jampani53e44e62014-10-07 12:39:51 -0700398 private final ChannelHandler encoder = new MessageEncoder();
Madan Jampaniddf76222014-10-04 23:48:44 -0700399
Madan Jampaniab6d3112014-10-02 16:30:14 -0700400 @Override
401 protected void initChannel(SocketChannel channel) throws Exception {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900402 channel.pipeline()
403 .addLast("encoder", encoder)
404 .addLast("decoder", new MessageDecoder())
405 .addLast("handler", dispatcher);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700406 }
407 }
408
Madan Jampaniddf76222014-10-04 23:48:44 -0700409 @ChannelHandler.Sharable
Madan Jampaniab6d3112014-10-02 16:30:14 -0700410 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
411
412 @Override
413 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
Thomas Vachuskafc52fec2015-05-18 19:13:56 -0700414 try {
415 dispatchLocally(message);
416 } catch (RejectedExecutionException e) {
417 log.warn("Unable to dispatch message due to {}", e.getMessage());
418 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700419 }
Madan Jampani86ed0552014-10-03 16:45:42 -0700420
Madan Jampani86ed0552014-10-03 16:45:42 -0700421 @Override
422 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
Madan Jampani29e5dfd2014-10-07 17:26:25 -0700423 log.error("Exception inside channel handling pipeline.", cause);
Madan Jampani86ed0552014-10-03 16:45:42 -0700424 context.close();
425 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700426 }
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800427
Madan Jampaniba472232015-03-04 13:00:50 -0800428 private void dispatchLocally(InternalMessage message) throws IOException {
Madan Jampani49115e92015-03-14 10:43:33 -0700429 String type = message.type();
Madan Jampanic26eede2015-04-16 11:42:16 -0700430 if (REPLY_MESSAGE_TYPE.equals(type)) {
Madan Jampaniba472232015-03-04 13:00:50 -0800431 try {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700432 CompletableFuture<byte[]> futureResponse =
Madan Jampanic26eede2015-04-16 11:42:16 -0700433 responseFutures.getIfPresent(message.id());
Madan Jampaniba472232015-03-04 13:00:50 -0800434 if (futureResponse != null) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700435 futureResponse.complete(message.payload());
Madan Jampaniba472232015-03-04 13:00:50 -0800436 } else {
437 log.warn("Received a reply for message id:[{}]. "
438 + " from {}. But was unable to locate the"
439 + " request handle", message.id(), message.sender());
440 }
441 } finally {
Madan Jampanic26eede2015-04-16 11:42:16 -0700442 responseFutures.invalidate(message.id());
Madan Jampaniba472232015-03-04 13:00:50 -0800443 }
444 return;
445 }
Madan Jampanic26eede2015-04-16 11:42:16 -0700446 Consumer<InternalMessage> handler = handlers.get(type);
Madan Jampaniba472232015-03-04 13:00:50 -0800447 if (handler != null) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700448 handler.accept(message);
Madan Jampaniba472232015-03-04 13:00:50 -0800449 } else {
450 log.debug("No handler registered for {}", type);
451 }
452 }
Ray Milkey34c95902015-04-15 09:47:53 -0700453}