blob: 8b003f08ea53bb7a0fb3e10a7ac393c69a154a91 [file] [log] [blame]
Thomas Vachuska24c849c2014-10-27 09:53:05 -07001/*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Thomas Vachuska24c849c2014-10-27 09:53:05 -07003 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07004 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
Thomas Vachuska24c849c2014-10-27 09:53:05 -07007 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07008 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
Thomas Vachuska24c849c2014-10-27 09:53:05 -070015 */
Madan Jampaniab6d3112014-10-02 16:30:14 -070016package org.onlab.netty;
17
Madan Jampanid36def02016-01-13 11:21:56 -080018import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.RemovalListener;
21import com.google.common.cache.RemovalNotification;
22import com.google.common.util.concurrent.MoreExecutors;
23
Madan Jampaniab6d3112014-10-02 16:30:14 -070024import io.netty.bootstrap.Bootstrap;
25import io.netty.bootstrap.ServerBootstrap;
26import io.netty.buffer.PooledByteBufAllocator;
27import io.netty.channel.Channel;
28import io.netty.channel.ChannelFuture;
Madan Jampaniddf76222014-10-04 23:48:44 -070029import io.netty.channel.ChannelHandler;
Madan Jampaniab6d3112014-10-02 16:30:14 -070030import io.netty.channel.ChannelHandlerContext;
31import io.netty.channel.ChannelInitializer;
32import io.netty.channel.ChannelOption;
33import io.netty.channel.EventLoopGroup;
Madan Jampani824a7c12014-10-21 09:46:15 -070034import io.netty.channel.ServerChannel;
Madan Jampaniab6d3112014-10-02 16:30:14 -070035import io.netty.channel.SimpleChannelInboundHandler;
Madan Jampani824a7c12014-10-21 09:46:15 -070036import io.netty.channel.epoll.EpollEventLoopGroup;
37import io.netty.channel.epoll.EpollServerSocketChannel;
38import io.netty.channel.epoll.EpollSocketChannel;
Madan Jampaniab6d3112014-10-02 16:30:14 -070039import io.netty.channel.nio.NioEventLoopGroup;
40import io.netty.channel.socket.SocketChannel;
41import io.netty.channel.socket.nio.NioServerSocketChannel;
42import io.netty.channel.socket.nio.NioSocketChannel;
43
Madan Jampanid36def02016-01-13 11:21:56 -080044import org.apache.commons.pool.KeyedPoolableObjectFactory;
45import org.apache.commons.pool.impl.GenericKeyedObjectPool;
46import org.onlab.util.Tools;
47import org.onosproject.store.cluster.messaging.Endpoint;
48import org.onosproject.store.cluster.messaging.MessagingService;
49import org.slf4j.Logger;
50import org.slf4j.LoggerFactory;
51
52import javax.net.ssl.KeyManagerFactory;
53import javax.net.ssl.SSLContext;
54import javax.net.ssl.SSLEngine;
55import javax.net.ssl.TrustManagerFactory;
56
JunHuy Lam39eb4292015-06-26 17:24:23 +090057import java.io.FileInputStream;
Madan Jampani348a9fe2014-11-09 01:37:51 -080058import java.io.IOException;
JunHuy Lam39eb4292015-06-26 17:24:23 +090059import java.security.KeyStore;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070060import java.util.Map;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070061import java.util.concurrent.CompletableFuture;
Madan Jampani348a9fe2014-11-09 01:37:51 -080062import java.util.concurrent.ConcurrentHashMap;
Madan Jampaniec5ae342015-04-13 15:43:10 -070063import java.util.concurrent.Executor;
Thomas Vachuskafc52fec2015-05-18 19:13:56 -070064import java.util.concurrent.RejectedExecutionException;
Madan Jampani348a9fe2014-11-09 01:37:51 -080065import java.util.concurrent.TimeUnit;
66import java.util.concurrent.TimeoutException;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070067import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani348a9fe2014-11-09 01:37:51 -080068import java.util.concurrent.atomic.AtomicLong;
Madan Jampanid36def02016-01-13 11:21:56 -080069import java.util.function.BiConsumer;
70import java.util.function.BiFunction;
Madan Jampanic26eede2015-04-16 11:42:16 -070071import java.util.function.Consumer;
Ray Milkey9f87e512016-01-05 10:00:22 -080072
Madan Jampaniab6d3112014-10-02 16:30:14 -070073/**
Madan Jampanic26eede2015-04-16 11:42:16 -070074 * Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
Madan Jampaniab6d3112014-10-02 16:30:14 -070075 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070076public class NettyMessaging implements MessagingService {
Madan Jampaniab6d3112014-10-02 16:30:14 -070077
78 private final Logger log = LoggerFactory.getLogger(getClass());
79
Madan Jampanic26eede2015-04-16 11:42:16 -070080 private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
81
Madan Jampaniafeebbd2015-05-19 15:26:01 -070082 private Endpoint localEp;
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -080083 private int preamble;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070084 private final AtomicBoolean started = new AtomicBoolean(false);
85 private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
Madan Jampani24f9efb2014-10-24 18:56:23 -070086 private final AtomicLong messageIdGenerator = new AtomicLong(0);
Madan Jampanid36def02016-01-13 11:21:56 -080087 private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder()
Madan Jampani5f9ec9a2014-10-29 13:45:52 -070088 .expireAfterWrite(10, TimeUnit.SECONDS)
Madan Jampanid36def02016-01-13 11:21:56 -080089 .removalListener(new RemovalListener<Long, Callback>() {
Madan Jampani5f9ec9a2014-10-29 13:45:52 -070090 @Override
Madan Jampanid36def02016-01-13 11:21:56 -080091 public void onRemoval(RemovalNotification<Long, Callback> entry) {
Yuta HIGUCHIc611d922015-02-10 16:07:38 -080092 if (entry.wasEvicted()) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -070093 entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
Yuta HIGUCHIc611d922015-02-10 16:07:38 -080094 }
Madan Jampani5f9ec9a2014-10-29 13:45:52 -070095 }
96 })
Madan Jampaniddf76222014-10-04 23:48:44 -070097 .build();
Madan Jampani2e5f87b2015-02-22 10:37:15 -080098
Aaron Kruglikov8a352192016-01-22 15:13:08 -080099 private final GenericKeyedObjectPool<Endpoint, Connection> channels
100 = new GenericKeyedObjectPool<Endpoint, Connection>(new OnosCommunicationChannelFactory());
Madan Jampaniab6d3112014-10-02 16:30:14 -0700101
Madan Jampani824a7c12014-10-21 09:46:15 -0700102 private EventLoopGroup serverGroup;
103 private EventLoopGroup clientGroup;
104 private Class<? extends ServerChannel> serverChannelClass;
105 private Class<? extends Channel> clientChannelClass;
106
JunHuy Lam39eb4292015-06-26 17:24:23 +0900107 protected static final boolean TLS_DISABLED = false;
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800108 protected boolean enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900109
110 protected String ksLocation;
111 protected String tsLocation;
112 protected char[] ksPwd;
113 protected char[] tsPwd;
114
Ray Milkeyaef45852016-01-11 17:13:19 -0800115 @SuppressWarnings("squid:S1181")
Ray Milkey86f20cc2015-12-09 16:54:09 -0800116 // We really need to catch Throwable due to netty native epoll() handling
Madan Jampani5e83f332014-10-20 15:35:09 -0700117 private void initEventLoopGroup() {
Madan Jampani824a7c12014-10-21 09:46:15 -0700118 // try Epoll first and if that does work, use nio.
Madan Jampani824a7c12014-10-21 09:46:15 -0700119 try {
Madan Jampani99e9fe22014-10-21 13:47:12 -0700120 clientGroup = new EpollEventLoopGroup();
121 serverGroup = new EpollEventLoopGroup();
122 serverChannelClass = EpollServerSocketChannel.class;
123 clientChannelClass = EpollSocketChannel.class;
124 return;
Thomas Vachuska2c2b7682015-05-05 11:59:21 -0700125 } catch (Throwable e) {
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700126 log.debug("Failed to initialize native (epoll) transport. "
127 + "Reason: {}. Proceeding with nio.", e.getMessage());
Madan Jampani824a7c12014-10-21 09:46:15 -0700128 }
129 clientGroup = new NioEventLoopGroup();
130 serverGroup = new NioEventLoopGroup();
131 serverChannelClass = NioServerSocketChannel.class;
132 clientChannelClass = NioSocketChannel.class;
Madan Jampani5e83f332014-10-20 15:35:09 -0700133 }
134
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -0800135 public void start(int preamble, Endpoint localEp) throws Exception {
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700136 if (started.get()) {
137 log.warn("Already running at local endpoint: {}", localEp);
138 return;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700139 }
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -0800140 this.preamble = preamble;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700141 this.localEp = localEp;
Madan Jampanicca4bdb2015-05-22 13:09:33 -0700142 channels.setLifo(true);
Madan Jampani86ed0552014-10-03 16:45:42 -0700143 channels.setTestOnBorrow(true);
144 channels.setTestOnReturn(true);
Madan Jampanicca4bdb2015-05-22 13:09:33 -0700145 channels.setMinEvictableIdleTimeMillis(60_000L);
146 channels.setTimeBetweenEvictionRunsMillis(30_000L);
Madan Jampani5e83f332014-10-20 15:35:09 -0700147 initEventLoopGroup();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700148 startAcceptingConnections();
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700149 started.set(true);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700150 }
151
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700152 public void stop() throws Exception {
153 if (started.get()) {
154 channels.close();
155 serverGroup.shutdownGracefully();
156 clientGroup.shutdownGracefully();
157 started.set(false);
158 }
Madan Jampani87100932014-10-21 16:46:12 -0700159 }
160
Madan Jampaniab6d3112014-10-02 16:30:14 -0700161 @Override
Madan Jampani175e8fd2015-05-20 14:10:45 -0700162 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700163 InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
164 localEp,
165 type,
166 payload);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700167 return sendAsync(ep, message);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700168 }
169
Madan Jampani175e8fd2015-05-20 14:10:45 -0700170 protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
Madan Jampanid36def02016-01-13 11:21:56 -0800171 if (ep.equals(localEp)) {
172 try {
173 dispatchLocally(message);
174 } catch (IOException e) {
175 return Tools.exceptionalFuture(e);
176 }
177 return CompletableFuture.completedFuture(null);
178 }
179
Madan Jampani175e8fd2015-05-20 14:10:45 -0700180 CompletableFuture<Void> future = new CompletableFuture<>();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700181 try {
Aaron Kruglikov8a352192016-01-22 15:13:08 -0800182 Connection connection = null;
Madan Jampanid36def02016-01-13 11:21:56 -0800183 try {
Aaron Kruglikov8a352192016-01-22 15:13:08 -0800184 connection = channels.borrowObject(ep);
185 connection.send(message, future);
186
Madan Jampanid36def02016-01-13 11:21:56 -0800187 } finally {
Aaron Kruglikov8a352192016-01-22 15:13:08 -0800188 channels.returnObject(ep, connection);
Madan Jampani86ed0552014-10-03 16:45:42 -0700189 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700190 } catch (Exception e) {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700191 future.completeExceptionally(e);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700192 }
Madan Jampani175e8fd2015-05-20 14:10:45 -0700193 return future;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700194 }
195
196 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700197 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
Madan Jampanid36def02016-01-13 11:21:56 -0800198 return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
199 }
200
201 @Override
202 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700203 CompletableFuture<byte[]> response = new CompletableFuture<>();
Madan Jampanid36def02016-01-13 11:21:56 -0800204 Callback callback = new Callback(response, executor);
Madan Jampani24f9efb2014-10-24 18:56:23 -0700205 Long messageId = messageIdGenerator.incrementAndGet();
Madan Jampanid36def02016-01-13 11:21:56 -0800206 callbacks.put(messageId, callback);
Madan Jampanic26eede2015-04-16 11:42:16 -0700207 InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
Madan Jampanid36def02016-01-13 11:21:56 -0800208 return sendAsync(ep, message).whenComplete((r, e) -> {
209 if (e != null) {
210 callbacks.invalidate(messageId);
211 }
212 }).thenCompose(v -> response);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700213 }
214
215 @Override
Madan Jampanid36def02016-01-13 11:21:56 -0800216 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
217 handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
Madan Jampaniab6d3112014-10-02 16:30:14 -0700218 }
219
Yuta HIGUCHIe5ca93b2014-10-23 09:49:00 -0700220 @Override
Madan Jampanid36def02016-01-13 11:21:56 -0800221 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700222 handlers.put(type, message -> executor.execute(() -> {
Madan Jampanid36def02016-01-13 11:21:56 -0800223 byte[] responsePayload = handler.apply(message.sender(), message.payload());
Madan Jampanic26eede2015-04-16 11:42:16 -0700224 if (responsePayload != null) {
225 InternalMessage response = new InternalMessage(message.id(),
226 localEp,
227 REPLY_MESSAGE_TYPE,
228 responsePayload);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700229 sendAsync(message.sender(), response).whenComplete((result, error) -> {
230 if (error != null) {
231 log.debug("Failed to respond", error);
232 }
233 });
Madan Jampani2af244a2015-02-22 13:12:01 -0800234 }
Madan Jampanic26eede2015-04-16 11:42:16 -0700235 }));
Madan Jampani2af244a2015-02-22 13:12:01 -0800236 }
237
238 @Override
Madan Jampanid36def02016-01-13 11:21:56 -0800239 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
Madan Jampani27b69c62015-05-15 15:49:02 -0700240 handlers.put(type, message -> {
Madan Jampanid36def02016-01-13 11:21:56 -0800241 handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
Madan Jampani175e8fd2015-05-20 14:10:45 -0700242 if (error == null) {
243 InternalMessage response = new InternalMessage(message.id(),
JunHuy Lam39eb4292015-06-26 17:24:23 +0900244 localEp,
245 REPLY_MESSAGE_TYPE,
246 result);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700247 sendAsync(message.sender(), response).whenComplete((r, e) -> {
248 if (e != null) {
249 log.debug("Failed to respond", e);
250 }
251 });
Madan Jampani27b69c62015-05-15 15:49:02 -0700252 }
Madan Jampani27b69c62015-05-15 15:49:02 -0700253 });
254 });
255 }
256
257 @Override
Madan Jampaniab6d3112014-10-02 16:30:14 -0700258 public void unregisterHandler(String type) {
Madan Jampani49115e92015-03-14 10:43:33 -0700259 handlers.remove(type);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700260 }
261
Madan Jampaniab6d3112014-10-02 16:30:14 -0700262 private void startAcceptingConnections() throws InterruptedException {
263 ServerBootstrap b = new ServerBootstrap();
Madan Jampani86ed0552014-10-03 16:45:42 -0700264 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
Madan Jampaniddf76222014-10-04 23:48:44 -0700265 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
Madan Jampanic26eede2015-04-16 11:42:16 -0700266 b.option(ChannelOption.SO_RCVBUF, 1048576);
267 b.option(ChannelOption.TCP_NODELAY, true);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700268 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900269 b.group(serverGroup, clientGroup);
270 b.channel(serverChannelClass);
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800271 if (enableNettyTls) {
272 b.childHandler(new SslServerCommunicationChannelInitializer());
JunHuy Lam39eb4292015-06-26 17:24:23 +0900273 } else {
274 b.childHandler(new OnosCommunicationChannelInitializer());
275 }
276 b.option(ChannelOption.SO_BACKLOG, 128);
277 b.childOption(ChannelOption.SO_KEEPALIVE, true);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700278
279 // Bind and start to accept incoming connections.
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700280 b.bind(localEp.port()).sync().addListener(future -> {
281 if (future.isSuccess()) {
282 log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
283 } else {
284 log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
285 }
286 });
Madan Jampaniab6d3112014-10-02 16:30:14 -0700287 }
288
289 private class OnosCommunicationChannelFactory
Aaron Kruglikov8a352192016-01-22 15:13:08 -0800290 implements KeyedPoolableObjectFactory<Endpoint, Connection> {
Madan Jampaniab6d3112014-10-02 16:30:14 -0700291
292 @Override
Aaron Kruglikov8a352192016-01-22 15:13:08 -0800293 public void activateObject(Endpoint endpoint, Connection connection)
Madan Jampaniab6d3112014-10-02 16:30:14 -0700294 throws Exception {
295 }
296
297 @Override
Aaron Kruglikov8a352192016-01-22 15:13:08 -0800298 public void destroyObject(Endpoint ep, Connection connection) throws Exception {
Madan Jampani8a0569e2015-05-26 12:06:00 -0700299 log.debug("Closing connection to {}", ep);
Aaron Kruglikov8a352192016-01-22 15:13:08 -0800300 //Is this the right way to destroy?
301 connection.destroy();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700302 }
303
304 @Override
Aaron Kruglikov8a352192016-01-22 15:13:08 -0800305 public Connection makeObject(Endpoint ep) throws Exception {
Madan Jampaniddf76222014-10-04 23:48:44 -0700306 Bootstrap bootstrap = new Bootstrap();
307 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Madan Jampanic26eede2015-04-16 11:42:16 -0700308 bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
309 bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
310 bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
Aaron Kruglikov53b88a52016-01-19 11:13:23 -0800311 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
Madan Jampani824a7c12014-10-21 09:46:15 -0700312 bootstrap.group(clientGroup);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700313 // TODO: Make this faster:
314 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
Madan Jampani824a7c12014-10-21 09:46:15 -0700315 bootstrap.channel(clientChannelClass);
Madan Jampaniddf76222014-10-04 23:48:44 -0700316 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800317 if (enableNettyTls) {
318 bootstrap.handler(new SslClientCommunicationChannelInitializer());
JunHuy Lam39eb4292015-06-26 17:24:23 +0900319 } else {
320 bootstrap.handler(new OnosCommunicationChannelInitializer());
321 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700322 // Start the client.
Aaron Kruglikov8a352192016-01-22 15:13:08 -0800323 CompletableFuture<Channel> retFuture = new CompletableFuture<>();
324 ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port());
325
326 f.addListener(future -> {
327 if (future.isSuccess()) {
328 retFuture.complete(f.channel());
329 } else {
330 retFuture.completeExceptionally(future.cause());
331 }
332 });
Madan Jampani8a0569e2015-05-26 12:06:00 -0700333 log.debug("Established a new connection to {}", ep);
Aaron Kruglikov8a352192016-01-22 15:13:08 -0800334 return new Connection(retFuture);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700335 }
336
337 @Override
Aaron Kruglikov8a352192016-01-22 15:13:08 -0800338 public void passivateObject(Endpoint ep, Connection connection)
Madan Jampaniab6d3112014-10-02 16:30:14 -0700339 throws Exception {
340 }
341
342 @Override
Aaron Kruglikov8a352192016-01-22 15:13:08 -0800343 public boolean validateObject(Endpoint ep, Connection connection) {
344 return connection.validate();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700345 }
346 }
347
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800348 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900349
350 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -0800351 private final ChannelHandler encoder = new MessageEncoder(preamble);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900352
353 @Override
354 protected void initChannel(SocketChannel channel) throws Exception {
355 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
356 KeyStore ts = KeyStore.getInstance("JKS");
357 ts.load(new FileInputStream(tsLocation), tsPwd);
358 tmFactory.init(ts);
359
360 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
361 KeyStore ks = KeyStore.getInstance("JKS");
362 ks.load(new FileInputStream(ksLocation), ksPwd);
363 kmf.init(ks, ksPwd);
364
365 SSLContext serverContext = SSLContext.getInstance("TLS");
366 serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
367
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800368 SSLEngine serverSslEngine = serverContext.createSSLEngine();
JunHuy Lam39eb4292015-06-26 17:24:23 +0900369
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800370 serverSslEngine.setNeedClientAuth(true);
371 serverSslEngine.setUseClientMode(false);
372 serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
373 serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
374 serverSslEngine.setEnableSessionCreation(true);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900375
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800376 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
JunHuy Lam39eb4292015-06-26 17:24:23 +0900377 .addLast("encoder", encoder)
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -0800378 .addLast("decoder", new MessageDecoder(preamble))
JunHuy Lam39eb4292015-06-26 17:24:23 +0900379 .addLast("handler", dispatcher);
380 }
381
382 }
383
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800384 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900385
386 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -0800387 private final ChannelHandler encoder = new MessageEncoder(preamble);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900388
389 @Override
390 protected void initChannel(SocketChannel channel) throws Exception {
391 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
392 KeyStore ts = KeyStore.getInstance("JKS");
393 ts.load(new FileInputStream(tsLocation), tsPwd);
394 tmFactory.init(ts);
395
396 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
397 KeyStore ks = KeyStore.getInstance("JKS");
398 ks.load(new FileInputStream(ksLocation), ksPwd);
399 kmf.init(ks, ksPwd);
400
401 SSLContext clientContext = SSLContext.getInstance("TLS");
402 clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
403
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800404 SSLEngine clientSslEngine = clientContext.createSSLEngine();
JunHuy Lam39eb4292015-06-26 17:24:23 +0900405
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800406 clientSslEngine.setUseClientMode(true);
407 clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
408 clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
409 clientSslEngine.setEnableSessionCreation(true);
JunHuy Lam39eb4292015-06-26 17:24:23 +0900410
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800411 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
JunHuy Lam39eb4292015-06-26 17:24:23 +0900412 .addLast("encoder", encoder)
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -0800413 .addLast("decoder", new MessageDecoder(preamble))
JunHuy Lam39eb4292015-06-26 17:24:23 +0900414 .addLast("handler", dispatcher);
415 }
416
417 }
418
Madan Jampaniab6d3112014-10-02 16:30:14 -0700419 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
420
Madan Jampaniddf76222014-10-04 23:48:44 -0700421 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -0800422 private final ChannelHandler encoder = new MessageEncoder(preamble);
Madan Jampaniddf76222014-10-04 23:48:44 -0700423
Madan Jampaniab6d3112014-10-02 16:30:14 -0700424 @Override
425 protected void initChannel(SocketChannel channel) throws Exception {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900426 channel.pipeline()
427 .addLast("encoder", encoder)
Aaron Kruglikoveb0ae4e2015-11-10 19:16:16 -0800428 .addLast("decoder", new MessageDecoder(preamble))
JunHuy Lam39eb4292015-06-26 17:24:23 +0900429 .addLast("handler", dispatcher);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700430 }
431 }
432
Madan Jampaniddf76222014-10-04 23:48:44 -0700433 @ChannelHandler.Sharable
Madan Jampaniab6d3112014-10-02 16:30:14 -0700434 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
435
436 @Override
437 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
Thomas Vachuskafc52fec2015-05-18 19:13:56 -0700438 try {
439 dispatchLocally(message);
440 } catch (RejectedExecutionException e) {
441 log.warn("Unable to dispatch message due to {}", e.getMessage());
442 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700443 }
Madan Jampani86ed0552014-10-03 16:45:42 -0700444
Madan Jampani86ed0552014-10-03 16:45:42 -0700445 @Override
446 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
Madan Jampani29e5dfd2014-10-07 17:26:25 -0700447 log.error("Exception inside channel handling pipeline.", cause);
Madan Jampani86ed0552014-10-03 16:45:42 -0700448 context.close();
449 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700450 }
Madan Jampaniba472232015-03-04 13:00:50 -0800451 private void dispatchLocally(InternalMessage message) throws IOException {
Madan Jampani49115e92015-03-14 10:43:33 -0700452 String type = message.type();
Madan Jampanic26eede2015-04-16 11:42:16 -0700453 if (REPLY_MESSAGE_TYPE.equals(type)) {
Madan Jampaniba472232015-03-04 13:00:50 -0800454 try {
Madan Jampanid36def02016-01-13 11:21:56 -0800455 Callback callback =
456 callbacks.getIfPresent(message.id());
457 if (callback != null) {
458 callback.complete(message.payload());
Madan Jampaniba472232015-03-04 13:00:50 -0800459 } else {
460 log.warn("Received a reply for message id:[{}]. "
461 + " from {}. But was unable to locate the"
462 + " request handle", message.id(), message.sender());
463 }
464 } finally {
Madan Jampanid36def02016-01-13 11:21:56 -0800465 callbacks.invalidate(message.id());
Madan Jampaniba472232015-03-04 13:00:50 -0800466 }
467 return;
468 }
Madan Jampanic26eede2015-04-16 11:42:16 -0700469 Consumer<InternalMessage> handler = handlers.get(type);
Madan Jampaniba472232015-03-04 13:00:50 -0800470 if (handler != null) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700471 handler.accept(message);
Madan Jampaniba472232015-03-04 13:00:50 -0800472 } else {
473 log.debug("No handler registered for {}", type);
474 }
475 }
Madan Jampanid36def02016-01-13 11:21:56 -0800476
477 private final class Callback {
478 private final CompletableFuture<byte[]> future;
479 private final Executor executor;
480
481 public Callback(CompletableFuture<byte[]> future, Executor executor) {
482 this.future = future;
483 this.executor = executor;
484 }
485
486 public void complete(byte[] value) {
487 executor.execute(() -> future.complete(value));
488 }
489
490 public void completeExceptionally(Throwable error) {
491 executor.execute(() -> future.completeExceptionally(error));
492 }
493 }
Aaron Kruglikov8a352192016-01-22 15:13:08 -0800494 private final class Connection {
495 private final CompletableFuture<Channel> internalFuture;
496
497 public Connection(CompletableFuture<Channel> internalFuture) {
498 this.internalFuture = internalFuture;
499 }
500
501 /**
502 * Sends a message out on its channel and associated the message with a
503 * completable future used for signaling.
504 * @param message the message to be sent
505 * @param future a future that is completed normally or exceptionally if
506 * message sending succeeds or fails respectively
507 */
508 public void send(Object message, CompletableFuture<Void> future) {
509 internalFuture.whenComplete((channel, throwable) -> {
510 if (throwable == null) {
511 channel.writeAndFlush(message).addListener(channelFuture -> {
512 if (!channelFuture.isSuccess()) {
513 future.completeExceptionally(channelFuture.cause());
514 } else {
515 future.complete(null);
516 }
517 });
518 } else {
519 future.completeExceptionally(throwable);
520 }
521
522 });
523 }
524
525 /**
526 * Destroys a channel by closing its channel (if it exists) and
527 * cancelling its future.
528 */
529 public void destroy() {
530 Channel channel = internalFuture.getNow(null);
531 if (channel != null) {
532 channel.close();
533 }
534 internalFuture.cancel(false);
535 }
536
537 /**
538 * Determines whether the connection is valid meaning it is either
539 * complete with and active channel
540 * or it has not yet completed.
541 * @return true if the channel has an active connection or has not
542 * yet completed
543 */
544 public boolean validate() {
545 if (internalFuture.isCompletedExceptionally()) {
546 return false;
547 }
548 Channel channel = internalFuture.getNow(null);
549 return channel == null || channel.isActive();
550 }
551 }
Ray Milkey34c95902015-04-15 09:47:53 -0700552}