blob: 7c875efb0d3b93fb73d49cbf7ebb590a79cd69da [file] [log] [blame]
Madan Jampaniab6d3112014-10-02 16:30:14 -07001package org.onlab.netty;
2
3import java.io.IOException;
4import java.net.UnknownHostException;
5import java.util.concurrent.ConcurrentHashMap;
6import java.util.concurrent.ConcurrentMap;
7import java.util.concurrent.TimeUnit;
8
9import io.netty.bootstrap.Bootstrap;
10import io.netty.bootstrap.ServerBootstrap;
11import io.netty.buffer.PooledByteBufAllocator;
12import io.netty.channel.Channel;
13import io.netty.channel.ChannelFuture;
Madan Jampaniddf76222014-10-04 23:48:44 -070014import io.netty.channel.ChannelHandler;
Madan Jampaniab6d3112014-10-02 16:30:14 -070015import io.netty.channel.ChannelHandlerContext;
16import io.netty.channel.ChannelInitializer;
17import io.netty.channel.ChannelOption;
18import io.netty.channel.EventLoopGroup;
19import io.netty.channel.SimpleChannelInboundHandler;
20import io.netty.channel.nio.NioEventLoopGroup;
21import io.netty.channel.socket.SocketChannel;
22import io.netty.channel.socket.nio.NioServerSocketChannel;
23import io.netty.channel.socket.nio.NioSocketChannel;
24
25import org.apache.commons.lang.math.RandomUtils;
Madan Jampaniab6d3112014-10-02 16:30:14 -070026import org.apache.commons.pool.KeyedPoolableObjectFactory;
27import org.apache.commons.pool.impl.GenericKeyedObjectPool;
28import org.slf4j.Logger;
29import org.slf4j.LoggerFactory;
30
31import com.google.common.cache.Cache;
32import com.google.common.cache.CacheBuilder;
33
34/**
35 * A Netty based implementation of MessagingService.
36 */
37public class NettyMessagingService implements MessagingService {
38
39 private final Logger log = LoggerFactory.getLogger(getClass());
40
Madan Jampaniab6d3112014-10-02 16:30:14 -070041 private final int port;
Madan Jampaniddf76222014-10-04 23:48:44 -070042 private final Endpoint localEp;
Madan Jampaniab6d3112014-10-02 16:30:14 -070043 private final EventLoopGroup bossGroup = new NioEventLoopGroup();
Madan Jampani5e83f332014-10-20 15:35:09 -070044 private EventLoopGroup workerGroup;
45 private Class<? extends Channel> channelClass;
Madan Jampaniab6d3112014-10-02 16:30:14 -070046 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
Madan Jampani53e44e62014-10-07 12:39:51 -070047 private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
Madan Jampaniddf76222014-10-04 23:48:44 -070048 .maximumSize(100000)
49 .weakValues()
50 // TODO: Once the entry expires, notify blocking threads (if any).
51 .expireAfterWrite(10, TimeUnit.MINUTES)
52 .build();
53 private final GenericKeyedObjectPool<Endpoint, Channel> channels
54 = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
Madan Jampaniab6d3112014-10-02 16:30:14 -070055
Madan Jampani5e83f332014-10-20 15:35:09 -070056 // TODO: make this configurable.
57 private void initEventLoopGroup() {
Madan Jampanie602bb42014-10-20 16:03:41 -070058 workerGroup = new NioEventLoopGroup();
59 channelClass = NioSocketChannel.class;
Madan Jampani5e83f332014-10-20 15:35:09 -070060 }
61
Madan Jampaniab6d3112014-10-02 16:30:14 -070062 public NettyMessagingService() {
63 // TODO: Default port should be configurable.
64 this(8080);
65 }
66
67 // FIXME: Constructor should not throw exceptions.
68 public NettyMessagingService(int port) {
69 this.port = port;
70 try {
71 localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
72 } catch (UnknownHostException e) {
73 // bailing out.
74 throw new RuntimeException(e);
75 }
76 }
77
78 public void activate() throws Exception {
Madan Jampani86ed0552014-10-03 16:45:42 -070079 channels.setTestOnBorrow(true);
80 channels.setTestOnReturn(true);
Madan Jampani5e83f332014-10-20 15:35:09 -070081 initEventLoopGroup();
Madan Jampaniab6d3112014-10-02 16:30:14 -070082 startAcceptingConnections();
83 }
84
85 public void deactivate() throws Exception {
86 channels.close();
87 bossGroup.shutdownGracefully();
88 workerGroup.shutdownGracefully();
89 }
90
91 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -070092 public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
Madan Jampaniab6d3112014-10-02 16:30:14 -070093 InternalMessage message = new InternalMessage.Builder(this)
94 .withId(RandomUtils.nextLong())
95 .withSender(localEp)
96 .withType(type)
97 .withPayload(payload)
98 .build();
99 sendAsync(ep, message);
100 }
101
102 protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
103 Channel channel = null;
104 try {
Madan Jampani86ed0552014-10-03 16:45:42 -0700105 try {
106 channel = channels.borrowObject(ep);
107 channel.eventLoop().execute(new WriteTask(channel, message));
108 } finally {
109 channels.returnObject(ep, channel);
110 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700111 } catch (Exception e) {
112 throw new IOException(e);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700113 }
114 }
115
116 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700117 public Response sendAndReceive(Endpoint ep, String type, byte[] payload)
Madan Jampaniab6d3112014-10-02 16:30:14 -0700118 throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700119 AsyncResponse futureResponse = new AsyncResponse();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700120 Long messageId = RandomUtils.nextLong();
121 responseFutures.put(messageId, futureResponse);
122 InternalMessage message = new InternalMessage.Builder(this)
123 .withId(messageId)
124 .withSender(localEp)
125 .withType(type)
126 .withPayload(payload)
127 .build();
128 sendAsync(ep, message);
129 return futureResponse;
130 }
131
132 @Override
133 public void registerHandler(String type, MessageHandler handler) {
134 // TODO: Is this the right semantics for handler registration?
135 handlers.putIfAbsent(type, handler);
136 }
137
138 public void unregisterHandler(String type) {
139 handlers.remove(type);
140 }
141
142 private MessageHandler getMessageHandler(String type) {
143 return handlers.get(type);
144 }
145
146 private void startAcceptingConnections() throws InterruptedException {
147 ServerBootstrap b = new ServerBootstrap();
Madan Jampani86ed0552014-10-03 16:45:42 -0700148 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
Madan Jampaniddf76222014-10-04 23:48:44 -0700149 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
150 // TODO: Need JVM options to configure PooledByteBufAllocator.
Madan Jampaniab6d3112014-10-02 16:30:14 -0700151 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
152 b.group(bossGroup, workerGroup)
153 .channel(NioServerSocketChannel.class)
154 .childHandler(new OnosCommunicationChannelInitializer())
155 .option(ChannelOption.SO_BACKLOG, 128)
156 .childOption(ChannelOption.SO_KEEPALIVE, true);
157
158 // Bind and start to accept incoming connections.
159 b.bind(port).sync();
160 }
161
162 private class OnosCommunicationChannelFactory
163 implements KeyedPoolableObjectFactory<Endpoint, Channel> {
164
165 @Override
166 public void activateObject(Endpoint endpoint, Channel channel)
167 throws Exception {
168 }
169
170 @Override
171 public void destroyObject(Endpoint ep, Channel channel) throws Exception {
172 channel.close();
173 }
174
175 @Override
176 public Channel makeObject(Endpoint ep) throws Exception {
Madan Jampaniddf76222014-10-04 23:48:44 -0700177 Bootstrap bootstrap = new Bootstrap();
178 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
179 bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
180 bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
181 bootstrap.group(workerGroup);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700182 // TODO: Make this faster:
183 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
Madan Jampani5e83f332014-10-20 15:35:09 -0700184 bootstrap.channel(channelClass);
Madan Jampaniddf76222014-10-04 23:48:44 -0700185 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
186 bootstrap.handler(new OnosCommunicationChannelInitializer());
Madan Jampaniab6d3112014-10-02 16:30:14 -0700187 // Start the client.
Madan Jampaniddf76222014-10-04 23:48:44 -0700188 ChannelFuture f = bootstrap.connect(ep.host(), ep.port()).sync();
Madan Jampaniab6d3112014-10-02 16:30:14 -0700189 return f.channel();
190 }
191
192 @Override
193 public void passivateObject(Endpoint ep, Channel channel)
194 throws Exception {
195 }
196
197 @Override
198 public boolean validateObject(Endpoint ep, Channel channel) {
199 return channel.isOpen();
200 }
201 }
202
203 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
204
Madan Jampaniddf76222014-10-04 23:48:44 -0700205 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
Madan Jampani53e44e62014-10-07 12:39:51 -0700206 private final ChannelHandler encoder = new MessageEncoder();
Madan Jampaniddf76222014-10-04 23:48:44 -0700207
Madan Jampaniab6d3112014-10-02 16:30:14 -0700208 @Override
209 protected void initChannel(SocketChannel channel) throws Exception {
210 channel.pipeline()
Madan Jampaniddf76222014-10-04 23:48:44 -0700211 .addLast("encoder", encoder)
Madan Jampani53e44e62014-10-07 12:39:51 -0700212 .addLast("decoder", new MessageDecoder(NettyMessagingService.this))
Madan Jampaniddf76222014-10-04 23:48:44 -0700213 .addLast("handler", dispatcher);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700214 }
215 }
216
217 private class WriteTask implements Runnable {
218
Madan Jampani86ed0552014-10-03 16:45:42 -0700219 private final InternalMessage message;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700220 private final Channel channel;
221
Madan Jampani86ed0552014-10-03 16:45:42 -0700222 public WriteTask(Channel channel, InternalMessage message) {
Madan Jampaniab6d3112014-10-02 16:30:14 -0700223 this.channel = channel;
Madan Jampani86ed0552014-10-03 16:45:42 -0700224 this.message = message;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700225 }
226
227 @Override
228 public void run() {
Madan Jampaniddf76222014-10-04 23:48:44 -0700229 channel.writeAndFlush(message, channel.voidPromise());
Madan Jampaniab6d3112014-10-02 16:30:14 -0700230 }
231 }
232
Madan Jampaniddf76222014-10-04 23:48:44 -0700233 @ChannelHandler.Sharable
Madan Jampaniab6d3112014-10-02 16:30:14 -0700234 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
235
236 @Override
237 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
238 String type = message.type();
239 if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
240 try {
Madan Jampani53e44e62014-10-07 12:39:51 -0700241 AsyncResponse futureResponse =
Madan Jampaniab6d3112014-10-02 16:30:14 -0700242 NettyMessagingService.this.responseFutures.getIfPresent(message.id());
243 if (futureResponse != null) {
244 futureResponse.setResponse(message.payload());
Madan Jampanif1d425a2014-10-07 09:52:36 -0700245 } else {
246 log.warn("Received a reply. But was unable to locate the request handle");
Madan Jampaniab6d3112014-10-02 16:30:14 -0700247 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700248 } finally {
249 NettyMessagingService.this.responseFutures.invalidate(message.id());
250 }
251 return;
252 }
253 MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
254 handler.handle(message);
255 }
Madan Jampani86ed0552014-10-03 16:45:42 -0700256
Madan Jampani86ed0552014-10-03 16:45:42 -0700257 @Override
258 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
Madan Jampani29e5dfd2014-10-07 17:26:25 -0700259 log.error("Exception inside channel handling pipeline.", cause);
Madan Jampani86ed0552014-10-03 16:45:42 -0700260 context.close();
261 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700262 }
263}