blob: f0c4861418580709f58bdbed5adf88c2adb6342a [file] [log] [blame]
Madan Jampaniab6d3112014-10-02 16:30:14 -07001package org.onlab.netty;
2
3import java.io.IOException;
4import java.net.UnknownHostException;
5import java.util.concurrent.ConcurrentHashMap;
6import java.util.concurrent.ConcurrentMap;
7import java.util.concurrent.TimeUnit;
8
9import io.netty.bootstrap.Bootstrap;
10import io.netty.bootstrap.ServerBootstrap;
11import io.netty.buffer.PooledByteBufAllocator;
12import io.netty.channel.Channel;
13import io.netty.channel.ChannelFuture;
Madan Jampaniddf76222014-10-04 23:48:44 -070014import io.netty.channel.ChannelHandler;
Madan Jampaniab6d3112014-10-02 16:30:14 -070015import io.netty.channel.ChannelHandlerContext;
16import io.netty.channel.ChannelInitializer;
17import io.netty.channel.ChannelOption;
18import io.netty.channel.EventLoopGroup;
19import io.netty.channel.SimpleChannelInboundHandler;
20import io.netty.channel.nio.NioEventLoopGroup;
21import io.netty.channel.socket.SocketChannel;
22import io.netty.channel.socket.nio.NioServerSocketChannel;
23import io.netty.channel.socket.nio.NioSocketChannel;
24
25import org.apache.commons.lang.math.RandomUtils;
Madan Jampaniab6d3112014-10-02 16:30:14 -070026import org.apache.commons.pool.KeyedPoolableObjectFactory;
27import org.apache.commons.pool.impl.GenericKeyedObjectPool;
28import org.slf4j.Logger;
29import org.slf4j.LoggerFactory;
30
31import com.google.common.cache.Cache;
32import com.google.common.cache.CacheBuilder;
33
34/**
35 * A Netty based implementation of MessagingService.
36 */
37public class NettyMessagingService implements MessagingService {
38
39 private final Logger log = LoggerFactory.getLogger(getClass());
40
Madan Jampaniab6d3112014-10-02 16:30:14 -070041 private final int port;
Madan Jampaniddf76222014-10-04 23:48:44 -070042 private final Endpoint localEp;
Madan Jampaniab6d3112014-10-02 16:30:14 -070043 private final EventLoopGroup bossGroup = new NioEventLoopGroup();
44 private final EventLoopGroup workerGroup = new NioEventLoopGroup();
45 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
Madan Jampaniddf76222014-10-04 23:48:44 -070046 private final Cache<Long, AsyncResponse<?>> responseFutures = CacheBuilder.newBuilder()
47 .maximumSize(100000)
48 .weakValues()
49 // TODO: Once the entry expires, notify blocking threads (if any).
50 .expireAfterWrite(10, TimeUnit.MINUTES)
51 .build();
52 private final GenericKeyedObjectPool<Endpoint, Channel> channels
53 = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
Madan Jampaniab6d3112014-10-02 16:30:14 -070054
55 protected Serializer serializer;
56
57 public NettyMessagingService() {
58 // TODO: Default port should be configurable.
59 this(8080);
60 }
61
62 // FIXME: Constructor should not throw exceptions.
63 public NettyMessagingService(int port) {
64 this.port = port;
65 try {
66 localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
67 } catch (UnknownHostException e) {
68 // bailing out.
69 throw new RuntimeException(e);
70 }
71 }
72
73 public void activate() throws Exception {
Madan Jampani86ed0552014-10-03 16:45:42 -070074 channels.setTestOnBorrow(true);
75 channels.setTestOnReturn(true);
Madan Jampaniab6d3112014-10-02 16:30:14 -070076 startAcceptingConnections();
77 }
78
79 public void deactivate() throws Exception {
80 channels.close();
81 bossGroup.shutdownGracefully();
82 workerGroup.shutdownGracefully();
83 }
84
85 @Override
86 public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
87 InternalMessage message = new InternalMessage.Builder(this)
88 .withId(RandomUtils.nextLong())
89 .withSender(localEp)
90 .withType(type)
91 .withPayload(payload)
92 .build();
93 sendAsync(ep, message);
94 }
95
96 protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
97 Channel channel = null;
98 try {
Madan Jampani86ed0552014-10-03 16:45:42 -070099 try {
100 channel = channels.borrowObject(ep);
101 channel.eventLoop().execute(new WriteTask(channel, message));
102 } finally {
103 channels.returnObject(ep, channel);
104 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700105 } catch (Exception e) {
106 throw new IOException(e);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700107 }
108 }
109
110 @Override
111 public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
112 throws IOException {
113 AsyncResponse<T> futureResponse = new AsyncResponse<T>();
114 Long messageId = RandomUtils.nextLong();
115 responseFutures.put(messageId, futureResponse);
116 InternalMessage message = new InternalMessage.Builder(this)
117 .withId(messageId)
118 .withSender(localEp)
119 .withType(type)
120 .withPayload(payload)
121 .build();
122 sendAsync(ep, message);
123 return futureResponse;
124 }
125
126 @Override
127 public void registerHandler(String type, MessageHandler handler) {
128 // TODO: Is this the right semantics for handler registration?
129 handlers.putIfAbsent(type, handler);
130 }
131
132 public void unregisterHandler(String type) {
133 handlers.remove(type);
134 }
135
Madan Jampani938aa432014-10-04 17:37:23 -0700136 public void setSerializer(Serializer serializer) {
137 this.serializer = serializer;
138 }
139
Madan Jampaniab6d3112014-10-02 16:30:14 -0700140 private MessageHandler getMessageHandler(String type) {
141 return handlers.get(type);
142 }
143
144 private void startAcceptingConnections() throws InterruptedException {
145 ServerBootstrap b = new ServerBootstrap();
Madan Jampani86ed0552014-10-03 16:45:42 -0700146 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
Madan Jampaniddf76222014-10-04 23:48:44 -0700147 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
148 // TODO: Need JVM options to configure PooledByteBufAllocator.
Madan Jampaniab6d3112014-10-02 16:30:14 -0700149 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
150 b.group(bossGroup, workerGroup)
151 .channel(NioServerSocketChannel.class)
152 .childHandler(new OnosCommunicationChannelInitializer())
153 .option(ChannelOption.SO_BACKLOG, 128)
154 .childOption(ChannelOption.SO_KEEPALIVE, true);
155
156 // Bind and start to accept incoming connections.
157 b.bind(port).sync();
158 }
159
160 private class OnosCommunicationChannelFactory
161 implements KeyedPoolableObjectFactory<Endpoint, Channel> {
162
163 @Override
164 public void activateObject(Endpoint endpoint, Channel channel)
165 throws Exception {
166 }
167
168 @Override
169 public void destroyObject(Endpoint ep, Channel channel) throws Exception {
170 channel.close();
171 }
172
173 @Override
174 public Channel makeObject(Endpoint ep) throws Exception {
Madan Jampaniddf76222014-10-04 23:48:44 -0700175 Bootstrap bootstrap = new Bootstrap();
176 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
177 bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
178 bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
179 bootstrap.group(workerGroup);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700180 // TODO: Make this faster:
181 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
Madan Jampaniddf76222014-10-04 23:48:44 -0700182 bootstrap.channel(NioSocketChannel.class);
183 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
184 bootstrap.handler(new OnosCommunicationChannelInitializer());
Madan Jampaniab6d3112014-10-02 16:30:14 -0700185 // Start the client.
Madan Jampaniddf76222014-10-04 23:48:44 -0700186 ChannelFuture f = bootstrap.connect(ep.host(), ep.port()).sync();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700187 return f.channel();
188 }
189
190 @Override
191 public void passivateObject(Endpoint ep, Channel channel)
192 throws Exception {
193 }
194
195 @Override
196 public boolean validateObject(Endpoint ep, Channel channel) {
197 return channel.isOpen();
198 }
199 }
200
201 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
202
Madan Jampaniddf76222014-10-04 23:48:44 -0700203 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
204 private final ChannelHandler encoder = new MessageEncoder(serializer);
205
Madan Jampaniab6d3112014-10-02 16:30:14 -0700206 @Override
207 protected void initChannel(SocketChannel channel) throws Exception {
208 channel.pipeline()
Madan Jampaniddf76222014-10-04 23:48:44 -0700209 .addLast("encoder", encoder)
Madan Jampani86ed0552014-10-03 16:45:42 -0700210 .addLast("decoder", new MessageDecoder(NettyMessagingService.this, serializer))
Madan Jampaniddf76222014-10-04 23:48:44 -0700211 .addLast("handler", dispatcher);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700212 }
213 }
214
215 private class WriteTask implements Runnable {
216
Madan Jampani86ed0552014-10-03 16:45:42 -0700217 private final InternalMessage message;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700218 private final Channel channel;
219
Madan Jampani86ed0552014-10-03 16:45:42 -0700220 public WriteTask(Channel channel, InternalMessage message) {
Madan Jampaniab6d3112014-10-02 16:30:14 -0700221 this.channel = channel;
Madan Jampani86ed0552014-10-03 16:45:42 -0700222 this.message = message;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700223 }
224
225 @Override
226 public void run() {
Madan Jampaniddf76222014-10-04 23:48:44 -0700227 channel.writeAndFlush(message, channel.voidPromise());
Madan Jampaniab6d3112014-10-02 16:30:14 -0700228 }
229 }
230
Madan Jampaniddf76222014-10-04 23:48:44 -0700231 @ChannelHandler.Sharable
Madan Jampaniab6d3112014-10-02 16:30:14 -0700232 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
233
234 @Override
235 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
236 String type = message.type();
237 if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
238 try {
239 AsyncResponse<?> futureResponse =
240 NettyMessagingService.this.responseFutures.getIfPresent(message.id());
241 if (futureResponse != null) {
242 futureResponse.setResponse(message.payload());
243 }
244 log.warn("Received a reply. But was unable to locate the request handle");
245 } finally {
246 NettyMessagingService.this.responseFutures.invalidate(message.id());
247 }
248 return;
249 }
250 MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
251 handler.handle(message);
252 }
Madan Jampani86ed0552014-10-03 16:45:42 -0700253
Madan Jampani86ed0552014-10-03 16:45:42 -0700254 @Override
255 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
256 context.close();
257 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700258 }
259}