blob: e9b50c4570c075632391e8c954c0557cd344f2fa [file] [log] [blame]
Thomas Vachuska58de4162015-09-10 16:15:33 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Thomas Vachuska58de4162015-09-10 16:15:33 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070016package org.onosproject.store.cluster.messaging.impl;
17
JunHuy Lam39eb4292015-06-26 17:24:23 +090018import com.google.common.base.Strings;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalListener;
22import com.google.common.cache.RemovalNotification;
23import com.google.common.util.concurrent.MoreExecutors;
Madan Jampania9e70a62016-03-02 16:28:18 -080024
Aaron Kruglikov1b727382016-02-09 16:17:47 -080025import io.netty.bootstrap.Bootstrap;
26import io.netty.bootstrap.ServerBootstrap;
27import io.netty.buffer.PooledByteBufAllocator;
28import io.netty.channel.Channel;
29import io.netty.channel.ChannelFuture;
30import io.netty.channel.ChannelHandler;
31import io.netty.channel.ChannelHandlerContext;
32import io.netty.channel.ChannelInitializer;
33import io.netty.channel.ChannelOption;
34import io.netty.channel.EventLoopGroup;
35import io.netty.channel.ServerChannel;
36import io.netty.channel.SimpleChannelInboundHandler;
37import io.netty.channel.epoll.EpollEventLoopGroup;
38import io.netty.channel.epoll.EpollServerSocketChannel;
39import io.netty.channel.epoll.EpollSocketChannel;
40import io.netty.channel.nio.NioEventLoopGroup;
41import io.netty.channel.socket.SocketChannel;
42import io.netty.channel.socket.nio.NioServerSocketChannel;
43import io.netty.channel.socket.nio.NioSocketChannel;
44import org.apache.commons.pool.KeyedPoolableObjectFactory;
45import org.apache.commons.pool.impl.GenericKeyedObjectPool;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070046import org.apache.felix.scr.annotations.Activate;
47import org.apache.felix.scr.annotations.Component;
48import org.apache.felix.scr.annotations.Deactivate;
49import org.apache.felix.scr.annotations.Reference;
50import org.apache.felix.scr.annotations.ReferenceCardinality;
51import org.apache.felix.scr.annotations.Service;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080052import org.onlab.util.Tools;
Madan Jampaniec1df022015-10-13 21:23:03 -070053import org.onosproject.cluster.ClusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070054import org.onosproject.cluster.ControllerNode;
Madan Jampani05833872016-07-12 23:01:39 -070055import org.onosproject.core.HybridLogicalClockService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070056import org.onosproject.store.cluster.messaging.Endpoint;
Madan Jampania9e70a62016-03-02 16:28:18 -080057import org.onosproject.store.cluster.messaging.MessagingException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080058import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampania9e70a62016-03-02 16:28:18 -080059import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070060import org.slf4j.Logger;
61import org.slf4j.LoggerFactory;
62
Aaron Kruglikov1b727382016-02-09 16:17:47 -080063import javax.net.ssl.KeyManagerFactory;
64import javax.net.ssl.SSLContext;
65import javax.net.ssl.SSLEngine;
66import javax.net.ssl.TrustManagerFactory;
Madan Jampania9e70a62016-03-02 16:28:18 -080067
Aaron Kruglikov1b727382016-02-09 16:17:47 -080068import java.io.FileInputStream;
69import java.io.IOException;
70import java.security.KeyStore;
71import java.util.Map;
Madan Jampania9e70a62016-03-02 16:28:18 -080072import java.util.Optional;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080073import java.util.concurrent.CompletableFuture;
74import java.util.concurrent.ConcurrentHashMap;
75import java.util.concurrent.Executor;
76import java.util.concurrent.RejectedExecutionException;
77import java.util.concurrent.TimeUnit;
78import java.util.concurrent.TimeoutException;
79import java.util.concurrent.atomic.AtomicBoolean;
80import java.util.concurrent.atomic.AtomicLong;
81import java.util.function.BiConsumer;
82import java.util.function.BiFunction;
83import java.util.function.Consumer;
84
Yuta HIGUCHI90a16892016-07-20 20:36:08 -070085import static org.onlab.util.Tools.groupedThreads;
Heedo Kang4a47a302016-02-29 17:40:23 +090086import static org.onosproject.security.AppGuard.checkPermission;
87import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
88
Madan Jampaniafeebbd2015-05-19 15:26:01 -070089/**
90 * Netty based MessagingService.
91 */
Sho SHIMIZU5c396e32016-08-12 15:19:12 -070092@Component(immediate = true)
Madan Jampaniafeebbd2015-05-19 15:26:01 -070093@Service
Aaron Kruglikov1b727382016-02-09 16:17:47 -080094public class NettyMessagingManager implements MessagingService {
95
Madan Jampanif3ab4242016-07-27 17:21:47 -070096 private static final int REPLY_TIME_OUT_MILLIS = 250;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080097 private static final short MIN_KS_LENGTH = 6;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070098
99 private final Logger log = LoggerFactory.getLogger(getClass());
100
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800101 private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
102
Madan Jampani05833872016-07-12 23:01:39 -0700103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 protected HybridLogicalClockService clockService;
105
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800106 private Endpoint localEp;
107 private int preamble;
108 private final AtomicBoolean started = new AtomicBoolean(false);
109 private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
110 private final AtomicLong messageIdGenerator = new AtomicLong(0);
111 private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder()
Madan Jampanif3ab4242016-07-27 17:21:47 -0700112 .expireAfterWrite(REPLY_TIME_OUT_MILLIS, TimeUnit.MILLISECONDS)
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800113 .removalListener(new RemovalListener<Long, Callback>() {
114 @Override
115 public void onRemoval(RemovalNotification<Long, Callback> entry) {
116 if (entry.wasEvicted()) {
117 entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
118 }
119 }
120 })
121 .build();
122
123 private final GenericKeyedObjectPool<Endpoint, Connection> channels
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700124 = new GenericKeyedObjectPool<>(new OnosCommunicationChannelFactory());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800125
126 private EventLoopGroup serverGroup;
127 private EventLoopGroup clientGroup;
128 private Class<? extends ServerChannel> serverChannelClass;
129 private Class<? extends Channel> clientChannelClass;
130
131 protected static final boolean TLS_DISABLED = false;
132 protected boolean enableNettyTls = TLS_DISABLED;
133
134 protected String ksLocation;
135 protected String tsLocation;
136 protected char[] ksPwd;
137 protected char[] tsPwd;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900138
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampaniec1df022015-10-13 21:23:03 -0700140 protected ClusterMetadataService clusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700141
142 @Activate
143 public void activate() throws Exception {
Madan Jampaniec1df022015-10-13 21:23:03 -0700144 ControllerNode localNode = clusterMetadataService.getLocalNode();
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800145 getTlsParameters();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800146
147 if (started.get()) {
148 log.warn("Already running at local endpoint: {}", localEp);
149 return;
150 }
151 this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
152 this.localEp = new Endpoint(localNode.ip(), localNode.tcpPort());
153 channels.setLifo(true);
154 channels.setTestOnBorrow(true);
155 channels.setTestOnReturn(true);
156 channels.setMinEvictableIdleTimeMillis(60_000L);
157 channels.setTimeBetweenEvictionRunsMillis(30_000L);
158 initEventLoopGroup();
159 startAcceptingConnections();
160 started.set(true);
Madan Jampanif3ab4242016-07-27 17:21:47 -0700161 serverGroup.scheduleWithFixedDelay(callbacks::cleanUp, 0, REPLY_TIME_OUT_MILLIS, TimeUnit.MILLISECONDS);
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700162 log.info("Started");
163 }
164
165 @Deactivate
166 public void deactivate() throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800167 if (started.get()) {
168 channels.close();
169 serverGroup.shutdownGracefully();
170 clientGroup.shutdownGracefully();
171 started.set(false);
172 }
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700173 log.info("Stopped");
174 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900175
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800176 private void getTlsParameters() {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900177 String tempString = System.getProperty("enableNettyTLS");
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800178 enableNettyTls = Strings.isNullOrEmpty(tempString) ? TLS_DISABLED : Boolean.parseBoolean(tempString);
179 log.info("enableNettyTLS = {}", enableNettyTls);
180 if (enableNettyTls) {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900181 ksLocation = System.getProperty("javax.net.ssl.keyStore");
182 if (Strings.isNullOrEmpty(ksLocation)) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800183 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900184 return;
185 }
186 tsLocation = System.getProperty("javax.net.ssl.trustStore");
187 if (Strings.isNullOrEmpty(tsLocation)) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800188 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900189 return;
190 }
191 ksPwd = System.getProperty("javax.net.ssl.keyStorePassword").toCharArray();
192 if (MIN_KS_LENGTH > ksPwd.length) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800193 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900194 return;
195 }
196 tsPwd = System.getProperty("javax.net.ssl.trustStorePassword").toCharArray();
197 if (MIN_KS_LENGTH > tsPwd.length) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800198 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900199 return;
200 }
201 }
202 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800203 private void initEventLoopGroup() {
204 // try Epoll first and if that does work, use nio.
205 try {
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700206 clientGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollC-%d", log));
207 serverGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800208 serverChannelClass = EpollServerSocketChannel.class;
209 clientChannelClass = EpollSocketChannel.class;
210 return;
211 } catch (Throwable e) {
212 log.debug("Failed to initialize native (epoll) transport. "
213 + "Reason: {}. Proceeding with nio.", e.getMessage());
214 }
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700215 clientGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioC-%d", log));
216 serverGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800217 serverChannelClass = NioServerSocketChannel.class;
218 clientChannelClass = NioSocketChannel.class;
219 }
220
221 @Override
222 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900223 checkPermission(CLUSTER_WRITE);
Madan Jampanib825aeb2016-04-01 15:18:25 -0700224 InternalMessage message = new InternalMessage(preamble,
Madan Jampani05833872016-07-12 23:01:39 -0700225 clockService.timeNow(),
Madan Jampanib825aeb2016-04-01 15:18:25 -0700226 messageIdGenerator.incrementAndGet(),
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800227 localEp,
228 type,
229 payload);
230 return sendAsync(ep, message);
231 }
232
233 protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900234 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800235 if (ep.equals(localEp)) {
236 try {
237 dispatchLocally(message);
238 } catch (IOException e) {
239 return Tools.exceptionalFuture(e);
240 }
241 return CompletableFuture.completedFuture(null);
242 }
243
244 CompletableFuture<Void> future = new CompletableFuture<>();
245 try {
246 Connection connection = null;
247 try {
248 connection = channels.borrowObject(ep);
249 connection.send(message, future);
250 } finally {
Yuta HIGUCHIb47c9532016-08-22 09:41:23 -0700251 if (connection != null) {
252 channels.returnObject(ep, connection);
253 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800254 }
255 } catch (Exception e) {
256 future.completeExceptionally(e);
257 }
258 return future;
259 }
260
261 @Override
262 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900263 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800264 return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
265 }
266
267 @Override
268 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900269 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800270 CompletableFuture<byte[]> response = new CompletableFuture<>();
271 Callback callback = new Callback(response, executor);
272 Long messageId = messageIdGenerator.incrementAndGet();
273 callbacks.put(messageId, callback);
Madan Jampani05833872016-07-12 23:01:39 -0700274 InternalMessage message = new InternalMessage(preamble,
275 clockService.timeNow(),
276 messageId,
277 localEp,
278 type,
279 payload);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800280 return sendAsync(ep, message).whenComplete((r, e) -> {
281 if (e != null) {
282 callbacks.invalidate(messageId);
283 }
Yuta HIGUCHIed1ca662016-08-11 11:11:37 -0700284 }).thenComposeAsync(v -> response, executor);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800285 }
286
287 @Override
288 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900289 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800290 handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
291 }
292
293 @Override
294 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900295 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800296 handlers.put(type, message -> executor.execute(() -> {
Madan Jampania9e70a62016-03-02 16:28:18 -0800297 byte[] responsePayload = null;
298 Status status = Status.OK;
299 try {
300 responsePayload = handler.apply(message.sender(), message.payload());
301 } catch (Exception e) {
302 status = Status.ERROR_HANDLER_EXCEPTION;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800303 }
Madan Jampania9e70a62016-03-02 16:28:18 -0800304 sendReply(message, status, Optional.ofNullable(responsePayload));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800305 }));
306 }
307
308 @Override
309 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900310 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800311 handlers.put(type, message -> {
312 handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
Madan Jampania9e70a62016-03-02 16:28:18 -0800313 Status status = error == null ? Status.OK : Status.ERROR_HANDLER_EXCEPTION;
314 sendReply(message, status, Optional.ofNullable(result));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800315 });
316 });
317 }
318
319 @Override
320 public void unregisterHandler(String type) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900321 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800322 handlers.remove(type);
323 }
324
325 private void startAcceptingConnections() throws InterruptedException {
326 ServerBootstrap b = new ServerBootstrap();
Yuta HIGUCHIb47c9532016-08-22 09:41:23 -0700327 b.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
328 b.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800329 b.option(ChannelOption.SO_RCVBUF, 1048576);
330 b.option(ChannelOption.TCP_NODELAY, true);
Yuta HIGUCHIb47c9532016-08-22 09:41:23 -0700331 b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800332 b.group(serverGroup, clientGroup);
333 b.channel(serverChannelClass);
334 if (enableNettyTls) {
335 b.childHandler(new SslServerCommunicationChannelInitializer());
336 } else {
337 b.childHandler(new OnosCommunicationChannelInitializer());
338 }
339 b.option(ChannelOption.SO_BACKLOG, 128);
340 b.childOption(ChannelOption.SO_KEEPALIVE, true);
341
342 // Bind and start to accept incoming connections.
343 b.bind(localEp.port()).sync().addListener(future -> {
344 if (future.isSuccess()) {
345 log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
346 } else {
347 log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
348 }
349 });
350 }
351
352 private class OnosCommunicationChannelFactory
353 implements KeyedPoolableObjectFactory<Endpoint, Connection> {
354
355 @Override
356 public void activateObject(Endpoint endpoint, Connection connection)
357 throws Exception {
358 }
359
360 @Override
361 public void destroyObject(Endpoint ep, Connection connection) throws Exception {
362 log.debug("Closing connection to {}", ep);
363 //Is this the right way to destroy?
364 connection.destroy();
365 }
366
367 @Override
368 public Connection makeObject(Endpoint ep) throws Exception {
369 Bootstrap bootstrap = new Bootstrap();
370 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
371 bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
372 bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
373 bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
374 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
375 bootstrap.group(clientGroup);
376 // TODO: Make this faster:
377 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
378 bootstrap.channel(clientChannelClass);
379 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
380 if (enableNettyTls) {
381 bootstrap.handler(new SslClientCommunicationChannelInitializer());
382 } else {
383 bootstrap.handler(new OnosCommunicationChannelInitializer());
384 }
385 // Start the client.
386 CompletableFuture<Channel> retFuture = new CompletableFuture<>();
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700387 ChannelFuture f = bootstrap.connect(ep.host().toInetAddress(), ep.port());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800388
389 f.addListener(future -> {
390 if (future.isSuccess()) {
391 retFuture.complete(f.channel());
392 } else {
393 retFuture.completeExceptionally(future.cause());
394 }
395 });
396 log.debug("Established a new connection to {}", ep);
397 return new Connection(retFuture);
398 }
399
400 @Override
401 public void passivateObject(Endpoint ep, Connection connection)
402 throws Exception {
403 }
404
405 @Override
406 public boolean validateObject(Endpoint ep, Connection connection) {
407 return connection.validate();
408 }
409 }
410
411 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
412
413 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
414 private final ChannelHandler encoder = new MessageEncoder(preamble);
415
416 @Override
417 protected void initChannel(SocketChannel channel) throws Exception {
418 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
419 KeyStore ts = KeyStore.getInstance("JKS");
420 ts.load(new FileInputStream(tsLocation), tsPwd);
421 tmFactory.init(ts);
422
423 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
424 KeyStore ks = KeyStore.getInstance("JKS");
425 ks.load(new FileInputStream(ksLocation), ksPwd);
426 kmf.init(ks, ksPwd);
427
428 SSLContext serverContext = SSLContext.getInstance("TLS");
429 serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
430
431 SSLEngine serverSslEngine = serverContext.createSSLEngine();
432
433 serverSslEngine.setNeedClientAuth(true);
434 serverSslEngine.setUseClientMode(false);
435 serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
436 serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
437 serverSslEngine.setEnableSessionCreation(true);
438
439 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
440 .addLast("encoder", encoder)
Madan Jampanib825aeb2016-04-01 15:18:25 -0700441 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800442 .addLast("handler", dispatcher);
443 }
444 }
445
446 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
447
448 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
449 private final ChannelHandler encoder = new MessageEncoder(preamble);
450
451 @Override
452 protected void initChannel(SocketChannel channel) throws Exception {
453 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
454 KeyStore ts = KeyStore.getInstance("JKS");
455 ts.load(new FileInputStream(tsLocation), tsPwd);
456 tmFactory.init(ts);
457
458 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
459 KeyStore ks = KeyStore.getInstance("JKS");
460 ks.load(new FileInputStream(ksLocation), ksPwd);
461 kmf.init(ks, ksPwd);
462
463 SSLContext clientContext = SSLContext.getInstance("TLS");
464 clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
465
466 SSLEngine clientSslEngine = clientContext.createSSLEngine();
467
468 clientSslEngine.setUseClientMode(true);
469 clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
470 clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
471 clientSslEngine.setEnableSessionCreation(true);
472
473 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
474 .addLast("encoder", encoder)
Madan Jampanib825aeb2016-04-01 15:18:25 -0700475 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800476 .addLast("handler", dispatcher);
477 }
478 }
479
480 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
481
482 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
483 private final ChannelHandler encoder = new MessageEncoder(preamble);
484
485 @Override
486 protected void initChannel(SocketChannel channel) throws Exception {
487 channel.pipeline()
488 .addLast("encoder", encoder)
Madan Jampanib825aeb2016-04-01 15:18:25 -0700489 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800490 .addLast("handler", dispatcher);
491 }
492 }
493
494 @ChannelHandler.Sharable
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700495 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<Object> {
496 // Effectively SimpleChannelInboundHandler<InternalMessage>,
497 // had to specify <Object> to avoid Class Loader not being able to find some classes.
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800498
499 @Override
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700500 protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
501 InternalMessage message = (InternalMessage) rawMessage;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800502 try {
503 dispatchLocally(message);
504 } catch (RejectedExecutionException e) {
505 log.warn("Unable to dispatch message due to {}", e.getMessage());
506 }
507 }
508
509 @Override
510 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
511 log.error("Exception inside channel handling pipeline.", cause);
512 context.close();
513 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700514
515 /**
516 * Returns true if the given message should be handled.
517 *
518 * @param msg inbound message
519 * @return true if {@code msg} is {@link InternalMessage} instance.
520 *
521 * @see SimpleChannelInboundHandler#acceptInboundMessage(Object)
522 */
523 @Override
524 public final boolean acceptInboundMessage(Object msg) {
525 return msg instanceof InternalMessage;
526 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800527 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700528
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800529 private void dispatchLocally(InternalMessage message) throws IOException {
Madan Jampanib825aeb2016-04-01 15:18:25 -0700530 if (message.preamble() != preamble) {
531 log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
532 sendReply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
533 }
Madan Jampani05833872016-07-12 23:01:39 -0700534 clockService.recordEventTime(message.time());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800535 String type = message.type();
536 if (REPLY_MESSAGE_TYPE.equals(type)) {
537 try {
538 Callback callback =
539 callbacks.getIfPresent(message.id());
540 if (callback != null) {
Madan Jampania9e70a62016-03-02 16:28:18 -0800541 if (message.status() == Status.OK) {
542 callback.complete(message.payload());
543 } else if (message.status() == Status.ERROR_NO_HANDLER) {
544 callback.completeExceptionally(new MessagingException.NoRemoteHandler());
545 } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
546 callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
Madan Jampanib825aeb2016-04-01 15:18:25 -0700547 } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
548 callback.completeExceptionally(new MessagingException.ProcotolException());
Madan Jampania9e70a62016-03-02 16:28:18 -0800549 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800550 } else {
Madan Jampania9e70a62016-03-02 16:28:18 -0800551 log.debug("Received a reply for message id:[{}]. "
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800552 + " from {}. But was unable to locate the"
553 + " request handle", message.id(), message.sender());
554 }
555 } finally {
556 callbacks.invalidate(message.id());
557 }
558 return;
559 }
560 Consumer<InternalMessage> handler = handlers.get(type);
561 if (handler != null) {
562 handler.accept(message);
563 } else {
Madan Jampania9e70a62016-03-02 16:28:18 -0800564 log.debug("No handler for message type {}", message.type(), message.sender());
565 sendReply(message, Status.ERROR_NO_HANDLER, Optional.empty());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800566 }
567 }
568
Madan Jampania9e70a62016-03-02 16:28:18 -0800569 private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
Madan Jampanib825aeb2016-04-01 15:18:25 -0700570 InternalMessage response = new InternalMessage(preamble,
Madan Jampani05833872016-07-12 23:01:39 -0700571 clockService.timeNow(),
Madan Jampanib825aeb2016-04-01 15:18:25 -0700572 message.id(),
Madan Jampania9e70a62016-03-02 16:28:18 -0800573 localEp,
574 REPLY_MESSAGE_TYPE,
575 responsePayload.orElse(new byte[0]),
576 status);
577 sendAsync(message.sender(), response).whenComplete((result, error) -> {
578 if (error != null) {
579 log.debug("Failed to respond", error);
580 }
581 });
582 }
583
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800584 private final class Callback {
585 private final CompletableFuture<byte[]> future;
586 private final Executor executor;
587
588 public Callback(CompletableFuture<byte[]> future, Executor executor) {
589 this.future = future;
590 this.executor = executor;
591 }
592
593 public void complete(byte[] value) {
594 executor.execute(() -> future.complete(value));
595 }
596
597 public void completeExceptionally(Throwable error) {
598 executor.execute(() -> future.completeExceptionally(error));
599 }
600 }
601 private final class Connection {
602 private final CompletableFuture<Channel> internalFuture;
603
604 public Connection(CompletableFuture<Channel> internalFuture) {
605 this.internalFuture = internalFuture;
606 }
607
608 /**
609 * Sends a message out on its channel and associated the message with a
610 * completable future used for signaling.
611 * @param message the message to be sent
612 * @param future a future that is completed normally or exceptionally if
613 * message sending succeeds or fails respectively
614 */
615 public void send(Object message, CompletableFuture<Void> future) {
616 internalFuture.whenComplete((channel, throwable) -> {
617 if (throwable == null) {
618 channel.writeAndFlush(message).addListener(channelFuture -> {
619 if (!channelFuture.isSuccess()) {
620 future.completeExceptionally(channelFuture.cause());
621 } else {
622 future.complete(null);
623 }
624 });
625 } else {
626 future.completeExceptionally(throwable);
627 }
628 });
629 }
630
631 /**
632 * Destroys a channel by closing its channel (if it exists) and
633 * cancelling its future.
634 */
635 public void destroy() {
636 Channel channel = internalFuture.getNow(null);
637 if (channel != null) {
638 channel.close();
639 }
640 internalFuture.cancel(false);
641 }
642
643 /**
644 * Determines whether the connection is valid meaning it is either
645 * complete with and active channel
646 * or it has not yet completed.
647 * @return true if the channel has an active connection or has not
648 * yet completed
649 */
650 public boolean validate() {
651 if (internalFuture.isCompletedExceptionally()) {
652 return false;
653 }
654 Channel channel = internalFuture.getNow(null);
655 return channel == null || channel.isActive();
656 }
657 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900658}