blob: 5a51ad438faa6394c0fe6b90f2004da7a3e7a317 [file] [log] [blame]
Madan Jampaniab6d3112014-10-02 16:30:14 -07001package org.onlab.netty;
2
3import java.io.IOException;
4import java.net.UnknownHostException;
5import java.util.concurrent.ConcurrentHashMap;
6import java.util.concurrent.ConcurrentMap;
7import java.util.concurrent.TimeUnit;
8
9import io.netty.bootstrap.Bootstrap;
10import io.netty.bootstrap.ServerBootstrap;
11import io.netty.buffer.PooledByteBufAllocator;
12import io.netty.channel.Channel;
13import io.netty.channel.ChannelFuture;
Madan Jampaniddf76222014-10-04 23:48:44 -070014import io.netty.channel.ChannelHandler;
Madan Jampaniab6d3112014-10-02 16:30:14 -070015import io.netty.channel.ChannelHandlerContext;
16import io.netty.channel.ChannelInitializer;
17import io.netty.channel.ChannelOption;
18import io.netty.channel.EventLoopGroup;
19import io.netty.channel.SimpleChannelInboundHandler;
20import io.netty.channel.nio.NioEventLoopGroup;
21import io.netty.channel.socket.SocketChannel;
22import io.netty.channel.socket.nio.NioServerSocketChannel;
23import io.netty.channel.socket.nio.NioSocketChannel;
24
25import org.apache.commons.lang.math.RandomUtils;
Madan Jampaniab6d3112014-10-02 16:30:14 -070026import org.apache.commons.pool.KeyedPoolableObjectFactory;
27import org.apache.commons.pool.impl.GenericKeyedObjectPool;
28import org.slf4j.Logger;
29import org.slf4j.LoggerFactory;
30
31import com.google.common.cache.Cache;
32import com.google.common.cache.CacheBuilder;
33
34/**
35 * A Netty based implementation of MessagingService.
36 */
37public class NettyMessagingService implements MessagingService {
38
39 private final Logger log = LoggerFactory.getLogger(getClass());
40
Madan Jampaniab6d3112014-10-02 16:30:14 -070041 private final int port;
Madan Jampaniddf76222014-10-04 23:48:44 -070042 private final Endpoint localEp;
Madan Jampaniab6d3112014-10-02 16:30:14 -070043 private final EventLoopGroup bossGroup = new NioEventLoopGroup();
44 private final EventLoopGroup workerGroup = new NioEventLoopGroup();
45 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
Madan Jampani53e44e62014-10-07 12:39:51 -070046 private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
Madan Jampaniddf76222014-10-04 23:48:44 -070047 .maximumSize(100000)
48 .weakValues()
49 // TODO: Once the entry expires, notify blocking threads (if any).
50 .expireAfterWrite(10, TimeUnit.MINUTES)
51 .build();
52 private final GenericKeyedObjectPool<Endpoint, Channel> channels
53 = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
Madan Jampaniab6d3112014-10-02 16:30:14 -070054
Madan Jampaniab6d3112014-10-02 16:30:14 -070055 public NettyMessagingService() {
56 // TODO: Default port should be configurable.
57 this(8080);
58 }
59
60 // FIXME: Constructor should not throw exceptions.
61 public NettyMessagingService(int port) {
62 this.port = port;
63 try {
64 localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
65 } catch (UnknownHostException e) {
66 // bailing out.
67 throw new RuntimeException(e);
68 }
69 }
70
71 public void activate() throws Exception {
Madan Jampani86ed0552014-10-03 16:45:42 -070072 channels.setTestOnBorrow(true);
73 channels.setTestOnReturn(true);
Madan Jampaniab6d3112014-10-02 16:30:14 -070074 startAcceptingConnections();
75 }
76
77 public void deactivate() throws Exception {
78 channels.close();
79 bossGroup.shutdownGracefully();
80 workerGroup.shutdownGracefully();
81 }
82
83 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -070084 public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
Madan Jampaniab6d3112014-10-02 16:30:14 -070085 InternalMessage message = new InternalMessage.Builder(this)
86 .withId(RandomUtils.nextLong())
87 .withSender(localEp)
88 .withType(type)
89 .withPayload(payload)
90 .build();
91 sendAsync(ep, message);
92 }
93
94 protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
95 Channel channel = null;
96 try {
Madan Jampani86ed0552014-10-03 16:45:42 -070097 try {
98 channel = channels.borrowObject(ep);
99 channel.eventLoop().execute(new WriteTask(channel, message));
100 } finally {
101 channels.returnObject(ep, channel);
102 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700103 } catch (Exception e) {
104 throw new IOException(e);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700105 }
106 }
107
108 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700109 public Response sendAndReceive(Endpoint ep, String type, byte[] payload)
Madan Jampaniab6d3112014-10-02 16:30:14 -0700110 throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700111 AsyncResponse futureResponse = new AsyncResponse();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700112 Long messageId = RandomUtils.nextLong();
113 responseFutures.put(messageId, futureResponse);
114 InternalMessage message = new InternalMessage.Builder(this)
115 .withId(messageId)
116 .withSender(localEp)
117 .withType(type)
118 .withPayload(payload)
119 .build();
120 sendAsync(ep, message);
121 return futureResponse;
122 }
123
124 @Override
125 public void registerHandler(String type, MessageHandler handler) {
126 // TODO: Is this the right semantics for handler registration?
127 handlers.putIfAbsent(type, handler);
128 }
129
130 public void unregisterHandler(String type) {
131 handlers.remove(type);
132 }
133
134 private MessageHandler getMessageHandler(String type) {
135 return handlers.get(type);
136 }
137
138 private void startAcceptingConnections() throws InterruptedException {
139 ServerBootstrap b = new ServerBootstrap();
Madan Jampani86ed0552014-10-03 16:45:42 -0700140 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
Madan Jampaniddf76222014-10-04 23:48:44 -0700141 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
142 // TODO: Need JVM options to configure PooledByteBufAllocator.
Madan Jampaniab6d3112014-10-02 16:30:14 -0700143 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
144 b.group(bossGroup, workerGroup)
145 .channel(NioServerSocketChannel.class)
146 .childHandler(new OnosCommunicationChannelInitializer())
147 .option(ChannelOption.SO_BACKLOG, 128)
148 .childOption(ChannelOption.SO_KEEPALIVE, true);
149
150 // Bind and start to accept incoming connections.
151 b.bind(port).sync();
152 }
153
154 private class OnosCommunicationChannelFactory
155 implements KeyedPoolableObjectFactory<Endpoint, Channel> {
156
157 @Override
158 public void activateObject(Endpoint endpoint, Channel channel)
159 throws Exception {
160 }
161
162 @Override
163 public void destroyObject(Endpoint ep, Channel channel) throws Exception {
164 channel.close();
165 }
166
167 @Override
168 public Channel makeObject(Endpoint ep) throws Exception {
Madan Jampaniddf76222014-10-04 23:48:44 -0700169 Bootstrap bootstrap = new Bootstrap();
170 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
171 bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
172 bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
173 bootstrap.group(workerGroup);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700174 // TODO: Make this faster:
175 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
Madan Jampaniddf76222014-10-04 23:48:44 -0700176 bootstrap.channel(NioSocketChannel.class);
177 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
178 bootstrap.handler(new OnosCommunicationChannelInitializer());
Madan Jampaniab6d3112014-10-02 16:30:14 -0700179 // Start the client.
Madan Jampaniddf76222014-10-04 23:48:44 -0700180 ChannelFuture f = bootstrap.connect(ep.host(), ep.port()).sync();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700181 return f.channel();
182 }
183
184 @Override
185 public void passivateObject(Endpoint ep, Channel channel)
186 throws Exception {
187 }
188
189 @Override
190 public boolean validateObject(Endpoint ep, Channel channel) {
191 return channel.isOpen();
192 }
193 }
194
195 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
196
Madan Jampaniddf76222014-10-04 23:48:44 -0700197 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Madan Jampani53e44e62014-10-07 12:39:51 -0700198 private final ChannelHandler encoder = new MessageEncoder();
Madan Jampaniddf76222014-10-04 23:48:44 -0700199
Madan Jampaniab6d3112014-10-02 16:30:14 -0700200 @Override
201 protected void initChannel(SocketChannel channel) throws Exception {
202 channel.pipeline()
Madan Jampaniddf76222014-10-04 23:48:44 -0700203 .addLast("encoder", encoder)
Madan Jampani53e44e62014-10-07 12:39:51 -0700204 .addLast("decoder", new MessageDecoder(NettyMessagingService.this))
Madan Jampaniddf76222014-10-04 23:48:44 -0700205 .addLast("handler", dispatcher);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700206 }
207 }
208
209 private class WriteTask implements Runnable {
210
Madan Jampani86ed0552014-10-03 16:45:42 -0700211 private final InternalMessage message;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700212 private final Channel channel;
213
Madan Jampani86ed0552014-10-03 16:45:42 -0700214 public WriteTask(Channel channel, InternalMessage message) {
Madan Jampaniab6d3112014-10-02 16:30:14 -0700215 this.channel = channel;
Madan Jampani86ed0552014-10-03 16:45:42 -0700216 this.message = message;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700217 }
218
219 @Override
220 public void run() {
Madan Jampaniddf76222014-10-04 23:48:44 -0700221 channel.writeAndFlush(message, channel.voidPromise());
Madan Jampaniab6d3112014-10-02 16:30:14 -0700222 }
223 }
224
Madan Jampaniddf76222014-10-04 23:48:44 -0700225 @ChannelHandler.Sharable
Madan Jampaniab6d3112014-10-02 16:30:14 -0700226 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
227
228 @Override
229 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
230 String type = message.type();
231 if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
232 try {
Madan Jampani53e44e62014-10-07 12:39:51 -0700233 AsyncResponse futureResponse =
Madan Jampaniab6d3112014-10-02 16:30:14 -0700234 NettyMessagingService.this.responseFutures.getIfPresent(message.id());
235 if (futureResponse != null) {
236 futureResponse.setResponse(message.payload());
Madan Jampanif1d425a2014-10-07 09:52:36 -0700237 } else {
238 log.warn("Received a reply. But was unable to locate the request handle");
Madan Jampaniab6d3112014-10-02 16:30:14 -0700239 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700240 } finally {
241 NettyMessagingService.this.responseFutures.invalidate(message.id());
242 }
243 return;
244 }
245 MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
246 handler.handle(message);
247 }
Madan Jampani86ed0552014-10-03 16:45:42 -0700248
Madan Jampani86ed0552014-10-03 16:45:42 -0700249 @Override
250 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
Madan Jampani29e5dfd2014-10-07 17:26:25 -0700251 log.error("Exception inside channel handling pipeline.", cause);
Madan Jampani86ed0552014-10-03 16:45:42 -0700252 context.close();
253 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700254 }
255}