blob: f3b028c5d412e4c711ef325eaed0d126681a8d92 [file] [log] [blame]
Thomas Vachuska58de4162015-09-10 16:15:33 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Thomas Vachuska58de4162015-09-10 16:15:33 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070016package org.onosproject.store.cluster.messaging.impl;
17
JunHuy Lam39eb4292015-06-26 17:24:23 +090018import com.google.common.base.Strings;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalListener;
22import com.google.common.cache.RemovalNotification;
23import com.google.common.util.concurrent.MoreExecutors;
Madan Jampania9e70a62016-03-02 16:28:18 -080024
Aaron Kruglikov1b727382016-02-09 16:17:47 -080025import io.netty.bootstrap.Bootstrap;
26import io.netty.bootstrap.ServerBootstrap;
27import io.netty.buffer.PooledByteBufAllocator;
28import io.netty.channel.Channel;
29import io.netty.channel.ChannelFuture;
30import io.netty.channel.ChannelHandler;
31import io.netty.channel.ChannelHandlerContext;
32import io.netty.channel.ChannelInitializer;
33import io.netty.channel.ChannelOption;
34import io.netty.channel.EventLoopGroup;
35import io.netty.channel.ServerChannel;
36import io.netty.channel.SimpleChannelInboundHandler;
Jon Hall9a44d6a2017-03-02 18:14:37 -080037import io.netty.channel.WriteBufferWaterMark;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080038import io.netty.channel.epoll.EpollEventLoopGroup;
39import io.netty.channel.epoll.EpollServerSocketChannel;
40import io.netty.channel.epoll.EpollSocketChannel;
41import io.netty.channel.nio.NioEventLoopGroup;
42import io.netty.channel.socket.SocketChannel;
43import io.netty.channel.socket.nio.NioServerSocketChannel;
44import io.netty.channel.socket.nio.NioSocketChannel;
45import org.apache.commons.pool.KeyedPoolableObjectFactory;
46import org.apache.commons.pool.impl.GenericKeyedObjectPool;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070047import org.apache.felix.scr.annotations.Activate;
48import org.apache.felix.scr.annotations.Component;
49import org.apache.felix.scr.annotations.Deactivate;
50import org.apache.felix.scr.annotations.Reference;
51import org.apache.felix.scr.annotations.ReferenceCardinality;
52import org.apache.felix.scr.annotations.Service;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080053import org.onlab.util.Tools;
Madan Jampaniec1df022015-10-13 21:23:03 -070054import org.onosproject.cluster.ClusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070055import org.onosproject.cluster.ControllerNode;
Madan Jampani05833872016-07-12 23:01:39 -070056import org.onosproject.core.HybridLogicalClockService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070057import org.onosproject.store.cluster.messaging.Endpoint;
Madan Jampania9e70a62016-03-02 16:28:18 -080058import org.onosproject.store.cluster.messaging.MessagingException;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080059import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampania9e70a62016-03-02 16:28:18 -080060import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070061import org.slf4j.Logger;
62import org.slf4j.LoggerFactory;
63
Aaron Kruglikov1b727382016-02-09 16:17:47 -080064import javax.net.ssl.KeyManagerFactory;
65import javax.net.ssl.SSLContext;
66import javax.net.ssl.SSLEngine;
67import javax.net.ssl.TrustManagerFactory;
Madan Jampania9e70a62016-03-02 16:28:18 -080068
Aaron Kruglikov1b727382016-02-09 16:17:47 -080069import java.io.FileInputStream;
70import java.io.IOException;
71import java.security.KeyStore;
72import java.util.Map;
Madan Jampania9e70a62016-03-02 16:28:18 -080073import java.util.Optional;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080074import java.util.concurrent.CompletableFuture;
75import java.util.concurrent.ConcurrentHashMap;
76import java.util.concurrent.Executor;
77import java.util.concurrent.RejectedExecutionException;
78import java.util.concurrent.TimeUnit;
79import java.util.concurrent.TimeoutException;
80import java.util.concurrent.atomic.AtomicBoolean;
81import java.util.concurrent.atomic.AtomicLong;
82import java.util.function.BiConsumer;
83import java.util.function.BiFunction;
84import java.util.function.Consumer;
85
Yuta HIGUCHI90a16892016-07-20 20:36:08 -070086import static org.onlab.util.Tools.groupedThreads;
Heedo Kang4a47a302016-02-29 17:40:23 +090087import static org.onosproject.security.AppGuard.checkPermission;
88import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
89
Madan Jampaniafeebbd2015-05-19 15:26:01 -070090/**
91 * Netty based MessagingService.
92 */
Sho SHIMIZU5c396e32016-08-12 15:19:12 -070093@Component(immediate = true)
Madan Jampaniafeebbd2015-05-19 15:26:01 -070094@Service
Aaron Kruglikov1b727382016-02-09 16:17:47 -080095public class NettyMessagingManager implements MessagingService {
96
Jordan Halterman0ad49b12017-05-08 16:45:50 -070097 private static final int REPLY_TIME_OUT_MILLIS = 500;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080098 private static final short MIN_KS_LENGTH = 6;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070099
100 private final Logger log = LoggerFactory.getLogger(getClass());
101
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800102 private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
103
Madan Jampani05833872016-07-12 23:01:39 -0700104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected HybridLogicalClockService clockService;
106
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800107 private Endpoint localEp;
108 private int preamble;
109 private final AtomicBoolean started = new AtomicBoolean(false);
110 private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
111 private final AtomicLong messageIdGenerator = new AtomicLong(0);
112 private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder()
Madan Jampanif3ab4242016-07-27 17:21:47 -0700113 .expireAfterWrite(REPLY_TIME_OUT_MILLIS, TimeUnit.MILLISECONDS)
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800114 .removalListener(new RemovalListener<Long, Callback>() {
115 @Override
116 public void onRemoval(RemovalNotification<Long, Callback> entry) {
117 if (entry.wasEvicted()) {
118 entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
119 }
120 }
121 })
122 .build();
123
124 private final GenericKeyedObjectPool<Endpoint, Connection> channels
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700125 = new GenericKeyedObjectPool<>(new OnosCommunicationChannelFactory());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800126
127 private EventLoopGroup serverGroup;
128 private EventLoopGroup clientGroup;
129 private Class<? extends ServerChannel> serverChannelClass;
130 private Class<? extends Channel> clientChannelClass;
131
132 protected static final boolean TLS_DISABLED = false;
133 protected boolean enableNettyTls = TLS_DISABLED;
134
135 protected String ksLocation;
136 protected String tsLocation;
137 protected char[] ksPwd;
138 protected char[] tsPwd;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900139
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampaniec1df022015-10-13 21:23:03 -0700141 protected ClusterMetadataService clusterMetadataService;
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700142
143 @Activate
144 public void activate() throws Exception {
Madan Jampaniec1df022015-10-13 21:23:03 -0700145 ControllerNode localNode = clusterMetadataService.getLocalNode();
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800146 getTlsParameters();
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800147
148 if (started.get()) {
149 log.warn("Already running at local endpoint: {}", localEp);
150 return;
151 }
152 this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
153 this.localEp = new Endpoint(localNode.ip(), localNode.tcpPort());
154 channels.setLifo(true);
155 channels.setTestOnBorrow(true);
156 channels.setTestOnReturn(true);
157 channels.setMinEvictableIdleTimeMillis(60_000L);
158 channels.setTimeBetweenEvictionRunsMillis(30_000L);
159 initEventLoopGroup();
160 startAcceptingConnections();
161 started.set(true);
Madan Jampanif3ab4242016-07-27 17:21:47 -0700162 serverGroup.scheduleWithFixedDelay(callbacks::cleanUp, 0, REPLY_TIME_OUT_MILLIS, TimeUnit.MILLISECONDS);
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700163 log.info("Started");
164 }
165
166 @Deactivate
167 public void deactivate() throws Exception {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800168 if (started.get()) {
169 channels.close();
170 serverGroup.shutdownGracefully();
171 clientGroup.shutdownGracefully();
172 started.set(false);
173 }
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700174 log.info("Stopped");
175 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900176
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800177 private void getTlsParameters() {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900178 String tempString = System.getProperty("enableNettyTLS");
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800179 enableNettyTls = Strings.isNullOrEmpty(tempString) ? TLS_DISABLED : Boolean.parseBoolean(tempString);
180 log.info("enableNettyTLS = {}", enableNettyTls);
181 if (enableNettyTls) {
JunHuy Lam39eb4292015-06-26 17:24:23 +0900182 ksLocation = System.getProperty("javax.net.ssl.keyStore");
183 if (Strings.isNullOrEmpty(ksLocation)) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800184 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900185 return;
186 }
187 tsLocation = System.getProperty("javax.net.ssl.trustStore");
188 if (Strings.isNullOrEmpty(tsLocation)) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800189 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900190 return;
191 }
192 ksPwd = System.getProperty("javax.net.ssl.keyStorePassword").toCharArray();
193 if (MIN_KS_LENGTH > ksPwd.length) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800194 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900195 return;
196 }
197 tsPwd = System.getProperty("javax.net.ssl.trustStorePassword").toCharArray();
198 if (MIN_KS_LENGTH > tsPwd.length) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800199 enableNettyTls = TLS_DISABLED;
JunHuy Lam39eb4292015-06-26 17:24:23 +0900200 return;
201 }
202 }
203 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800204 private void initEventLoopGroup() {
205 // try Epoll first and if that does work, use nio.
206 try {
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700207 clientGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollC-%d", log));
208 serverGroup = new EpollEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "epollS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800209 serverChannelClass = EpollServerSocketChannel.class;
210 clientChannelClass = EpollSocketChannel.class;
211 return;
212 } catch (Throwable e) {
213 log.debug("Failed to initialize native (epoll) transport. "
214 + "Reason: {}. Proceeding with nio.", e.getMessage());
215 }
Yuta HIGUCHI90a16892016-07-20 20:36:08 -0700216 clientGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioC-%d", log));
217 serverGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioS-%d", log));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800218 serverChannelClass = NioServerSocketChannel.class;
219 clientChannelClass = NioSocketChannel.class;
220 }
221
222 @Override
223 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900224 checkPermission(CLUSTER_WRITE);
Madan Jampanib825aeb2016-04-01 15:18:25 -0700225 InternalMessage message = new InternalMessage(preamble,
Madan Jampani05833872016-07-12 23:01:39 -0700226 clockService.timeNow(),
Madan Jampanib825aeb2016-04-01 15:18:25 -0700227 messageIdGenerator.incrementAndGet(),
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800228 localEp,
229 type,
230 payload);
231 return sendAsync(ep, message);
232 }
233
234 protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900235 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800236 if (ep.equals(localEp)) {
237 try {
238 dispatchLocally(message);
239 } catch (IOException e) {
240 return Tools.exceptionalFuture(e);
241 }
242 return CompletableFuture.completedFuture(null);
243 }
244
245 CompletableFuture<Void> future = new CompletableFuture<>();
246 try {
247 Connection connection = null;
248 try {
249 connection = channels.borrowObject(ep);
250 connection.send(message, future);
251 } finally {
Yuta HIGUCHIb47c9532016-08-22 09:41:23 -0700252 if (connection != null) {
253 channels.returnObject(ep, connection);
254 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800255 }
256 } catch (Exception e) {
257 future.completeExceptionally(e);
258 }
259 return future;
260 }
261
262 @Override
263 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900264 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800265 return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
266 }
267
268 @Override
269 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900270 checkPermission(CLUSTER_WRITE);
Jordan Halterman041534b2017-03-21 10:37:33 -0700271 CompletableFuture<byte[]> future = new CompletableFuture<>();
272 Callback callback = new Callback(future, executor);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800273 Long messageId = messageIdGenerator.incrementAndGet();
274 callbacks.put(messageId, callback);
Madan Jampani05833872016-07-12 23:01:39 -0700275 InternalMessage message = new InternalMessage(preamble,
276 clockService.timeNow(),
277 messageId,
278 localEp,
279 type,
280 payload);
Jordan Halterman041534b2017-03-21 10:37:33 -0700281
282 sendAsync(ep, message).whenComplete((response, error) -> {
283 if (error != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800284 callbacks.invalidate(messageId);
Jordan Halterman041534b2017-03-21 10:37:33 -0700285 callback.completeExceptionally(error);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800286 }
Jordan Halterman041534b2017-03-21 10:37:33 -0700287 });
288 return future;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800289 }
290
291 @Override
292 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900293 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800294 handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
295 }
296
297 @Override
298 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900299 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800300 handlers.put(type, message -> executor.execute(() -> {
Madan Jampania9e70a62016-03-02 16:28:18 -0800301 byte[] responsePayload = null;
302 Status status = Status.OK;
303 try {
304 responsePayload = handler.apply(message.sender(), message.payload());
305 } catch (Exception e) {
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700306 log.debug("An error occurred in a message handler: {}", e);
Madan Jampania9e70a62016-03-02 16:28:18 -0800307 status = Status.ERROR_HANDLER_EXCEPTION;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800308 }
Madan Jampania9e70a62016-03-02 16:28:18 -0800309 sendReply(message, status, Optional.ofNullable(responsePayload));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800310 }));
311 }
312
313 @Override
314 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900315 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800316 handlers.put(type, message -> {
317 handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700318 Status status;
319 if (error == null) {
320 status = Status.OK;
321 } else {
322 log.debug("An error occurred in a message handler: {}", error);
323 status = Status.ERROR_HANDLER_EXCEPTION;
324 }
Madan Jampania9e70a62016-03-02 16:28:18 -0800325 sendReply(message, status, Optional.ofNullable(result));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800326 });
327 });
328 }
329
330 @Override
331 public void unregisterHandler(String type) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900332 checkPermission(CLUSTER_WRITE);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800333 handlers.remove(type);
334 }
335
336 private void startAcceptingConnections() throws InterruptedException {
337 ServerBootstrap b = new ServerBootstrap();
Jon Hall9a44d6a2017-03-02 18:14:37 -0800338 b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
339 new WriteBufferWaterMark(8 * 1024, 32 * 1024));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800340 b.option(ChannelOption.SO_RCVBUF, 1048576);
341 b.option(ChannelOption.TCP_NODELAY, true);
Yuta HIGUCHIb47c9532016-08-22 09:41:23 -0700342 b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800343 b.group(serverGroup, clientGroup);
344 b.channel(serverChannelClass);
345 if (enableNettyTls) {
346 b.childHandler(new SslServerCommunicationChannelInitializer());
347 } else {
348 b.childHandler(new OnosCommunicationChannelInitializer());
349 }
350 b.option(ChannelOption.SO_BACKLOG, 128);
351 b.childOption(ChannelOption.SO_KEEPALIVE, true);
352
353 // Bind and start to accept incoming connections.
354 b.bind(localEp.port()).sync().addListener(future -> {
355 if (future.isSuccess()) {
356 log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
357 } else {
Jon Hall9a44d6a2017-03-02 18:14:37 -0800358 log.warn("{} failed to bind to port {} due to {}", localEp.host(), localEp.port(), future.cause());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800359 }
360 });
361 }
362
363 private class OnosCommunicationChannelFactory
364 implements KeyedPoolableObjectFactory<Endpoint, Connection> {
365
366 @Override
Jon Hall9a44d6a2017-03-02 18:14:37 -0800367 public void activateObject(Endpoint endpoint, Connection connection)
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800368 throws Exception {
369 }
370
371 @Override
372 public void destroyObject(Endpoint ep, Connection connection) throws Exception {
Jon Hall9a44d6a2017-03-02 18:14:37 -0800373 log.debug("Closing connection {} to {}", connection, ep);
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800374 //Is this the right way to destroy?
375 connection.destroy();
376 }
377
378 @Override
379 public Connection makeObject(Endpoint ep) throws Exception {
380 Bootstrap bootstrap = new Bootstrap();
381 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Jon Hall9a44d6a2017-03-02 18:14:37 -0800382 bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
383 new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800384 bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
385 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
386 bootstrap.group(clientGroup);
387 // TODO: Make this faster:
388 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
389 bootstrap.channel(clientChannelClass);
390 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
391 if (enableNettyTls) {
392 bootstrap.handler(new SslClientCommunicationChannelInitializer());
393 } else {
394 bootstrap.handler(new OnosCommunicationChannelInitializer());
395 }
396 // Start the client.
397 CompletableFuture<Channel> retFuture = new CompletableFuture<>();
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700398 ChannelFuture f = bootstrap.connect(ep.host().toInetAddress(), ep.port());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800399
400 f.addListener(future -> {
401 if (future.isSuccess()) {
402 retFuture.complete(f.channel());
403 } else {
404 retFuture.completeExceptionally(future.cause());
405 }
406 });
407 log.debug("Established a new connection to {}", ep);
408 return new Connection(retFuture);
409 }
410
411 @Override
412 public void passivateObject(Endpoint ep, Connection connection)
413 throws Exception {
414 }
415
416 @Override
417 public boolean validateObject(Endpoint ep, Connection connection) {
418 return connection.validate();
419 }
420 }
421
422 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
423
424 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
425 private final ChannelHandler encoder = new MessageEncoder(preamble);
426
427 @Override
428 protected void initChannel(SocketChannel channel) throws Exception {
429 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
430 KeyStore ts = KeyStore.getInstance("JKS");
431 ts.load(new FileInputStream(tsLocation), tsPwd);
432 tmFactory.init(ts);
433
434 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
435 KeyStore ks = KeyStore.getInstance("JKS");
436 ks.load(new FileInputStream(ksLocation), ksPwd);
437 kmf.init(ks, ksPwd);
438
439 SSLContext serverContext = SSLContext.getInstance("TLS");
440 serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
441
442 SSLEngine serverSslEngine = serverContext.createSSLEngine();
443
444 serverSslEngine.setNeedClientAuth(true);
445 serverSslEngine.setUseClientMode(false);
446 serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
447 serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
448 serverSslEngine.setEnableSessionCreation(true);
449
450 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
451 .addLast("encoder", encoder)
Madan Jampanib825aeb2016-04-01 15:18:25 -0700452 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800453 .addLast("handler", dispatcher);
454 }
455 }
456
457 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
458
459 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
460 private final ChannelHandler encoder = new MessageEncoder(preamble);
461
462 @Override
463 protected void initChannel(SocketChannel channel) throws Exception {
464 TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
465 KeyStore ts = KeyStore.getInstance("JKS");
466 ts.load(new FileInputStream(tsLocation), tsPwd);
467 tmFactory.init(ts);
468
469 KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
470 KeyStore ks = KeyStore.getInstance("JKS");
471 ks.load(new FileInputStream(ksLocation), ksPwd);
472 kmf.init(ks, ksPwd);
473
474 SSLContext clientContext = SSLContext.getInstance("TLS");
475 clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
476
477 SSLEngine clientSslEngine = clientContext.createSSLEngine();
478
479 clientSslEngine.setUseClientMode(true);
480 clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
481 clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
482 clientSslEngine.setEnableSessionCreation(true);
483
484 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
485 .addLast("encoder", encoder)
Madan Jampanib825aeb2016-04-01 15:18:25 -0700486 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800487 .addLast("handler", dispatcher);
488 }
489 }
490
491 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
492
493 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
494 private final ChannelHandler encoder = new MessageEncoder(preamble);
495
496 @Override
497 protected void initChannel(SocketChannel channel) throws Exception {
498 channel.pipeline()
499 .addLast("encoder", encoder)
Madan Jampanib825aeb2016-04-01 15:18:25 -0700500 .addLast("decoder", new MessageDecoder())
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800501 .addLast("handler", dispatcher);
502 }
503 }
504
505 @ChannelHandler.Sharable
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700506 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<Object> {
507 // Effectively SimpleChannelInboundHandler<InternalMessage>,
508 // had to specify <Object> to avoid Class Loader not being able to find some classes.
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800509
510 @Override
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700511 protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
512 InternalMessage message = (InternalMessage) rawMessage;
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800513 try {
514 dispatchLocally(message);
515 } catch (RejectedExecutionException e) {
516 log.warn("Unable to dispatch message due to {}", e.getMessage());
517 }
518 }
519
520 @Override
521 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
522 log.error("Exception inside channel handling pipeline.", cause);
523 context.close();
524 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700525
526 /**
527 * Returns true if the given message should be handled.
528 *
529 * @param msg inbound message
530 * @return true if {@code msg} is {@link InternalMessage} instance.
531 *
532 * @see SimpleChannelInboundHandler#acceptInboundMessage(Object)
533 */
534 @Override
535 public final boolean acceptInboundMessage(Object msg) {
536 return msg instanceof InternalMessage;
537 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800538 }
Yuta HIGUCHIc012dda2016-08-17 00:43:46 -0700539
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800540 private void dispatchLocally(InternalMessage message) throws IOException {
Madan Jampanib825aeb2016-04-01 15:18:25 -0700541 if (message.preamble() != preamble) {
542 log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
543 sendReply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
544 }
Madan Jampani05833872016-07-12 23:01:39 -0700545 clockService.recordEventTime(message.time());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800546 String type = message.type();
547 if (REPLY_MESSAGE_TYPE.equals(type)) {
548 try {
549 Callback callback =
550 callbacks.getIfPresent(message.id());
551 if (callback != null) {
Madan Jampania9e70a62016-03-02 16:28:18 -0800552 if (message.status() == Status.OK) {
553 callback.complete(message.payload());
554 } else if (message.status() == Status.ERROR_NO_HANDLER) {
555 callback.completeExceptionally(new MessagingException.NoRemoteHandler());
556 } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
557 callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
Madan Jampanib825aeb2016-04-01 15:18:25 -0700558 } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
Jonathan Harte255cc42016-09-12 14:50:24 -0700559 callback.completeExceptionally(new MessagingException.ProtocolException());
Madan Jampania9e70a62016-03-02 16:28:18 -0800560 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800561 } else {
Madan Jampania9e70a62016-03-02 16:28:18 -0800562 log.debug("Received a reply for message id:[{}]. "
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800563 + " from {}. But was unable to locate the"
564 + " request handle", message.id(), message.sender());
565 }
566 } finally {
567 callbacks.invalidate(message.id());
568 }
569 return;
570 }
571 Consumer<InternalMessage> handler = handlers.get(type);
572 if (handler != null) {
573 handler.accept(message);
574 } else {
Jon Hall9a44d6a2017-03-02 18:14:37 -0800575 log.debug("No handler for message type {} from {}", message.type(), message.sender());
Madan Jampania9e70a62016-03-02 16:28:18 -0800576 sendReply(message, Status.ERROR_NO_HANDLER, Optional.empty());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800577 }
578 }
579
Madan Jampania9e70a62016-03-02 16:28:18 -0800580 private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
Madan Jampanib825aeb2016-04-01 15:18:25 -0700581 InternalMessage response = new InternalMessage(preamble,
Madan Jampani05833872016-07-12 23:01:39 -0700582 clockService.timeNow(),
Madan Jampanib825aeb2016-04-01 15:18:25 -0700583 message.id(),
Madan Jampania9e70a62016-03-02 16:28:18 -0800584 localEp,
585 REPLY_MESSAGE_TYPE,
586 responsePayload.orElse(new byte[0]),
587 status);
588 sendAsync(message.sender(), response).whenComplete((result, error) -> {
589 if (error != null) {
590 log.debug("Failed to respond", error);
591 }
592 });
593 }
594
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800595 private final class Callback {
596 private final CompletableFuture<byte[]> future;
597 private final Executor executor;
598
599 public Callback(CompletableFuture<byte[]> future, Executor executor) {
600 this.future = future;
601 this.executor = executor;
602 }
603
604 public void complete(byte[] value) {
605 executor.execute(() -> future.complete(value));
606 }
607
608 public void completeExceptionally(Throwable error) {
609 executor.execute(() -> future.completeExceptionally(error));
610 }
611 }
612 private final class Connection {
613 private final CompletableFuture<Channel> internalFuture;
614
615 public Connection(CompletableFuture<Channel> internalFuture) {
616 this.internalFuture = internalFuture;
617 }
618
619 /**
620 * Sends a message out on its channel and associated the message with a
621 * completable future used for signaling.
622 * @param message the message to be sent
623 * @param future a future that is completed normally or exceptionally if
624 * message sending succeeds or fails respectively
625 */
626 public void send(Object message, CompletableFuture<Void> future) {
627 internalFuture.whenComplete((channel, throwable) -> {
628 if (throwable == null) {
629 channel.writeAndFlush(message).addListener(channelFuture -> {
630 if (!channelFuture.isSuccess()) {
631 future.completeExceptionally(channelFuture.cause());
632 } else {
633 future.complete(null);
634 }
635 });
636 } else {
637 future.completeExceptionally(throwable);
638 }
639 });
640 }
641
642 /**
643 * Destroys a channel by closing its channel (if it exists) and
644 * cancelling its future.
645 */
646 public void destroy() {
647 Channel channel = internalFuture.getNow(null);
648 if (channel != null) {
649 channel.close();
650 }
651 internalFuture.cancel(false);
652 }
653
654 /**
655 * Determines whether the connection is valid meaning it is either
656 * complete with and active channel
657 * or it has not yet completed.
658 * @return true if the channel has an active connection or has not
659 * yet completed
660 */
661 public boolean validate() {
662 if (internalFuture.isCompletedExceptionally()) {
663 return false;
664 }
665 Channel channel = internalFuture.getNow(null);
666 return channel == null || channel.isActive();
667 }
668 }
JunHuy Lam39eb4292015-06-26 17:24:23 +0900669}