blob: 8a609bde21f36a56cccc805fbcda101bc862171f [file] [log] [blame]
Madan Jampaniab6d3112014-10-02 16:30:14 -07001package org.onlab.netty;
2
3import java.io.IOException;
4import java.net.UnknownHostException;
5import java.util.concurrent.ConcurrentHashMap;
6import java.util.concurrent.ConcurrentMap;
7import java.util.concurrent.TimeUnit;
8
9import io.netty.bootstrap.Bootstrap;
10import io.netty.bootstrap.ServerBootstrap;
11import io.netty.buffer.PooledByteBufAllocator;
12import io.netty.channel.Channel;
13import io.netty.channel.ChannelFuture;
14import io.netty.channel.ChannelHandlerContext;
15import io.netty.channel.ChannelInitializer;
16import io.netty.channel.ChannelOption;
17import io.netty.channel.EventLoopGroup;
18import io.netty.channel.SimpleChannelInboundHandler;
19import io.netty.channel.nio.NioEventLoopGroup;
20import io.netty.channel.socket.SocketChannel;
21import io.netty.channel.socket.nio.NioServerSocketChannel;
22import io.netty.channel.socket.nio.NioSocketChannel;
23
24import org.apache.commons.lang.math.RandomUtils;
Madan Jampaniab6d3112014-10-02 16:30:14 -070025import org.apache.commons.pool.KeyedPoolableObjectFactory;
26import org.apache.commons.pool.impl.GenericKeyedObjectPool;
27import org.slf4j.Logger;
28import org.slf4j.LoggerFactory;
29
30import com.google.common.cache.Cache;
31import com.google.common.cache.CacheBuilder;
32
33/**
34 * A Netty based implementation of MessagingService.
35 */
36public class NettyMessagingService implements MessagingService {
37
38 private final Logger log = LoggerFactory.getLogger(getClass());
39
Madan Jampani86ed0552014-10-03 16:45:42 -070040 private GenericKeyedObjectPool<Endpoint, Channel> channels;
41
Madan Jampaniab6d3112014-10-02 16:30:14 -070042 private final int port;
43 private final EventLoopGroup bossGroup = new NioEventLoopGroup();
44 private final EventLoopGroup workerGroup = new NioEventLoopGroup();
45 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
46 private Cache<Long, AsyncResponse<?>> responseFutures;
47 private final Endpoint localEp;
48
49 protected Serializer serializer;
50
51 public NettyMessagingService() {
52 // TODO: Default port should be configurable.
53 this(8080);
54 }
55
56 // FIXME: Constructor should not throw exceptions.
57 public NettyMessagingService(int port) {
58 this.port = port;
59 try {
60 localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
61 } catch (UnknownHostException e) {
62 // bailing out.
63 throw new RuntimeException(e);
64 }
65 }
66
67 public void activate() throws Exception {
Madan Jampani86ed0552014-10-03 16:45:42 -070068 channels = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
69 channels.setTestOnBorrow(true);
70 channels.setTestOnReturn(true);
Madan Jampaniab6d3112014-10-02 16:30:14 -070071 responseFutures = CacheBuilder.newBuilder()
72 .maximumSize(100000)
73 .weakValues()
74 // TODO: Once the entry expires, notify blocking threads (if any).
75 .expireAfterWrite(10, TimeUnit.MINUTES)
76 .build();
77 startAcceptingConnections();
78 }
79
80 public void deactivate() throws Exception {
81 channels.close();
82 bossGroup.shutdownGracefully();
83 workerGroup.shutdownGracefully();
84 }
85
86 @Override
87 public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
88 InternalMessage message = new InternalMessage.Builder(this)
89 .withId(RandomUtils.nextLong())
90 .withSender(localEp)
91 .withType(type)
92 .withPayload(payload)
93 .build();
94 sendAsync(ep, message);
95 }
96
97 protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
98 Channel channel = null;
99 try {
Madan Jampani86ed0552014-10-03 16:45:42 -0700100 try {
101 channel = channels.borrowObject(ep);
102 channel.eventLoop().execute(new WriteTask(channel, message));
103 } finally {
104 channels.returnObject(ep, channel);
105 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700106 } catch (Exception e) {
107 throw new IOException(e);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700108 }
109 }
110
111 @Override
112 public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
113 throws IOException {
114 AsyncResponse<T> futureResponse = new AsyncResponse<T>();
115 Long messageId = RandomUtils.nextLong();
116 responseFutures.put(messageId, futureResponse);
117 InternalMessage message = new InternalMessage.Builder(this)
118 .withId(messageId)
119 .withSender(localEp)
120 .withType(type)
121 .withPayload(payload)
122 .build();
123 sendAsync(ep, message);
124 return futureResponse;
125 }
126
127 @Override
128 public void registerHandler(String type, MessageHandler handler) {
129 // TODO: Is this the right semantics for handler registration?
130 handlers.putIfAbsent(type, handler);
131 }
132
133 public void unregisterHandler(String type) {
134 handlers.remove(type);
135 }
136
Madan Jampani938aa432014-10-04 17:37:23 -0700137 public void setSerializer(Serializer serializer) {
138 this.serializer = serializer;
139 }
140
Madan Jampaniab6d3112014-10-02 16:30:14 -0700141 private MessageHandler getMessageHandler(String type) {
142 return handlers.get(type);
143 }
144
145 private void startAcceptingConnections() throws InterruptedException {
146 ServerBootstrap b = new ServerBootstrap();
Madan Jampani86ed0552014-10-03 16:45:42 -0700147 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
148 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700149 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
150 b.group(bossGroup, workerGroup)
151 .channel(NioServerSocketChannel.class)
152 .childHandler(new OnosCommunicationChannelInitializer())
153 .option(ChannelOption.SO_BACKLOG, 128)
154 .childOption(ChannelOption.SO_KEEPALIVE, true);
155
156 // Bind and start to accept incoming connections.
157 b.bind(port).sync();
158 }
159
160 private class OnosCommunicationChannelFactory
161 implements KeyedPoolableObjectFactory<Endpoint, Channel> {
162
163 @Override
164 public void activateObject(Endpoint endpoint, Channel channel)
165 throws Exception {
166 }
167
168 @Override
169 public void destroyObject(Endpoint ep, Channel channel) throws Exception {
170 channel.close();
171 }
172
173 @Override
174 public Channel makeObject(Endpoint ep) throws Exception {
175 Bootstrap b = new Bootstrap();
176 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Madan Jampani86ed0552014-10-03 16:45:42 -0700177 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
178 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700179 b.group(workerGroup);
180 // TODO: Make this faster:
181 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
182 b.channel(NioSocketChannel.class);
183 b.option(ChannelOption.SO_KEEPALIVE, true);
184 b.handler(new OnosCommunicationChannelInitializer());
185
186 // Start the client.
187 ChannelFuture f = b.connect(ep.host(), ep.port()).sync();
188 return f.channel();
189 }
190
191 @Override
192 public void passivateObject(Endpoint ep, Channel channel)
193 throws Exception {
194 }
195
196 @Override
197 public boolean validateObject(Endpoint ep, Channel channel) {
198 return channel.isOpen();
199 }
200 }
201
202 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
203
204 @Override
205 protected void initChannel(SocketChannel channel) throws Exception {
206 channel.pipeline()
Madan Jampani86ed0552014-10-03 16:45:42 -0700207 .addLast("encoder", new MessageEncoder(serializer))
208 .addLast("decoder", new MessageDecoder(NettyMessagingService.this, serializer))
209 .addLast("handler", new InboundMessageDispatcher());
Madan Jampaniab6d3112014-10-02 16:30:14 -0700210 }
211 }
212
213 private class WriteTask implements Runnable {
214
Madan Jampani86ed0552014-10-03 16:45:42 -0700215 private final InternalMessage message;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700216 private final Channel channel;
217
Madan Jampani86ed0552014-10-03 16:45:42 -0700218 public WriteTask(Channel channel, InternalMessage message) {
Madan Jampaniab6d3112014-10-02 16:30:14 -0700219 this.channel = channel;
Madan Jampani86ed0552014-10-03 16:45:42 -0700220 this.message = message;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700221 }
222
223 @Override
224 public void run() {
225 channel.writeAndFlush(message);
226 }
227 }
228
229 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
230
231 @Override
232 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
233 String type = message.type();
234 if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
235 try {
236 AsyncResponse<?> futureResponse =
237 NettyMessagingService.this.responseFutures.getIfPresent(message.id());
238 if (futureResponse != null) {
239 futureResponse.setResponse(message.payload());
240 }
241 log.warn("Received a reply. But was unable to locate the request handle");
242 } finally {
243 NettyMessagingService.this.responseFutures.invalidate(message.id());
244 }
245 return;
246 }
247 MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
248 handler.handle(message);
249 }
Madan Jampani86ed0552014-10-03 16:45:42 -0700250
251
252 @Override
253 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
254 context.close();
255 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700256 }
257}