blob: b378f3ace68aa60c109083efb208ab7260ff5c8c [file] [log] [blame]
Madan Jampaniab6d3112014-10-02 16:30:14 -07001package org.onlab.netty;
2
3import java.io.IOException;
4import java.net.UnknownHostException;
5import java.util.concurrent.ConcurrentHashMap;
6import java.util.concurrent.ConcurrentMap;
7import java.util.concurrent.TimeUnit;
8
9import io.netty.bootstrap.Bootstrap;
10import io.netty.bootstrap.ServerBootstrap;
11import io.netty.buffer.PooledByteBufAllocator;
12import io.netty.channel.Channel;
13import io.netty.channel.ChannelFuture;
Madan Jampaniddf76222014-10-04 23:48:44 -070014import io.netty.channel.ChannelHandler;
Madan Jampaniab6d3112014-10-02 16:30:14 -070015import io.netty.channel.ChannelHandlerContext;
16import io.netty.channel.ChannelInitializer;
17import io.netty.channel.ChannelOption;
18import io.netty.channel.EventLoopGroup;
Madan Jampani824a7c12014-10-21 09:46:15 -070019import io.netty.channel.ServerChannel;
Madan Jampaniab6d3112014-10-02 16:30:14 -070020import io.netty.channel.SimpleChannelInboundHandler;
Madan Jampani824a7c12014-10-21 09:46:15 -070021import io.netty.channel.epoll.Epoll;
22import io.netty.channel.epoll.EpollEventLoopGroup;
23import io.netty.channel.epoll.EpollServerSocketChannel;
24import io.netty.channel.epoll.EpollSocketChannel;
Madan Jampaniab6d3112014-10-02 16:30:14 -070025import io.netty.channel.nio.NioEventLoopGroup;
26import io.netty.channel.socket.SocketChannel;
27import io.netty.channel.socket.nio.NioServerSocketChannel;
28import io.netty.channel.socket.nio.NioSocketChannel;
29
30import org.apache.commons.lang.math.RandomUtils;
Madan Jampaniab6d3112014-10-02 16:30:14 -070031import org.apache.commons.pool.KeyedPoolableObjectFactory;
32import org.apache.commons.pool.impl.GenericKeyedObjectPool;
33import org.slf4j.Logger;
34import org.slf4j.LoggerFactory;
35
36import com.google.common.cache.Cache;
37import com.google.common.cache.CacheBuilder;
38
39/**
40 * A Netty based implementation of MessagingService.
41 */
42public class NettyMessagingService implements MessagingService {
43
44 private final Logger log = LoggerFactory.getLogger(getClass());
45
Madan Jampaniab6d3112014-10-02 16:30:14 -070046 private final int port;
Madan Jampaniddf76222014-10-04 23:48:44 -070047 private final Endpoint localEp;
Madan Jampaniab6d3112014-10-02 16:30:14 -070048 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
Madan Jampani53e44e62014-10-07 12:39:51 -070049 private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
Madan Jampaniddf76222014-10-04 23:48:44 -070050 .maximumSize(100000)
51 .weakValues()
52 // TODO: Once the entry expires, notify blocking threads (if any).
53 .expireAfterWrite(10, TimeUnit.MINUTES)
54 .build();
55 private final GenericKeyedObjectPool<Endpoint, Channel> channels
56 = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
Madan Jampaniab6d3112014-10-02 16:30:14 -070057
Madan Jampani824a7c12014-10-21 09:46:15 -070058 private EventLoopGroup serverGroup;
59 private EventLoopGroup clientGroup;
60 private Class<? extends ServerChannel> serverChannelClass;
61 private Class<? extends Channel> clientChannelClass;
62
Madan Jampani5e83f332014-10-20 15:35:09 -070063 private void initEventLoopGroup() {
Madan Jampani824a7c12014-10-21 09:46:15 -070064 // try Epoll first and if that does work, use nio.
65 // TODO: make this configurable.
66 try {
67 if (Epoll.isAvailable()) {
68 clientGroup = new EpollEventLoopGroup();
69 serverGroup = new EpollEventLoopGroup();
70 serverChannelClass = EpollServerSocketChannel.class;
71 clientChannelClass = EpollSocketChannel.class;
72 return;
73 } else {
74 log.info("Netty epoll support is not available. Proceeding with nio.");
75 }
76
77 } catch (Throwable t) {
78 log.warn("Failed to initialize epoll sockets. Proceeding with nio.", t);
79 }
80 clientGroup = new NioEventLoopGroup();
81 serverGroup = new NioEventLoopGroup();
82 serverChannelClass = NioServerSocketChannel.class;
83 clientChannelClass = NioSocketChannel.class;
Madan Jampani5e83f332014-10-20 15:35:09 -070084 }
85
Madan Jampaniab6d3112014-10-02 16:30:14 -070086 public NettyMessagingService() {
87 // TODO: Default port should be configurable.
88 this(8080);
89 }
90
91 // FIXME: Constructor should not throw exceptions.
92 public NettyMessagingService(int port) {
93 this.port = port;
94 try {
95 localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
96 } catch (UnknownHostException e) {
97 // bailing out.
98 throw new RuntimeException(e);
99 }
100 }
101
102 public void activate() throws Exception {
Madan Jampani86ed0552014-10-03 16:45:42 -0700103 channels.setTestOnBorrow(true);
104 channels.setTestOnReturn(true);
Madan Jampani5e83f332014-10-20 15:35:09 -0700105 initEventLoopGroup();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700106 startAcceptingConnections();
107 }
108
109 public void deactivate() throws Exception {
110 channels.close();
Madan Jampani824a7c12014-10-21 09:46:15 -0700111 serverGroup.shutdownGracefully();
112 clientGroup.shutdownGracefully();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700113 }
114
115 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700116 public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
Madan Jampaniab6d3112014-10-02 16:30:14 -0700117 InternalMessage message = new InternalMessage.Builder(this)
118 .withId(RandomUtils.nextLong())
119 .withSender(localEp)
120 .withType(type)
121 .withPayload(payload)
122 .build();
123 sendAsync(ep, message);
124 }
125
126 protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
127 Channel channel = null;
128 try {
Madan Jampani86ed0552014-10-03 16:45:42 -0700129 try {
130 channel = channels.borrowObject(ep);
131 channel.eventLoop().execute(new WriteTask(channel, message));
132 } finally {
133 channels.returnObject(ep, channel);
134 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700135 } catch (Exception e) {
136 throw new IOException(e);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700137 }
138 }
139
140 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700141 public Response sendAndReceive(Endpoint ep, String type, byte[] payload)
Madan Jampaniab6d3112014-10-02 16:30:14 -0700142 throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700143 AsyncResponse futureResponse = new AsyncResponse();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700144 Long messageId = RandomUtils.nextLong();
145 responseFutures.put(messageId, futureResponse);
146 InternalMessage message = new InternalMessage.Builder(this)
147 .withId(messageId)
148 .withSender(localEp)
149 .withType(type)
150 .withPayload(payload)
151 .build();
152 sendAsync(ep, message);
153 return futureResponse;
154 }
155
156 @Override
157 public void registerHandler(String type, MessageHandler handler) {
158 // TODO: Is this the right semantics for handler registration?
159 handlers.putIfAbsent(type, handler);
160 }
161
162 public void unregisterHandler(String type) {
163 handlers.remove(type);
164 }
165
166 private MessageHandler getMessageHandler(String type) {
167 return handlers.get(type);
168 }
169
170 private void startAcceptingConnections() throws InterruptedException {
171 ServerBootstrap b = new ServerBootstrap();
Madan Jampani86ed0552014-10-03 16:45:42 -0700172 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
Madan Jampaniddf76222014-10-04 23:48:44 -0700173 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
174 // TODO: Need JVM options to configure PooledByteBufAllocator.
Madan Jampaniab6d3112014-10-02 16:30:14 -0700175 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Madan Jampani824a7c12014-10-21 09:46:15 -0700176 b.group(serverGroup, clientGroup)
177 .channel(serverChannelClass)
Madan Jampaniab6d3112014-10-02 16:30:14 -0700178 .childHandler(new OnosCommunicationChannelInitializer())
179 .option(ChannelOption.SO_BACKLOG, 128)
180 .childOption(ChannelOption.SO_KEEPALIVE, true);
181
182 // Bind and start to accept incoming connections.
183 b.bind(port).sync();
184 }
185
186 private class OnosCommunicationChannelFactory
187 implements KeyedPoolableObjectFactory<Endpoint, Channel> {
188
189 @Override
190 public void activateObject(Endpoint endpoint, Channel channel)
191 throws Exception {
192 }
193
194 @Override
195 public void destroyObject(Endpoint ep, Channel channel) throws Exception {
196 channel.close();
197 }
198
199 @Override
200 public Channel makeObject(Endpoint ep) throws Exception {
Madan Jampaniddf76222014-10-04 23:48:44 -0700201 Bootstrap bootstrap = new Bootstrap();
202 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
203 bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
204 bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
Madan Jampani824a7c12014-10-21 09:46:15 -0700205 bootstrap.group(clientGroup);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700206 // TODO: Make this faster:
207 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
Madan Jampani824a7c12014-10-21 09:46:15 -0700208 bootstrap.channel(clientChannelClass);
Madan Jampaniddf76222014-10-04 23:48:44 -0700209 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
210 bootstrap.handler(new OnosCommunicationChannelInitializer());
Madan Jampaniab6d3112014-10-02 16:30:14 -0700211 // Start the client.
Madan Jampaniddf76222014-10-04 23:48:44 -0700212 ChannelFuture f = bootstrap.connect(ep.host(), ep.port()).sync();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700213 return f.channel();
214 }
215
216 @Override
217 public void passivateObject(Endpoint ep, Channel channel)
218 throws Exception {
219 }
220
221 @Override
222 public boolean validateObject(Endpoint ep, Channel channel) {
223 return channel.isOpen();
224 }
225 }
226
227 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
228
Madan Jampaniddf76222014-10-04 23:48:44 -0700229 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Madan Jampani53e44e62014-10-07 12:39:51 -0700230 private final ChannelHandler encoder = new MessageEncoder();
Madan Jampaniddf76222014-10-04 23:48:44 -0700231
Madan Jampaniab6d3112014-10-02 16:30:14 -0700232 @Override
233 protected void initChannel(SocketChannel channel) throws Exception {
234 channel.pipeline()
Madan Jampaniddf76222014-10-04 23:48:44 -0700235 .addLast("encoder", encoder)
Madan Jampani53e44e62014-10-07 12:39:51 -0700236 .addLast("decoder", new MessageDecoder(NettyMessagingService.this))
Madan Jampaniddf76222014-10-04 23:48:44 -0700237 .addLast("handler", dispatcher);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700238 }
239 }
240
241 private class WriteTask implements Runnable {
242
Madan Jampani86ed0552014-10-03 16:45:42 -0700243 private final InternalMessage message;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700244 private final Channel channel;
245
Madan Jampani86ed0552014-10-03 16:45:42 -0700246 public WriteTask(Channel channel, InternalMessage message) {
Madan Jampaniab6d3112014-10-02 16:30:14 -0700247 this.channel = channel;
Madan Jampani86ed0552014-10-03 16:45:42 -0700248 this.message = message;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700249 }
250
251 @Override
252 public void run() {
Madan Jampaniddf76222014-10-04 23:48:44 -0700253 channel.writeAndFlush(message, channel.voidPromise());
Madan Jampaniab6d3112014-10-02 16:30:14 -0700254 }
255 }
256
Madan Jampaniddf76222014-10-04 23:48:44 -0700257 @ChannelHandler.Sharable
Madan Jampaniab6d3112014-10-02 16:30:14 -0700258 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
259
260 @Override
261 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
262 String type = message.type();
263 if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
264 try {
Madan Jampani53e44e62014-10-07 12:39:51 -0700265 AsyncResponse futureResponse =
Madan Jampaniab6d3112014-10-02 16:30:14 -0700266 NettyMessagingService.this.responseFutures.getIfPresent(message.id());
267 if (futureResponse != null) {
268 futureResponse.setResponse(message.payload());
Madan Jampanif1d425a2014-10-07 09:52:36 -0700269 } else {
270 log.warn("Received a reply. But was unable to locate the request handle");
Madan Jampaniab6d3112014-10-02 16:30:14 -0700271 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700272 } finally {
273 NettyMessagingService.this.responseFutures.invalidate(message.id());
274 }
275 return;
276 }
277 MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
278 handler.handle(message);
279 }
Madan Jampani86ed0552014-10-03 16:45:42 -0700280
Madan Jampani86ed0552014-10-03 16:45:42 -0700281 @Override
282 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
Madan Jampani29e5dfd2014-10-07 17:26:25 -0700283 log.error("Exception inside channel handling pipeline.", cause);
Madan Jampani86ed0552014-10-03 16:45:42 -0700284 context.close();
285 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700286 }
287}