blob: 8c759d1402bc7aabe57d4f941e1fb18627853079 [file] [log] [blame]
Thomas Vachuska24c849c2014-10-27 09:53:05 -07001/*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Thomas Vachuska24c849c2014-10-27 09:53:05 -07003 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07004 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
Thomas Vachuska24c849c2014-10-27 09:53:05 -07007 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07008 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
Thomas Vachuska24c849c2014-10-27 09:53:05 -070015 */
Madan Jampaniab6d3112014-10-02 16:30:14 -070016package org.onlab.netty;
17
Madan Jampaniab6d3112014-10-02 16:30:14 -070018import io.netty.bootstrap.Bootstrap;
19import io.netty.bootstrap.ServerBootstrap;
20import io.netty.buffer.PooledByteBufAllocator;
21import io.netty.channel.Channel;
22import io.netty.channel.ChannelFuture;
Madan Jampaniddf76222014-10-04 23:48:44 -070023import io.netty.channel.ChannelHandler;
Madan Jampaniab6d3112014-10-02 16:30:14 -070024import io.netty.channel.ChannelHandlerContext;
25import io.netty.channel.ChannelInitializer;
26import io.netty.channel.ChannelOption;
27import io.netty.channel.EventLoopGroup;
Madan Jampani824a7c12014-10-21 09:46:15 -070028import io.netty.channel.ServerChannel;
Madan Jampaniab6d3112014-10-02 16:30:14 -070029import io.netty.channel.SimpleChannelInboundHandler;
Madan Jampani824a7c12014-10-21 09:46:15 -070030import io.netty.channel.epoll.EpollEventLoopGroup;
31import io.netty.channel.epoll.EpollServerSocketChannel;
32import io.netty.channel.epoll.EpollSocketChannel;
Madan Jampaniab6d3112014-10-02 16:30:14 -070033import io.netty.channel.nio.NioEventLoopGroup;
34import io.netty.channel.socket.SocketChannel;
35import io.netty.channel.socket.nio.NioServerSocketChannel;
36import io.netty.channel.socket.nio.NioSocketChannel;
37
JunHuy Lam39eb4292015-06-26 17:24:23 +090038import java.io.FileInputStream;
Madan Jampani348a9fe2014-11-09 01:37:51 -080039import java.io.IOException;
JunHuy Lam39eb4292015-06-26 17:24:23 +090040import java.security.KeyStore;
41
Madan Jampaniafeebbd2015-05-19 15:26:01 -070042import java.util.Map;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070043import java.util.concurrent.CompletableFuture;
Madan Jampani348a9fe2014-11-09 01:37:51 -080044import java.util.concurrent.ConcurrentHashMap;
Madan Jampaniec5ae342015-04-13 15:43:10 -070045import java.util.concurrent.Executor;
Thomas Vachuskafc52fec2015-05-18 19:13:56 -070046import java.util.concurrent.RejectedExecutionException;
Madan Jampani348a9fe2014-11-09 01:37:51 -080047import java.util.concurrent.TimeUnit;
48import java.util.concurrent.TimeoutException;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070049import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani348a9fe2014-11-09 01:37:51 -080050import java.util.concurrent.atomic.AtomicLong;
Madan Jampanic26eede2015-04-16 11:42:16 -070051import java.util.function.Consumer;
52import java.util.function.Function;
Madan Jampani348a9fe2014-11-09 01:37:51 -080053
Madan Jampaniab6d3112014-10-02 16:30:14 -070054import org.apache.commons.pool.KeyedPoolableObjectFactory;
55import org.apache.commons.pool.impl.GenericKeyedObjectPool;
Madan Jampanic26eede2015-04-16 11:42:16 -070056import org.onosproject.store.cluster.messaging.Endpoint;
57import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampaniab6d3112014-10-02 16:30:14 -070058import org.slf4j.Logger;
59import org.slf4j.LoggerFactory;
60
61import com.google.common.cache.Cache;
62import com.google.common.cache.CacheBuilder;
Madan Jampani5f9ec9a2014-10-29 13:45:52 -070063import com.google.common.cache.RemovalListener;
64import com.google.common.cache.RemovalNotification;
Madan Jampaniab6d3112014-10-02 16:30:14 -070065
JunHuy Lam39eb4292015-06-26 17:24:23 +090066import javax.net.ssl.KeyManagerFactory;
67import javax.net.ssl.SSLContext;
68import javax.net.ssl.SSLEngine;
69import javax.net.ssl.TrustManagerFactory;
70
Madan Jampaniab6d3112014-10-02 16:30:14 -070071/**
Madan Jampanic26eede2015-04-16 11:42:16 -070072 * Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
Madan Jampaniab6d3112014-10-02 16:30:14 -070073 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070074public class NettyMessaging implements MessagingService {
Madan Jampaniab6d3112014-10-02 16:30:14 -070075
76 private final Logger log = LoggerFactory.getLogger(getClass());
77
Madan Jampanic26eede2015-04-16 11:42:16 -070078 private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
79
Madan Jampaniafeebbd2015-05-19 15:26:01 -070080 private Endpoint localEp;
81 private final AtomicBoolean started = new AtomicBoolean(false);
82 private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
Madan Jampani24f9efb2014-10-24 18:56:23 -070083 private final AtomicLong messageIdGenerator = new AtomicLong(0);
Madan Jampani2bfa94c2015-04-11 05:03:49 -070084 private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
Madan Jampani5f9ec9a2014-10-29 13:45:52 -070085 .expireAfterWrite(10, TimeUnit.SECONDS)
Madan Jampani2bfa94c2015-04-11 05:03:49 -070086 .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
Madan Jampani5f9ec9a2014-10-29 13:45:52 -070087 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -070088 public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
Yuta HIGUCHIc611d922015-02-10 16:07:38 -080089 if (entry.wasEvicted()) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -070090 entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
Yuta HIGUCHIc611d922015-02-10 16:07:38 -080091 }
Madan Jampani5f9ec9a2014-10-29 13:45:52 -070092 }
93 })
Madan Jampaniddf76222014-10-04 23:48:44 -070094 .build();
Madan Jampani2e5f87b2015-02-22 10:37:15 -080095
Madan Jampaniddf76222014-10-04 23:48:44 -070096 private final GenericKeyedObjectPool<Endpoint, Channel> channels
97 = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
Madan Jampaniab6d3112014-10-02 16:30:14 -070098
Madan Jampani824a7c12014-10-21 09:46:15 -070099 private EventLoopGroup serverGroup;
100 private EventLoopGroup clientGroup;
101 private Class<? extends ServerChannel> serverChannelClass;
102 private Class<? extends Channel> clientChannelClass;
103
JunHuy Lam39eb4292015-06-26 17:24:23 +0900104 protected static final boolean TLS_DISABLED = false;
105 protected boolean enableNettyTLS = TLS_DISABLED;
106
107 protected String ksLocation;
108 protected String tsLocation;
109 protected char[] ksPwd;
110 protected char[] tsPwd;
111
Madan Jampani5e83f332014-10-20 15:35:09 -0700112 private void initEventLoopGroup() {
Madan Jampani824a7c12014-10-21 09:46:15 -0700113 // try Epoll first and if that does work, use nio.
Madan Jampani824a7c12014-10-21 09:46:15 -0700114 try {
Madan Jampani99e9fe22014-10-21 13:47:12 -0700115 clientGroup = new EpollEventLoopGroup();
116 serverGroup = new EpollEventLoopGroup();
117 serverChannelClass = EpollServerSocketChannel.class;
118 clientChannelClass = EpollSocketChannel.class;
119 return;
Thomas Vachuska2c2b7682015-05-05 11:59:21 -0700120 } catch (Throwable e) {
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700121 log.debug("Failed to initialize native (epoll) transport. "
122 + "Reason: {}. Proceeding with nio.", e.getMessage());
Madan Jampani824a7c12014-10-21 09:46:15 -0700123 }
124 clientGroup = new NioEventLoopGroup();
125 serverGroup = new NioEventLoopGroup();
126 serverChannelClass = NioServerSocketChannel.class;
127 clientChannelClass = NioSocketChannel.class;
Madan Jampani5e83f332014-10-20 15:35:09 -0700128 }
129
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700130 public void start(Endpoint localEp) throws Exception {
131 if (started.get()) {
132 log.warn("Already running at local endpoint: {}", localEp);
133 return;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700134 }
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700135 this.localEp = localEp;
Madan Jampanicca4bdb2015-05-22 13:09:33 -0700136 channels.setLifo(true);
Madan Jampani86ed0552014-10-03 16:45:42 -0700137 channels.setTestOnBorrow(true);
138 channels.setTestOnReturn(true);
Madan Jampanicca4bdb2015-05-22 13:09:33 -0700139 channels.setMinEvictableIdleTimeMillis(60_000L);
140 channels.setTimeBetweenEvictionRunsMillis(30_000L);
Madan Jampani5e83f332014-10-20 15:35:09 -0700141 initEventLoopGroup();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700142 startAcceptingConnections();
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700143 started.set(true);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700144 }
145
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700146 public void stop() throws Exception {
147 if (started.get()) {
148 channels.close();
149 serverGroup.shutdownGracefully();
150 clientGroup.shutdownGracefully();
151 started.set(false);
152 }
Madan Jampani87100932014-10-21 16:46:12 -0700153 }
154
Madan Jampaniab6d3112014-10-02 16:30:14 -0700155 @Override
Madan Jampani175e8fd2015-05-20 14:10:45 -0700156 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700157 InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
158 localEp,
159 type,
160 payload);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700161 return sendAsync(ep, message);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700162 }
163
Madan Jampani175e8fd2015-05-20 14:10:45 -0700164 protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
165 CompletableFuture<Void> future = new CompletableFuture<>();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700166 try {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700167 if (ep.equals(localEp)) {
168 dispatchLocally(message);
169 future.complete(null);
170 } else {
171 Channel channel = null;
172 try {
173 channel = channels.borrowObject(ep);
174 channel.writeAndFlush(message).addListener(channelFuture -> {
175 if (!channelFuture.isSuccess()) {
176 future.completeExceptionally(channelFuture.cause());
177 } else {
178 future.complete(null);
179 }
180 });
181 } finally {
182 channels.returnObject(ep, channel);
183 }
Madan Jampani86ed0552014-10-03 16:45:42 -0700184 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700185 } catch (Exception e) {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700186 future.completeExceptionally(e);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700187 }
Madan Jampani175e8fd2015-05-20 14:10:45 -0700188 return future;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700189 }
190
191 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700192 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
193 CompletableFuture<byte[]> response = new CompletableFuture<>();
Madan Jampani24f9efb2014-10-24 18:56:23 -0700194 Long messageId = messageIdGenerator.incrementAndGet();
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700195 responseFutures.put(messageId, response);
Madan Jampanic26eede2015-04-16 11:42:16 -0700196 InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
Madan Jampani15cd0b82014-10-28 08:40:23 -0700197 try {
198 sendAsync(ep, message);
Madan Jampani5f9ec9a2014-10-29 13:45:52 -0700199 } catch (Exception e) {
Madan Jampani15cd0b82014-10-28 08:40:23 -0700200 responseFutures.invalidate(messageId);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700201 response.completeExceptionally(e);
Madan Jampani15cd0b82014-10-28 08:40:23 -0700202 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700203 return response;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700204 }
205
206 @Override
Madan Jampanic26eede2015-04-16 11:42:16 -0700207 public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
208 handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
Madan Jampaniab6d3112014-10-02 16:30:14 -0700209 }
210
Yuta HIGUCHIe5ca93b2014-10-23 09:49:00 -0700211 @Override
Madan Jampanic26eede2015-04-16 11:42:16 -0700212 public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
213 handlers.put(type, message -> executor.execute(() -> {
214 byte[] responsePayload = handler.apply(message.payload());
215 if (responsePayload != null) {
216 InternalMessage response = new InternalMessage(message.id(),
217 localEp,
218 REPLY_MESSAGE_TYPE,
219 responsePayload);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700220 sendAsync(message.sender(), response).whenComplete((result, error) -> {
221 if (error != null) {
222 log.debug("Failed to respond", error);
223 }
224 });
Madan Jampani2af244a2015-02-22 13:12:01 -0800225 }
Madan Jampanic26eede2015-04-16 11:42:16 -0700226 }));
Madan Jampani2af244a2015-02-22 13:12:01 -0800227 }
228
229 @Override
Madan Jampani27b69c62015-05-15 15:49:02 -0700230 public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
231 handlers.put(type, message -> {
232 handler.apply(message.payload()).whenComplete((result, error) -> {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700233 if (error == null) {
234 InternalMessage response = new InternalMessage(message.id(),
JunHuy Lam39eb4292015-06-26 17:24:23 +0900235 localEp,
236 REPLY_MESSAGE_TYPE,
237 result);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700238 sendAsync(message.sender(), response).whenComplete((r, e) -> {
239 if (e != null) {
240 log.debug("Failed to respond", e);
241 }
242 });
Madan Jampani27b69c62015-05-15 15:49:02 -0700243 }
Madan Jampani27b69c62015-05-15 15:49:02 -0700244 });
245 });
246 }
247
248 @Override
Madan Jampaniab6d3112014-10-02 16:30:14 -0700249 public void unregisterHandler(String type) {
Madan Jampani49115e92015-03-14 10:43:33 -0700250 handlers.remove(type);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700251 }
252
Madan Jampaniab6d3112014-10-02 16:30:14 -0700253 private void startAcceptingConnections() throws InterruptedException {
254 ServerBootstrap b = new ServerBootstrap();
Madan Jampani86ed0552014-10-03 16:45:42 -0700255 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
Madan Jampaniddf76222014-10-04 23:48:44 -0700256 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
Madan Jampanic26eede2015-04-16 11:42:16 -0700257 b.option(ChannelOption.SO_RCVBUF, 1048576);
258 b.option(ChannelOption.TCP_NODELAY, true);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700259 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900260 b.group(serverGroup, clientGroup);
261 b.channel(serverChannelClass);
262 if (enableNettyTLS) {
263 b.childHandler(new SSLServerCommunicationChannelInitializer());
264 } else {
265 b.childHandler(new OnosCommunicationChannelInitializer());
266 }
267 b.option(ChannelOption.SO_BACKLOG, 128);
268 b.childOption(ChannelOption.SO_KEEPALIVE, true);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700269
270 // Bind and start to accept incoming connections.
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700271 b.bind(localEp.port()).sync().addListener(future -> {
272 if (future.isSuccess()) {
273 log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
274 } else {
275 log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
276 }
277 });
Madan Jampaniab6d3112014-10-02 16:30:14 -0700278 }
279
280 private class OnosCommunicationChannelFactory
281 implements KeyedPoolableObjectFactory<Endpoint, Channel> {
282
283 @Override
284 public void activateObject(Endpoint endpoint, Channel channel)
285 throws Exception {
286 }
287
288 @Override
289 public void destroyObject(Endpoint ep, Channel channel) throws Exception {
Madan Jampani8a0569e2015-05-26 12:06:00 -0700290 log.debug("Closing connection to {}", ep);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700291 channel.close();
292 }
293
294 @Override
295 public Channel makeObject(Endpoint ep) throws Exception {
Madan Jampaniddf76222014-10-04 23:48:44 -0700296 Bootstrap bootstrap = new Bootstrap();
297 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Madan Jampanic26eede2015-04-16 11:42:16 -0700298 bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
299 bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
300 bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
Madan Jampani824a7c12014-10-21 09:46:15 -0700301 bootstrap.group(clientGroup);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700302 // TODO: Make this faster:
303 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
Madan Jampani824a7c12014-10-21 09:46:15 -0700304 bootstrap.channel(clientChannelClass);
Madan Jampaniddf76222014-10-04 23:48:44 -0700305 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900306 if (enableNettyTLS) {
307 bootstrap.handler(new SSLClientCommunicationChannelInitializer());
308 } else {
309 bootstrap.handler(new OnosCommunicationChannelInitializer());
310 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700311 // Start the client.
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800312 ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
Madan Jampani8a0569e2015-05-26 12:06:00 -0700313 log.debug("Established a new connection to {}", ep);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700314 return f.channel();
315 }
316
317 @Override
318 public void passivateObject(Endpoint ep, Channel channel)
319 throws Exception {
320 }
321
322 @Override
323 public boolean validateObject(Endpoint ep, Channel channel) {
324 return channel.isOpen();
325 }
326 }
327
JunHuy Lam39eb4292015-06-26 17:24:23 +0900328 private class SSLServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
329
330 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
331 private final ChannelHandler encoder = new MessageEncoder();
332
333 @Override
334 protected void initChannel(SocketChannel channel) throws Exception {
335 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
336 KeyStore ts = KeyStore.getInstance("JKS");
337 ts.load(new FileInputStream(tsLocation), tsPwd);
338 tmFactory.init(ts);
339
340 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
341 KeyStore ks = KeyStore.getInstance("JKS");
342 ks.load(new FileInputStream(ksLocation), ksPwd);
343 kmf.init(ks, ksPwd);
344
345 SSLContext serverContext = SSLContext.getInstance("TLS");
346 serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
347
348 SSLEngine serverSSLEngine = serverContext.createSSLEngine();
349
350 serverSSLEngine.setNeedClientAuth(true);
351 serverSSLEngine.setUseClientMode(false);
352 serverSSLEngine.setEnabledProtocols(serverSSLEngine.getSupportedProtocols());
353 serverSSLEngine.setEnabledCipherSuites(serverSSLEngine.getSupportedCipherSuites());
354 serverSSLEngine.setEnableSessionCreation(true);
355
356 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSSLEngine))
357 .addLast("encoder", encoder)
358 .addLast("decoder", new MessageDecoder())
359 .addLast("handler", dispatcher);
360 }
361
362 }
363
364 private class SSLClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
365
366 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
367 private final ChannelHandler encoder = new MessageEncoder();
368
369 @Override
370 protected void initChannel(SocketChannel channel) throws Exception {
371 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
372 KeyStore ts = KeyStore.getInstance("JKS");
373 ts.load(new FileInputStream(tsLocation), tsPwd);
374 tmFactory.init(ts);
375
376 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
377 KeyStore ks = KeyStore.getInstance("JKS");
378 ks.load(new FileInputStream(ksLocation), ksPwd);
379 kmf.init(ks, ksPwd);
380
381 SSLContext clientContext = SSLContext.getInstance("TLS");
382 clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
383
384 SSLEngine clientSSLEngine = clientContext.createSSLEngine();
385
386 clientSSLEngine.setUseClientMode(true);
387 clientSSLEngine.setEnabledProtocols(clientSSLEngine.getSupportedProtocols());
388 clientSSLEngine.setEnabledCipherSuites(clientSSLEngine.getSupportedCipherSuites());
389 clientSSLEngine.setEnableSessionCreation(true);
390
391 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSSLEngine))
392 .addLast("encoder", encoder)
393 .addLast("decoder", new MessageDecoder())
394 .addLast("handler", dispatcher);
395 }
396
397 }
398
Madan Jampaniab6d3112014-10-02 16:30:14 -0700399 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
400
Madan Jampaniddf76222014-10-04 23:48:44 -0700401 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Madan Jampani53e44e62014-10-07 12:39:51 -0700402 private final ChannelHandler encoder = new MessageEncoder();
Madan Jampaniddf76222014-10-04 23:48:44 -0700403
Madan Jampaniab6d3112014-10-02 16:30:14 -0700404 @Override
405 protected void initChannel(SocketChannel channel) throws Exception {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900406 channel.pipeline()
407 .addLast("encoder", encoder)
408 .addLast("decoder", new MessageDecoder())
409 .addLast("handler", dispatcher);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700410 }
411 }
412
Madan Jampaniddf76222014-10-04 23:48:44 -0700413 @ChannelHandler.Sharable
Madan Jampaniab6d3112014-10-02 16:30:14 -0700414 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
415
416 @Override
417 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
Thomas Vachuskafc52fec2015-05-18 19:13:56 -0700418 try {
419 dispatchLocally(message);
420 } catch (RejectedExecutionException e) {
421 log.warn("Unable to dispatch message due to {}", e.getMessage());
422 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700423 }
Madan Jampani86ed0552014-10-03 16:45:42 -0700424
Madan Jampani86ed0552014-10-03 16:45:42 -0700425 @Override
426 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
Madan Jampani29e5dfd2014-10-07 17:26:25 -0700427 log.error("Exception inside channel handling pipeline.", cause);
Madan Jampani86ed0552014-10-03 16:45:42 -0700428 context.close();
429 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700430 }
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800431
Madan Jampaniba472232015-03-04 13:00:50 -0800432 private void dispatchLocally(InternalMessage message) throws IOException {
Madan Jampani49115e92015-03-14 10:43:33 -0700433 String type = message.type();
Madan Jampanic26eede2015-04-16 11:42:16 -0700434 if (REPLY_MESSAGE_TYPE.equals(type)) {
Madan Jampaniba472232015-03-04 13:00:50 -0800435 try {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700436 CompletableFuture<byte[]> futureResponse =
Madan Jampanic26eede2015-04-16 11:42:16 -0700437 responseFutures.getIfPresent(message.id());
Madan Jampaniba472232015-03-04 13:00:50 -0800438 if (futureResponse != null) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700439 futureResponse.complete(message.payload());
Madan Jampaniba472232015-03-04 13:00:50 -0800440 } else {
441 log.warn("Received a reply for message id:[{}]. "
442 + " from {}. But was unable to locate the"
443 + " request handle", message.id(), message.sender());
444 }
445 } finally {
Madan Jampanic26eede2015-04-16 11:42:16 -0700446 responseFutures.invalidate(message.id());
Madan Jampaniba472232015-03-04 13:00:50 -0800447 }
448 return;
449 }
Madan Jampanic26eede2015-04-16 11:42:16 -0700450 Consumer<InternalMessage> handler = handlers.get(type);
Madan Jampaniba472232015-03-04 13:00:50 -0800451 if (handler != null) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700452 handler.accept(message);
Madan Jampaniba472232015-03-04 13:00:50 -0800453 } else {
454 log.debug("No handler registered for {}", type);
455 }
456 }
Ray Milkey34c95902015-04-15 09:47:53 -0700457}