blob: f88bcdb34b353bda99ce26e93939505e344bde63 [file] [log] [blame]
Madan Jampaniab6d3112014-10-02 16:30:14 -07001package org.onlab.netty;
2
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
35
36/**
37 * A Netty based implementation of MessagingService.
38 */
39public class NettyMessagingService implements MessagingService {
40
41 private final Logger log = LoggerFactory.getLogger(getClass());
42
Madan Jampaniab6d3112014-10-02 16:30:14 -070043 private final int port;
Madan Jampaniddf76222014-10-04 23:48:44 -070044 private final Endpoint localEp;
Madan Jampaniab6d3112014-10-02 16:30:14 -070045 private final EventLoopGroup bossGroup = new NioEventLoopGroup();
Madan Jampani5e83f332014-10-20 15:35:09 -070046 private EventLoopGroup workerGroup;
47 private Class<? extends Channel> channelClass;
Madan Jampaniab6d3112014-10-02 16:30:14 -070048 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
Madan Jampani53e44e62014-10-07 12:39:51 -070049 private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
Madan Jampaniddf76222014-10-04 23:48:44 -070050 .maximumSize(100000)
51 .weakValues()
52 // TODO: Once the entry expires, notify blocking threads (if any).
53 .expireAfterWrite(10, TimeUnit.MINUTES)
54 .build();
55 private final GenericKeyedObjectPool<Endpoint, Channel> channels
56 = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
Madan Jampaniab6d3112014-10-02 16:30:14 -070057
Madan Jampani5e83f332014-10-20 15:35:09 -070058 // TODO: make this configurable.
59 private void initEventLoopGroup() {
60 try {
61 workerGroup = new EpollEventLoopGroup();
62 channelClass = EpollSocketChannel.class;
63 } catch (Throwable t) {
64 workerGroup = new NioEventLoopGroup();
65 channelClass = NioSocketChannel.class;
66 }
67 }
68
Madan Jampaniab6d3112014-10-02 16:30:14 -070069 public NettyMessagingService() {
70 // TODO: Default port should be configurable.
71 this(8080);
72 }
73
74 // FIXME: Constructor should not throw exceptions.
75 public NettyMessagingService(int port) {
76 this.port = port;
77 try {
78 localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
79 } catch (UnknownHostException e) {
80 // bailing out.
81 throw new RuntimeException(e);
82 }
83 }
84
85 public void activate() throws Exception {
Madan Jampani86ed0552014-10-03 16:45:42 -070086 channels.setTestOnBorrow(true);
87 channels.setTestOnReturn(true);
Madan Jampani5e83f332014-10-20 15:35:09 -070088 initEventLoopGroup();
Madan Jampaniab6d3112014-10-02 16:30:14 -070089 startAcceptingConnections();
90 }
91
92 public void deactivate() throws Exception {
93 channels.close();
94 bossGroup.shutdownGracefully();
95 workerGroup.shutdownGracefully();
96 }
97
98 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -070099 public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
Madan Jampaniab6d3112014-10-02 16:30:14 -0700100 InternalMessage message = new InternalMessage.Builder(this)
101 .withId(RandomUtils.nextLong())
102 .withSender(localEp)
103 .withType(type)
104 .withPayload(payload)
105 .build();
106 sendAsync(ep, message);
107 }
108
109 protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
110 Channel channel = null;
111 try {
Madan Jampani86ed0552014-10-03 16:45:42 -0700112 try {
113 channel = channels.borrowObject(ep);
114 channel.eventLoop().execute(new WriteTask(channel, message));
115 } finally {
116 channels.returnObject(ep, channel);
117 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700118 } catch (Exception e) {
119 throw new IOException(e);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700120 }
121 }
122
123 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700124 public Response sendAndReceive(Endpoint ep, String type, byte[] payload)
Madan Jampaniab6d3112014-10-02 16:30:14 -0700125 throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700126 AsyncResponse futureResponse = new AsyncResponse();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700127 Long messageId = RandomUtils.nextLong();
128 responseFutures.put(messageId, futureResponse);
129 InternalMessage message = new InternalMessage.Builder(this)
130 .withId(messageId)
131 .withSender(localEp)
132 .withType(type)
133 .withPayload(payload)
134 .build();
135 sendAsync(ep, message);
136 return futureResponse;
137 }
138
139 @Override
140 public void registerHandler(String type, MessageHandler handler) {
141 // TODO: Is this the right semantics for handler registration?
142 handlers.putIfAbsent(type, handler);
143 }
144
145 public void unregisterHandler(String type) {
146 handlers.remove(type);
147 }
148
149 private MessageHandler getMessageHandler(String type) {
150 return handlers.get(type);
151 }
152
153 private void startAcceptingConnections() throws InterruptedException {
154 ServerBootstrap b = new ServerBootstrap();
Madan Jampani86ed0552014-10-03 16:45:42 -0700155 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
Madan Jampaniddf76222014-10-04 23:48:44 -0700156 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
157 // TODO: Need JVM options to configure PooledByteBufAllocator.
Madan Jampaniab6d3112014-10-02 16:30:14 -0700158 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
159 b.group(bossGroup, workerGroup)
160 .channel(NioServerSocketChannel.class)
161 .childHandler(new OnosCommunicationChannelInitializer())
162 .option(ChannelOption.SO_BACKLOG, 128)
163 .childOption(ChannelOption.SO_KEEPALIVE, true);
164
165 // Bind and start to accept incoming connections.
166 b.bind(port).sync();
167 }
168
169 private class OnosCommunicationChannelFactory
170 implements KeyedPoolableObjectFactory<Endpoint, Channel> {
171
172 @Override
173 public void activateObject(Endpoint endpoint, Channel channel)
174 throws Exception {
175 }
176
177 @Override
178 public void destroyObject(Endpoint ep, Channel channel) throws Exception {
179 channel.close();
180 }
181
182 @Override
183 public Channel makeObject(Endpoint ep) throws Exception {
Madan Jampaniddf76222014-10-04 23:48:44 -0700184 Bootstrap bootstrap = new Bootstrap();
185 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
186 bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
187 bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
188 bootstrap.group(workerGroup);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700189 // TODO: Make this faster:
190 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
Madan Jampani5e83f332014-10-20 15:35:09 -0700191 bootstrap.channel(channelClass);
Madan Jampaniddf76222014-10-04 23:48:44 -0700192 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
193 bootstrap.handler(new OnosCommunicationChannelInitializer());
Madan Jampaniab6d3112014-10-02 16:30:14 -0700194 // Start the client.
Madan Jampaniddf76222014-10-04 23:48:44 -0700195 ChannelFuture f = bootstrap.connect(ep.host(), ep.port()).sync();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700196 return f.channel();
197 }
198
199 @Override
200 public void passivateObject(Endpoint ep, Channel channel)
201 throws Exception {
202 }
203
204 @Override
205 public boolean validateObject(Endpoint ep, Channel channel) {
206 return channel.isOpen();
207 }
208 }
209
210 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
211
Madan Jampaniddf76222014-10-04 23:48:44 -0700212 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Madan Jampani53e44e62014-10-07 12:39:51 -0700213 private final ChannelHandler encoder = new MessageEncoder();
Madan Jampaniddf76222014-10-04 23:48:44 -0700214
Madan Jampaniab6d3112014-10-02 16:30:14 -0700215 @Override
216 protected void initChannel(SocketChannel channel) throws Exception {
217 channel.pipeline()
Madan Jampaniddf76222014-10-04 23:48:44 -0700218 .addLast("encoder", encoder)
Madan Jampani53e44e62014-10-07 12:39:51 -0700219 .addLast("decoder", new MessageDecoder(NettyMessagingService.this))
Madan Jampaniddf76222014-10-04 23:48:44 -0700220 .addLast("handler", dispatcher);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700221 }
222 }
223
224 private class WriteTask implements Runnable {
225
Madan Jampani86ed0552014-10-03 16:45:42 -0700226 private final InternalMessage message;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700227 private final Channel channel;
228
Madan Jampani86ed0552014-10-03 16:45:42 -0700229 public WriteTask(Channel channel, InternalMessage message) {
Madan Jampaniab6d3112014-10-02 16:30:14 -0700230 this.channel = channel;
Madan Jampani86ed0552014-10-03 16:45:42 -0700231 this.message = message;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700232 }
233
234 @Override
235 public void run() {
Madan Jampaniddf76222014-10-04 23:48:44 -0700236 channel.writeAndFlush(message, channel.voidPromise());
Madan Jampaniab6d3112014-10-02 16:30:14 -0700237 }
238 }
239
Madan Jampaniddf76222014-10-04 23:48:44 -0700240 @ChannelHandler.Sharable
Madan Jampaniab6d3112014-10-02 16:30:14 -0700241 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
242
243 @Override
244 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
245 String type = message.type();
246 if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
247 try {
Madan Jampani53e44e62014-10-07 12:39:51 -0700248 AsyncResponse futureResponse =
Madan Jampaniab6d3112014-10-02 16:30:14 -0700249 NettyMessagingService.this.responseFutures.getIfPresent(message.id());
250 if (futureResponse != null) {
251 futureResponse.setResponse(message.payload());
Madan Jampanif1d425a2014-10-07 09:52:36 -0700252 } else {
253 log.warn("Received a reply. But was unable to locate the request handle");
Madan Jampaniab6d3112014-10-02 16:30:14 -0700254 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700255 } finally {
256 NettyMessagingService.this.responseFutures.invalidate(message.id());
257 }
258 return;
259 }
260 MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
261 handler.handle(message);
262 }
Madan Jampani86ed0552014-10-03 16:45:42 -0700263
Madan Jampani86ed0552014-10-03 16:45:42 -0700264 @Override
265 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
Madan Jampani29e5dfd2014-10-07 17:26:25 -0700266 log.error("Exception inside channel handling pipeline.", cause);
Madan Jampani86ed0552014-10-03 16:45:42 -0700267 context.close();
268 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700269 }
270}