blob: 54da8cc4a85b255860c81ec2108bb3c2743ab629 [file] [log] [blame]
Madan Jampaniab6d3112014-10-02 16:30:14 -07001package org.onlab.netty;
2
3import java.io.IOException;
4import java.net.UnknownHostException;
5import java.util.concurrent.ConcurrentHashMap;
6import java.util.concurrent.ConcurrentMap;
7import java.util.concurrent.TimeUnit;
8
9import io.netty.bootstrap.Bootstrap;
10import io.netty.bootstrap.ServerBootstrap;
11import io.netty.buffer.PooledByteBufAllocator;
12import io.netty.channel.Channel;
13import io.netty.channel.ChannelFuture;
14import io.netty.channel.ChannelHandlerContext;
15import io.netty.channel.ChannelInitializer;
16import io.netty.channel.ChannelOption;
17import io.netty.channel.EventLoopGroup;
18import io.netty.channel.SimpleChannelInboundHandler;
19import io.netty.channel.nio.NioEventLoopGroup;
20import io.netty.channel.socket.SocketChannel;
21import io.netty.channel.socket.nio.NioServerSocketChannel;
22import io.netty.channel.socket.nio.NioSocketChannel;
23
24import org.apache.commons.lang.math.RandomUtils;
25import org.apache.commons.pool.KeyedObjectPool;
26import org.apache.commons.pool.KeyedPoolableObjectFactory;
27import org.apache.commons.pool.impl.GenericKeyedObjectPool;
28import org.slf4j.Logger;
29import org.slf4j.LoggerFactory;
30
31import com.google.common.cache.Cache;
32import com.google.common.cache.CacheBuilder;
33
34/**
35 * A Netty based implementation of MessagingService.
36 */
37public class NettyMessagingService implements MessagingService {
38
39 private final Logger log = LoggerFactory.getLogger(getClass());
40
41 private KeyedObjectPool<Endpoint, Channel> channels =
42 new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
43 private final int port;
44 private final EventLoopGroup bossGroup = new NioEventLoopGroup();
45 private final EventLoopGroup workerGroup = new NioEventLoopGroup();
46 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
47 private Cache<Long, AsyncResponse<?>> responseFutures;
48 private final Endpoint localEp;
49
50 protected Serializer serializer;
51
52 public NettyMessagingService() {
53 // TODO: Default port should be configurable.
54 this(8080);
55 }
56
57 // FIXME: Constructor should not throw exceptions.
58 public NettyMessagingService(int port) {
59 this.port = port;
60 try {
61 localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
62 } catch (UnknownHostException e) {
63 // bailing out.
64 throw new RuntimeException(e);
65 }
66 }
67
68 public void activate() throws Exception {
69 responseFutures = CacheBuilder.newBuilder()
70 .maximumSize(100000)
71 .weakValues()
72 // TODO: Once the entry expires, notify blocking threads (if any).
73 .expireAfterWrite(10, TimeUnit.MINUTES)
74 .build();
75 startAcceptingConnections();
76 }
77
78 public void deactivate() throws Exception {
79 channels.close();
80 bossGroup.shutdownGracefully();
81 workerGroup.shutdownGracefully();
82 }
83
84 @Override
85 public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
86 InternalMessage message = new InternalMessage.Builder(this)
87 .withId(RandomUtils.nextLong())
88 .withSender(localEp)
89 .withType(type)
90 .withPayload(payload)
91 .build();
92 sendAsync(ep, message);
93 }
94
95 protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
96 Channel channel = null;
97 try {
98 channel = channels.borrowObject(ep);
99 channel.eventLoop().execute(new WriteTask(channel, message));
100 } catch (Exception e) {
101 throw new IOException(e);
102 } finally {
103 try {
104 channels.returnObject(ep, channel);
105 } catch (Exception e) {
106 log.warn("Error returning object back to the pool", e);
107 // ignored.
108 }
109 }
110 }
111
112 @Override
113 public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
114 throws IOException {
115 AsyncResponse<T> futureResponse = new AsyncResponse<T>();
116 Long messageId = RandomUtils.nextLong();
117 responseFutures.put(messageId, futureResponse);
118 InternalMessage message = new InternalMessage.Builder(this)
119 .withId(messageId)
120 .withSender(localEp)
121 .withType(type)
122 .withPayload(payload)
123 .build();
124 sendAsync(ep, message);
125 return futureResponse;
126 }
127
128 @Override
129 public void registerHandler(String type, MessageHandler handler) {
130 // TODO: Is this the right semantics for handler registration?
131 handlers.putIfAbsent(type, handler);
132 }
133
134 public void unregisterHandler(String type) {
135 handlers.remove(type);
136 }
137
138 private MessageHandler getMessageHandler(String type) {
139 return handlers.get(type);
140 }
141
142 private void startAcceptingConnections() throws InterruptedException {
143 ServerBootstrap b = new ServerBootstrap();
144 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
145 b.group(bossGroup, workerGroup)
146 .channel(NioServerSocketChannel.class)
147 .childHandler(new OnosCommunicationChannelInitializer())
148 .option(ChannelOption.SO_BACKLOG, 128)
149 .childOption(ChannelOption.SO_KEEPALIVE, true);
150
151 // Bind and start to accept incoming connections.
152 b.bind(port).sync();
153 }
154
155 private class OnosCommunicationChannelFactory
156 implements KeyedPoolableObjectFactory<Endpoint, Channel> {
157
158 @Override
159 public void activateObject(Endpoint endpoint, Channel channel)
160 throws Exception {
161 }
162
163 @Override
164 public void destroyObject(Endpoint ep, Channel channel) throws Exception {
165 channel.close();
166 }
167
168 @Override
169 public Channel makeObject(Endpoint ep) throws Exception {
170 Bootstrap b = new Bootstrap();
171 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
172 b.group(workerGroup);
173 // TODO: Make this faster:
174 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
175 b.channel(NioSocketChannel.class);
176 b.option(ChannelOption.SO_KEEPALIVE, true);
177 b.handler(new OnosCommunicationChannelInitializer());
178
179 // Start the client.
180 ChannelFuture f = b.connect(ep.host(), ep.port()).sync();
181 return f.channel();
182 }
183
184 @Override
185 public void passivateObject(Endpoint ep, Channel channel)
186 throws Exception {
187 }
188
189 @Override
190 public boolean validateObject(Endpoint ep, Channel channel) {
191 return channel.isOpen();
192 }
193 }
194
195 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
196
197 @Override
198 protected void initChannel(SocketChannel channel) throws Exception {
199 channel.pipeline()
200 .addLast(new MessageEncoder(serializer))
201 .addLast(new MessageDecoder(NettyMessagingService.this, serializer))
202 .addLast(new NettyMessagingService.InboundMessageDispatcher());
203 }
204 }
205
206 private class WriteTask implements Runnable {
207
208 private final Object message;
209 private final Channel channel;
210
211 public WriteTask(Channel channel, Object message) {
212 this.message = message;
213 this.channel = channel;
214 }
215
216 @Override
217 public void run() {
218 channel.writeAndFlush(message);
219 }
220 }
221
222 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
223
224 @Override
225 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
226 String type = message.type();
227 if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
228 try {
229 AsyncResponse<?> futureResponse =
230 NettyMessagingService.this.responseFutures.getIfPresent(message.id());
231 if (futureResponse != null) {
232 futureResponse.setResponse(message.payload());
233 }
234 log.warn("Received a reply. But was unable to locate the request handle");
235 } finally {
236 NettyMessagingService.this.responseFutures.invalidate(message.id());
237 }
238 return;
239 }
240 MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
241 handler.handle(message);
242 }
243 }
244}