blob: 321e0ef5a1dcb7a955c2eeca235595df0523e51d [file] [log] [blame]
Madan Jampani890bc352014-10-01 22:35:29 -07001package org.onlab.onos.store.messaging.impl;
2
3import java.io.IOException;
4import java.net.UnknownHostException;
5import java.util.concurrent.ConcurrentHashMap;
6import java.util.concurrent.ConcurrentMap;
7import java.util.concurrent.TimeUnit;
8
9import io.netty.bootstrap.Bootstrap;
10import io.netty.bootstrap.ServerBootstrap;
11import io.netty.buffer.PooledByteBufAllocator;
12import io.netty.channel.Channel;
13import io.netty.channel.ChannelFuture;
14import io.netty.channel.ChannelHandlerContext;
15import io.netty.channel.ChannelInitializer;
16import io.netty.channel.ChannelOption;
17import io.netty.channel.EventLoopGroup;
18import io.netty.channel.SimpleChannelInboundHandler;
19import io.netty.channel.nio.NioEventLoopGroup;
20import io.netty.channel.socket.SocketChannel;
21import io.netty.channel.socket.nio.NioServerSocketChannel;
22import io.netty.channel.socket.nio.NioSocketChannel;
23
24import org.apache.commons.lang.math.RandomUtils;
25import org.apache.commons.pool.KeyedObjectPool;
26import org.apache.commons.pool.KeyedPoolableObjectFactory;
27import org.apache.commons.pool.impl.GenericKeyedObjectPool;
28import org.apache.felix.scr.annotations.Activate;
29import org.apache.felix.scr.annotations.Component;
30import org.apache.felix.scr.annotations.Deactivate;
31import org.apache.felix.scr.annotations.Reference;
32import org.apache.felix.scr.annotations.ReferenceCardinality;
33import org.apache.felix.scr.annotations.Service;
Madan Jampani890bc352014-10-01 22:35:29 -070034import org.onlab.onos.store.messaging.Endpoint;
35import org.onlab.onos.store.messaging.MessageHandler;
36import org.onlab.onos.store.messaging.MessagingService;
37import org.onlab.onos.store.messaging.Response;
38import org.slf4j.Logger;
39import org.slf4j.LoggerFactory;
40
41import com.google.common.cache.Cache;
42import com.google.common.cache.CacheBuilder;
43
44/**
45 * A Netty based implementation of MessagingService.
46 */
47@Component(immediate = true)
48@Service
49public class NettyMessagingService implements MessagingService {
50
51 private final Logger log = LoggerFactory.getLogger(getClass());
52
53 private KeyedObjectPool<Endpoint, Channel> channels =
54 new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
55 private final int port;
56 private final EventLoopGroup bossGroup = new NioEventLoopGroup();
57 private final EventLoopGroup workerGroup = new NioEventLoopGroup();
58 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
59 private Cache<Long, AsyncResponse<?>> responseFutures;
60 private final Endpoint localEp;
61
62 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampanie4ff21b2014-10-02 15:17:32 -070063 protected Serializer serializer;
Madan Jampani890bc352014-10-01 22:35:29 -070064
65 public NettyMessagingService() {
66 // TODO: Default port should be configurable.
67 this(8080);
68 }
69
70 // FIXME: Constructor should not throw exceptions.
71 public NettyMessagingService(int port) {
72 this.port = port;
73 try {
74 localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
75 } catch (UnknownHostException e) {
76 // bailing out.
77 throw new RuntimeException(e);
78 }
79 }
80
81 @Activate
82 public void activate() throws Exception {
83 responseFutures = CacheBuilder.newBuilder()
84 .maximumSize(100000)
85 .weakValues()
86 // TODO: Once the entry expires, notify blocking threads (if any).
87 .expireAfterWrite(10, TimeUnit.MINUTES)
88 .build();
89 startAcceptingConnections();
90 }
91
92 @Deactivate
93 public void deactivate() throws Exception {
94 channels.close();
95 bossGroup.shutdownGracefully();
96 workerGroup.shutdownGracefully();
97 }
98
99 @Override
100 public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
101 InternalMessage message = new InternalMessage.Builder(this)
102 .withId(RandomUtils.nextLong())
103 .withSender(localEp)
104 .withType(type)
105 .withPayload(payload)
106 .build();
107 sendAsync(ep, message);
108 }
109
110 protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
111 Channel channel = null;
112 try {
113 channel = channels.borrowObject(ep);
114 channel.eventLoop().execute(new WriteTask(channel, message));
115 } catch (Exception e) {
116 throw new IOException(e);
117 } finally {
118 try {
119 channels.returnObject(ep, channel);
120 } catch (Exception e) {
121 log.warn("Error returning object back to the pool", e);
122 // ignored.
123 }
124 }
125 }
126
127 @Override
128 public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
129 throws IOException {
130 AsyncResponse<T> futureResponse = new AsyncResponse<T>();
131 Long messageId = RandomUtils.nextLong();
132 responseFutures.put(messageId, futureResponse);
133 InternalMessage message = new InternalMessage.Builder(this)
134 .withId(messageId)
135 .withSender(localEp)
136 .withType(type)
137 .withPayload(payload)
138 .build();
139 sendAsync(ep, message);
140 return futureResponse;
141 }
142
143 @Override
144 public void registerHandler(String type, MessageHandler handler) {
145 // TODO: Is this the right semantics for handler registration?
146 handlers.putIfAbsent(type, handler);
147 }
148
149 public void unregisterHandler(String type) {
150 handlers.remove(type);
151 }
152
153 private MessageHandler getMessageHandler(String type) {
154 return handlers.get(type);
155 }
156
157 private void startAcceptingConnections() throws InterruptedException {
158 ServerBootstrap b = new ServerBootstrap();
159 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
160 b.group(bossGroup, workerGroup)
161 .channel(NioServerSocketChannel.class)
162 .childHandler(new OnosCommunicationChannelInitializer())
163 .option(ChannelOption.SO_BACKLOG, 128)
164 .childOption(ChannelOption.SO_KEEPALIVE, true);
165
166 // Bind and start to accept incoming connections.
167 b.bind(port).sync();
168 }
169
170 private class OnosCommunicationChannelFactory
171 implements KeyedPoolableObjectFactory<Endpoint, Channel> {
172
173 @Override
174 public void activateObject(Endpoint endpoint, Channel channel)
175 throws Exception {
176 }
177
178 @Override
179 public void destroyObject(Endpoint ep, Channel channel) throws Exception {
180 channel.close();
181 }
182
183 @Override
184 public Channel makeObject(Endpoint ep) throws Exception {
185 Bootstrap b = new Bootstrap();
186 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
187 b.group(workerGroup);
188 // TODO: Make this faster:
189 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
190 b.channel(NioSocketChannel.class);
191 b.option(ChannelOption.SO_KEEPALIVE, true);
192 b.handler(new OnosCommunicationChannelInitializer());
193
194 // Start the client.
195 ChannelFuture f = b.connect(ep.host(), ep.port()).sync();
196 return f.channel();
197 }
198
199 @Override
200 public void passivateObject(Endpoint ep, Channel channel)
201 throws Exception {
202 }
203
204 @Override
205 public boolean validateObject(Endpoint ep, Channel channel) {
206 return channel.isOpen();
207 }
208 }
209
210 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
211
212 @Override
213 protected void initChannel(SocketChannel channel) throws Exception {
214 channel.pipeline()
Madan Jampanie4ff21b2014-10-02 15:17:32 -0700215 .addLast(new MessageEncoder(serializer))
216 .addLast(new MessageDecoder(NettyMessagingService.this, serializer))
Madan Jampani890bc352014-10-01 22:35:29 -0700217 .addLast(new NettyMessagingService.InboundMessageDispatcher());
218 }
219 }
220
221 private class WriteTask implements Runnable {
222
223 private final Object message;
224 private final Channel channel;
225
226 public WriteTask(Channel channel, Object message) {
227 this.message = message;
228 this.channel = channel;
229 }
230
231 @Override
232 public void run() {
233 channel.writeAndFlush(message);
234 }
235 }
236
237 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
238
239 @Override
240 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
241 String type = message.type();
242 if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
243 try {
244 AsyncResponse<?> futureResponse =
245 NettyMessagingService.this.responseFutures.getIfPresent(message.id());
246 if (futureResponse != null) {
247 futureResponse.setResponse(message.payload());
248 }
249 log.warn("Received a reply. But was unable to locate the request handle");
250 } finally {
251 NettyMessagingService.this.responseFutures.invalidate(message.id());
252 }
253 return;
254 }
255 MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
256 handler.handle(message);
257 }
258 }
259}