blob: 4c32164d15392f573bbaaea3a1f6f00bf623bce6 [file] [log] [blame]
Madan Jampaniab6d3112014-10-02 16:30:14 -07001package org.onlab.netty;
2
3import java.io.IOException;
4import java.net.UnknownHostException;
5import java.util.concurrent.ConcurrentHashMap;
6import java.util.concurrent.ConcurrentMap;
7import java.util.concurrent.TimeUnit;
8
9import io.netty.bootstrap.Bootstrap;
10import io.netty.bootstrap.ServerBootstrap;
11import io.netty.buffer.PooledByteBufAllocator;
12import io.netty.channel.Channel;
13import io.netty.channel.ChannelFuture;
Madan Jampaniddf76222014-10-04 23:48:44 -070014import io.netty.channel.ChannelHandler;
Madan Jampaniab6d3112014-10-02 16:30:14 -070015import io.netty.channel.ChannelHandlerContext;
16import io.netty.channel.ChannelInitializer;
17import io.netty.channel.ChannelOption;
18import io.netty.channel.EventLoopGroup;
19import io.netty.channel.SimpleChannelInboundHandler;
20import io.netty.channel.nio.NioEventLoopGroup;
21import io.netty.channel.socket.SocketChannel;
22import io.netty.channel.socket.nio.NioServerSocketChannel;
23import io.netty.channel.socket.nio.NioSocketChannel;
24
25import org.apache.commons.lang.math.RandomUtils;
Madan Jampaniab6d3112014-10-02 16:30:14 -070026import org.apache.commons.pool.KeyedPoolableObjectFactory;
27import org.apache.commons.pool.impl.GenericKeyedObjectPool;
28import org.slf4j.Logger;
29import org.slf4j.LoggerFactory;
30
31import com.google.common.cache.Cache;
32import com.google.common.cache.CacheBuilder;
33
/**
 * A Netty-based implementation of MessagingService.
 */
37public class NettyMessagingService implements MessagingService {
38
39 private final Logger log = LoggerFactory.getLogger(getClass());
40
Madan Jampaniab6d3112014-10-02 16:30:14 -070041 private final int port;
Madan Jampaniddf76222014-10-04 23:48:44 -070042 private final Endpoint localEp;
Madan Jampaniab6d3112014-10-02 16:30:14 -070043 private final EventLoopGroup bossGroup = new NioEventLoopGroup();
44 private final EventLoopGroup workerGroup = new NioEventLoopGroup();
45 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
Madan Jampaniddf76222014-10-04 23:48:44 -070046 private final Cache<Long, AsyncResponse<?>> responseFutures = CacheBuilder.newBuilder()
47 .maximumSize(100000)
48 .weakValues()
49 // TODO: Once the entry expires, notify blocking threads (if any).
50 .expireAfterWrite(10, TimeUnit.MINUTES)
51 .build();
52 private final GenericKeyedObjectPool<Endpoint, Channel> channels
53 = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
Madan Jampaniab6d3112014-10-02 16:30:14 -070054
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070055 protected PayloadSerializer payloadSerializer;
Madan Jampaniab6d3112014-10-02 16:30:14 -070056
57 public NettyMessagingService() {
58 // TODO: Default port should be configurable.
59 this(8080);
60 }
61
62 // FIXME: Constructor should not throw exceptions.
63 public NettyMessagingService(int port) {
64 this.port = port;
65 try {
66 localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
67 } catch (UnknownHostException e) {
68 // bailing out.
69 throw new RuntimeException(e);
70 }
71 }
72
73 public void activate() throws Exception {
Madan Jampani86ed0552014-10-03 16:45:42 -070074 channels.setTestOnBorrow(true);
75 channels.setTestOnReturn(true);
Madan Jampaniab6d3112014-10-02 16:30:14 -070076 startAcceptingConnections();
77 }
78
79 public void deactivate() throws Exception {
80 channels.close();
81 bossGroup.shutdownGracefully();
82 workerGroup.shutdownGracefully();
83 }
84
85 @Override
86 public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
87 InternalMessage message = new InternalMessage.Builder(this)
88 .withId(RandomUtils.nextLong())
89 .withSender(localEp)
90 .withType(type)
91 .withPayload(payload)
92 .build();
93 sendAsync(ep, message);
94 }
95
96 protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
97 Channel channel = null;
98 try {
Madan Jampani86ed0552014-10-03 16:45:42 -070099 try {
100 channel = channels.borrowObject(ep);
101 channel.eventLoop().execute(new WriteTask(channel, message));
102 } finally {
103 channels.returnObject(ep, channel);
104 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700105 } catch (Exception e) {
106 throw new IOException(e);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700107 }
108 }
109
110 @Override
111 public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
112 throws IOException {
113 AsyncResponse<T> futureResponse = new AsyncResponse<T>();
114 Long messageId = RandomUtils.nextLong();
115 responseFutures.put(messageId, futureResponse);
116 InternalMessage message = new InternalMessage.Builder(this)
117 .withId(messageId)
118 .withSender(localEp)
119 .withType(type)
120 .withPayload(payload)
121 .build();
122 sendAsync(ep, message);
123 return futureResponse;
124 }
125
126 @Override
127 public void registerHandler(String type, MessageHandler handler) {
128 // TODO: Is this the right semantics for handler registration?
129 handlers.putIfAbsent(type, handler);
130 }
131
132 public void unregisterHandler(String type) {
133 handlers.remove(type);
134 }
135
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -0700136 @Override
137 public void setPayloadSerializer(PayloadSerializer payloadSerializer) {
138 this.payloadSerializer = payloadSerializer;
Madan Jampani938aa432014-10-04 17:37:23 -0700139 }
140
Madan Jampaniab6d3112014-10-02 16:30:14 -0700141 private MessageHandler getMessageHandler(String type) {
142 return handlers.get(type);
143 }
144
145 private void startAcceptingConnections() throws InterruptedException {
146 ServerBootstrap b = new ServerBootstrap();
Madan Jampani86ed0552014-10-03 16:45:42 -0700147 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
Madan Jampaniddf76222014-10-04 23:48:44 -0700148 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
149 // TODO: Need JVM options to configure PooledByteBufAllocator.
Madan Jampaniab6d3112014-10-02 16:30:14 -0700150 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
151 b.group(bossGroup, workerGroup)
152 .channel(NioServerSocketChannel.class)
153 .childHandler(new OnosCommunicationChannelInitializer())
154 .option(ChannelOption.SO_BACKLOG, 128)
155 .childOption(ChannelOption.SO_KEEPALIVE, true);
156
157 // Bind and start to accept incoming connections.
158 b.bind(port).sync();
159 }
160
161 private class OnosCommunicationChannelFactory
162 implements KeyedPoolableObjectFactory<Endpoint, Channel> {
163
164 @Override
165 public void activateObject(Endpoint endpoint, Channel channel)
166 throws Exception {
167 }
168
169 @Override
170 public void destroyObject(Endpoint ep, Channel channel) throws Exception {
171 channel.close();
172 }
173
174 @Override
175 public Channel makeObject(Endpoint ep) throws Exception {
Madan Jampaniddf76222014-10-04 23:48:44 -0700176 Bootstrap bootstrap = new Bootstrap();
177 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
178 bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
179 bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
180 bootstrap.group(workerGroup);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700181 // TODO: Make this faster:
182 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
Madan Jampaniddf76222014-10-04 23:48:44 -0700183 bootstrap.channel(NioSocketChannel.class);
184 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
185 bootstrap.handler(new OnosCommunicationChannelInitializer());
Madan Jampaniab6d3112014-10-02 16:30:14 -0700186 // Start the client.
Madan Jampaniddf76222014-10-04 23:48:44 -0700187 ChannelFuture f = bootstrap.connect(ep.host(), ep.port()).sync();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700188 return f.channel();
189 }
190
191 @Override
192 public void passivateObject(Endpoint ep, Channel channel)
193 throws Exception {
194 }
195
196 @Override
197 public boolean validateObject(Endpoint ep, Channel channel) {
198 return channel.isOpen();
199 }
200 }
201
202 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
203
Madan Jampaniddf76222014-10-04 23:48:44 -0700204 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -0700205 private final ChannelHandler encoder = new MessageEncoder(payloadSerializer);
Madan Jampaniddf76222014-10-04 23:48:44 -0700206
Madan Jampaniab6d3112014-10-02 16:30:14 -0700207 @Override
208 protected void initChannel(SocketChannel channel) throws Exception {
209 channel.pipeline()
Madan Jampaniddf76222014-10-04 23:48:44 -0700210 .addLast("encoder", encoder)
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -0700211 .addLast("decoder", new MessageDecoder(NettyMessagingService.this, payloadSerializer))
Madan Jampaniddf76222014-10-04 23:48:44 -0700212 .addLast("handler", dispatcher);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700213 }
214 }
215
216 private class WriteTask implements Runnable {
217
Madan Jampani86ed0552014-10-03 16:45:42 -0700218 private final InternalMessage message;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700219 private final Channel channel;
220
Madan Jampani86ed0552014-10-03 16:45:42 -0700221 public WriteTask(Channel channel, InternalMessage message) {
Madan Jampaniab6d3112014-10-02 16:30:14 -0700222 this.channel = channel;
Madan Jampani86ed0552014-10-03 16:45:42 -0700223 this.message = message;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700224 }
225
226 @Override
227 public void run() {
Madan Jampaniddf76222014-10-04 23:48:44 -0700228 channel.writeAndFlush(message, channel.voidPromise());
Madan Jampaniab6d3112014-10-02 16:30:14 -0700229 }
230 }
231
Madan Jampaniddf76222014-10-04 23:48:44 -0700232 @ChannelHandler.Sharable
Madan Jampaniab6d3112014-10-02 16:30:14 -0700233 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
234
235 @Override
236 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
237 String type = message.type();
238 if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
239 try {
240 AsyncResponse<?> futureResponse =
241 NettyMessagingService.this.responseFutures.getIfPresent(message.id());
242 if (futureResponse != null) {
243 futureResponse.setResponse(message.payload());
244 }
245 log.warn("Received a reply. But was unable to locate the request handle");
246 } finally {
247 NettyMessagingService.this.responseFutures.invalidate(message.id());
248 }
249 return;
250 }
251 MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
252 handler.handle(message);
253 }
Madan Jampani86ed0552014-10-03 16:45:42 -0700254
Madan Jampani86ed0552014-10-03 16:45:42 -0700255 @Override
256 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
257 context.close();
258 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700259 }
260}