blob: 4a755cc8a93eabe6ad677b3b0d4178beb2611294 [file] [log] [blame]
Madan Jampaniab6d3112014-10-02 16:30:14 -07001package org.onlab.netty;
2
3import java.io.IOException;
4import java.net.UnknownHostException;
5import java.util.concurrent.ConcurrentHashMap;
6import java.util.concurrent.ConcurrentMap;
7import java.util.concurrent.TimeUnit;
8
9import io.netty.bootstrap.Bootstrap;
10import io.netty.bootstrap.ServerBootstrap;
11import io.netty.buffer.PooledByteBufAllocator;
12import io.netty.channel.Channel;
13import io.netty.channel.ChannelFuture;
14import io.netty.channel.ChannelHandlerContext;
15import io.netty.channel.ChannelInitializer;
16import io.netty.channel.ChannelOption;
17import io.netty.channel.EventLoopGroup;
18import io.netty.channel.SimpleChannelInboundHandler;
19import io.netty.channel.nio.NioEventLoopGroup;
20import io.netty.channel.socket.SocketChannel;
21import io.netty.channel.socket.nio.NioServerSocketChannel;
22import io.netty.channel.socket.nio.NioSocketChannel;
23
24import org.apache.commons.lang.math.RandomUtils;
Madan Jampaniab6d3112014-10-02 16:30:14 -070025import org.apache.commons.pool.KeyedPoolableObjectFactory;
26import org.apache.commons.pool.impl.GenericKeyedObjectPool;
27import org.slf4j.Logger;
28import org.slf4j.LoggerFactory;
29
30import com.google.common.cache.Cache;
31import com.google.common.cache.CacheBuilder;
32
33/**
34 * A Netty based implementation of MessagingService.
35 */
36public class NettyMessagingService implements MessagingService {
37
38 private final Logger log = LoggerFactory.getLogger(getClass());
39
Madan Jampani86ed0552014-10-03 16:45:42 -070040 private GenericKeyedObjectPool<Endpoint, Channel> channels;
41
Madan Jampaniab6d3112014-10-02 16:30:14 -070042 private final int port;
43 private final EventLoopGroup bossGroup = new NioEventLoopGroup();
44 private final EventLoopGroup workerGroup = new NioEventLoopGroup();
45 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
46 private Cache<Long, AsyncResponse<?>> responseFutures;
47 private final Endpoint localEp;
48
49 protected Serializer serializer;
50
51 public NettyMessagingService() {
52 // TODO: Default port should be configurable.
53 this(8080);
54 }
55
56 // FIXME: Constructor should not throw exceptions.
57 public NettyMessagingService(int port) {
58 this.port = port;
59 try {
60 localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
61 } catch (UnknownHostException e) {
62 // bailing out.
63 throw new RuntimeException(e);
64 }
65 }
66
67 public void activate() throws Exception {
Madan Jampani86ed0552014-10-03 16:45:42 -070068 channels = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
69 channels.setTestOnBorrow(true);
70 channels.setTestOnReturn(true);
Madan Jampaniab6d3112014-10-02 16:30:14 -070071 responseFutures = CacheBuilder.newBuilder()
72 .maximumSize(100000)
73 .weakValues()
74 // TODO: Once the entry expires, notify blocking threads (if any).
75 .expireAfterWrite(10, TimeUnit.MINUTES)
76 .build();
77 startAcceptingConnections();
78 }
79
80 public void deactivate() throws Exception {
81 channels.close();
82 bossGroup.shutdownGracefully();
83 workerGroup.shutdownGracefully();
84 }
85
86 @Override
87 public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
88 InternalMessage message = new InternalMessage.Builder(this)
89 .withId(RandomUtils.nextLong())
90 .withSender(localEp)
91 .withType(type)
92 .withPayload(payload)
93 .build();
94 sendAsync(ep, message);
95 }
96
97 protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
98 Channel channel = null;
99 try {
Madan Jampani86ed0552014-10-03 16:45:42 -0700100 try {
101 channel = channels.borrowObject(ep);
102 channel.eventLoop().execute(new WriteTask(channel, message));
103 } finally {
104 channels.returnObject(ep, channel);
105 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700106 } catch (Exception e) {
107 throw new IOException(e);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700108 }
109 }
110
111 @Override
112 public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
113 throws IOException {
114 AsyncResponse<T> futureResponse = new AsyncResponse<T>();
115 Long messageId = RandomUtils.nextLong();
116 responseFutures.put(messageId, futureResponse);
117 InternalMessage message = new InternalMessage.Builder(this)
118 .withId(messageId)
119 .withSender(localEp)
120 .withType(type)
121 .withPayload(payload)
122 .build();
123 sendAsync(ep, message);
124 return futureResponse;
125 }
126
127 @Override
128 public void registerHandler(String type, MessageHandler handler) {
129 // TODO: Is this the right semantics for handler registration?
130 handlers.putIfAbsent(type, handler);
131 }
132
133 public void unregisterHandler(String type) {
134 handlers.remove(type);
135 }
136
137 private MessageHandler getMessageHandler(String type) {
138 return handlers.get(type);
139 }
140
141 private void startAcceptingConnections() throws InterruptedException {
142 ServerBootstrap b = new ServerBootstrap();
Madan Jampani86ed0552014-10-03 16:45:42 -0700143 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
144 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700145 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
146 b.group(bossGroup, workerGroup)
147 .channel(NioServerSocketChannel.class)
148 .childHandler(new OnosCommunicationChannelInitializer())
149 .option(ChannelOption.SO_BACKLOG, 128)
150 .childOption(ChannelOption.SO_KEEPALIVE, true);
151
152 // Bind and start to accept incoming connections.
153 b.bind(port).sync();
154 }
155
156 private class OnosCommunicationChannelFactory
157 implements KeyedPoolableObjectFactory<Endpoint, Channel> {
158
159 @Override
160 public void activateObject(Endpoint endpoint, Channel channel)
161 throws Exception {
162 }
163
164 @Override
165 public void destroyObject(Endpoint ep, Channel channel) throws Exception {
166 channel.close();
167 }
168
169 @Override
170 public Channel makeObject(Endpoint ep) throws Exception {
171 Bootstrap b = new Bootstrap();
172 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Madan Jampani86ed0552014-10-03 16:45:42 -0700173 b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
174 b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
Madan Jampaniab6d3112014-10-02 16:30:14 -0700175 b.group(workerGroup);
176 // TODO: Make this faster:
177 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
178 b.channel(NioSocketChannel.class);
179 b.option(ChannelOption.SO_KEEPALIVE, true);
180 b.handler(new OnosCommunicationChannelInitializer());
181
182 // Start the client.
183 ChannelFuture f = b.connect(ep.host(), ep.port()).sync();
184 return f.channel();
185 }
186
187 @Override
188 public void passivateObject(Endpoint ep, Channel channel)
189 throws Exception {
190 }
191
192 @Override
193 public boolean validateObject(Endpoint ep, Channel channel) {
194 return channel.isOpen();
195 }
196 }
197
198 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
199
200 @Override
201 protected void initChannel(SocketChannel channel) throws Exception {
202 channel.pipeline()
Madan Jampani86ed0552014-10-03 16:45:42 -0700203 .addLast("encoder", new MessageEncoder(serializer))
204 .addLast("decoder", new MessageDecoder(NettyMessagingService.this, serializer))
205 .addLast("handler", new InboundMessageDispatcher());
Madan Jampaniab6d3112014-10-02 16:30:14 -0700206 }
207 }
208
209 private class WriteTask implements Runnable {
210
Madan Jampani86ed0552014-10-03 16:45:42 -0700211 private final InternalMessage message;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700212 private final Channel channel;
213
Madan Jampani86ed0552014-10-03 16:45:42 -0700214 public WriteTask(Channel channel, InternalMessage message) {
Madan Jampaniab6d3112014-10-02 16:30:14 -0700215 this.channel = channel;
Madan Jampani86ed0552014-10-03 16:45:42 -0700216 this.message = message;
Madan Jampaniab6d3112014-10-02 16:30:14 -0700217 }
218
219 @Override
220 public void run() {
221 channel.writeAndFlush(message);
222 }
223 }
224
225 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
226
227 @Override
228 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
229 String type = message.type();
230 if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
231 try {
232 AsyncResponse<?> futureResponse =
233 NettyMessagingService.this.responseFutures.getIfPresent(message.id());
234 if (futureResponse != null) {
235 futureResponse.setResponse(message.payload());
236 }
237 log.warn("Received a reply. But was unable to locate the request handle");
238 } finally {
239 NettyMessagingService.this.responseFutures.invalidate(message.id());
240 }
241 return;
242 }
243 MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
244 handler.handle(message);
245 }
Madan Jampani86ed0552014-10-03 16:45:42 -0700246
247
248 @Override
249 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
250 context.close();
251 }
Madan Jampaniab6d3112014-10-02 16:30:14 -0700252 }
253}