blob: b6b3857b3d56952e35f4a675bf1e484063e26472 [file] [log] [blame]
Madan Jampani890bc352014-10-01 22:35:29 -07001package org.onlab.onos.store.messaging.impl;
2
3import java.io.IOException;
4import java.net.UnknownHostException;
5import java.util.concurrent.ConcurrentHashMap;
6import java.util.concurrent.ConcurrentMap;
7import java.util.concurrent.TimeUnit;
8
9import io.netty.bootstrap.Bootstrap;
10import io.netty.bootstrap.ServerBootstrap;
11import io.netty.buffer.PooledByteBufAllocator;
12import io.netty.channel.Channel;
13import io.netty.channel.ChannelFuture;
14import io.netty.channel.ChannelHandlerContext;
15import io.netty.channel.ChannelInitializer;
16import io.netty.channel.ChannelOption;
17import io.netty.channel.EventLoopGroup;
18import io.netty.channel.SimpleChannelInboundHandler;
19import io.netty.channel.nio.NioEventLoopGroup;
20import io.netty.channel.socket.SocketChannel;
21import io.netty.channel.socket.nio.NioServerSocketChannel;
22import io.netty.channel.socket.nio.NioSocketChannel;
23
24import org.apache.commons.lang.math.RandomUtils;
25import org.apache.commons.pool.KeyedObjectPool;
26import org.apache.commons.pool.KeyedPoolableObjectFactory;
27import org.apache.commons.pool.impl.GenericKeyedObjectPool;
28import org.apache.felix.scr.annotations.Activate;
29import org.apache.felix.scr.annotations.Component;
30import org.apache.felix.scr.annotations.Deactivate;
31import org.apache.felix.scr.annotations.Reference;
32import org.apache.felix.scr.annotations.ReferenceCardinality;
33import org.apache.felix.scr.annotations.Service;
34import org.onlab.onos.store.cluster.messaging.SerializationService;
35import org.onlab.onos.store.messaging.Endpoint;
36import org.onlab.onos.store.messaging.MessageHandler;
37import org.onlab.onos.store.messaging.MessagingService;
38import org.onlab.onos.store.messaging.Response;
39import org.slf4j.Logger;
40import org.slf4j.LoggerFactory;
41
42import com.google.common.cache.Cache;
43import com.google.common.cache.CacheBuilder;
44
45/**
46 * A Netty based implementation of MessagingService.
47 */
48@Component(immediate = true)
49@Service
50public class NettyMessagingService implements MessagingService {
51
52 private final Logger log = LoggerFactory.getLogger(getClass());
53
54 private KeyedObjectPool<Endpoint, Channel> channels =
55 new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
56 private final int port;
57 private final EventLoopGroup bossGroup = new NioEventLoopGroup();
58 private final EventLoopGroup workerGroup = new NioEventLoopGroup();
59 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
60 private Cache<Long, AsyncResponse<?>> responseFutures;
61 private final Endpoint localEp;
62
63 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
64 protected SerializationService serializationService;
65
66 public NettyMessagingService() {
67 // TODO: Default port should be configurable.
68 this(8080);
69 }
70
71 // FIXME: Constructor should not throw exceptions.
72 public NettyMessagingService(int port) {
73 this.port = port;
74 try {
75 localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
76 } catch (UnknownHostException e) {
77 // bailing out.
78 throw new RuntimeException(e);
79 }
80 }
81
82 @Activate
83 public void activate() throws Exception {
84 responseFutures = CacheBuilder.newBuilder()
85 .maximumSize(100000)
86 .weakValues()
87 // TODO: Once the entry expires, notify blocking threads (if any).
88 .expireAfterWrite(10, TimeUnit.MINUTES)
89 .build();
90 startAcceptingConnections();
91 }
92
93 @Deactivate
94 public void deactivate() throws Exception {
95 channels.close();
96 bossGroup.shutdownGracefully();
97 workerGroup.shutdownGracefully();
98 }
99
100 @Override
101 public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
102 InternalMessage message = new InternalMessage.Builder(this)
103 .withId(RandomUtils.nextLong())
104 .withSender(localEp)
105 .withType(type)
106 .withPayload(payload)
107 .build();
108 sendAsync(ep, message);
109 }
110
111 protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
112 Channel channel = null;
113 try {
114 channel = channels.borrowObject(ep);
115 channel.eventLoop().execute(new WriteTask(channel, message));
116 } catch (Exception e) {
117 throw new IOException(e);
118 } finally {
119 try {
120 channels.returnObject(ep, channel);
121 } catch (Exception e) {
122 log.warn("Error returning object back to the pool", e);
123 // ignored.
124 }
125 }
126 }
127
128 @Override
129 public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
130 throws IOException {
131 AsyncResponse<T> futureResponse = new AsyncResponse<T>();
132 Long messageId = RandomUtils.nextLong();
133 responseFutures.put(messageId, futureResponse);
134 InternalMessage message = new InternalMessage.Builder(this)
135 .withId(messageId)
136 .withSender(localEp)
137 .withType(type)
138 .withPayload(payload)
139 .build();
140 sendAsync(ep, message);
141 return futureResponse;
142 }
143
144 @Override
145 public void registerHandler(String type, MessageHandler handler) {
146 // TODO: Is this the right semantics for handler registration?
147 handlers.putIfAbsent(type, handler);
148 }
149
150 public void unregisterHandler(String type) {
151 handlers.remove(type);
152 }
153
154 private MessageHandler getMessageHandler(String type) {
155 return handlers.get(type);
156 }
157
158 private void startAcceptingConnections() throws InterruptedException {
159 ServerBootstrap b = new ServerBootstrap();
160 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
161 b.group(bossGroup, workerGroup)
162 .channel(NioServerSocketChannel.class)
163 .childHandler(new OnosCommunicationChannelInitializer())
164 .option(ChannelOption.SO_BACKLOG, 128)
165 .childOption(ChannelOption.SO_KEEPALIVE, true);
166
167 // Bind and start to accept incoming connections.
168 b.bind(port).sync();
169 }
170
171 private class OnosCommunicationChannelFactory
172 implements KeyedPoolableObjectFactory<Endpoint, Channel> {
173
174 @Override
175 public void activateObject(Endpoint endpoint, Channel channel)
176 throws Exception {
177 }
178
179 @Override
180 public void destroyObject(Endpoint ep, Channel channel) throws Exception {
181 channel.close();
182 }
183
184 @Override
185 public Channel makeObject(Endpoint ep) throws Exception {
186 Bootstrap b = new Bootstrap();
187 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
188 b.group(workerGroup);
189 // TODO: Make this faster:
190 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
191 b.channel(NioSocketChannel.class);
192 b.option(ChannelOption.SO_KEEPALIVE, true);
193 b.handler(new OnosCommunicationChannelInitializer());
194
195 // Start the client.
196 ChannelFuture f = b.connect(ep.host(), ep.port()).sync();
197 return f.channel();
198 }
199
200 @Override
201 public void passivateObject(Endpoint ep, Channel channel)
202 throws Exception {
203 }
204
205 @Override
206 public boolean validateObject(Endpoint ep, Channel channel) {
207 return channel.isOpen();
208 }
209 }
210
211 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
212
213 @Override
214 protected void initChannel(SocketChannel channel) throws Exception {
215 channel.pipeline()
216 .addLast(new MessageEncoder(serializationService))
217 .addLast(new MessageDecoder(NettyMessagingService.this, serializationService))
218 .addLast(new NettyMessagingService.InboundMessageDispatcher());
219 }
220 }
221
222 private class WriteTask implements Runnable {
223
224 private final Object message;
225 private final Channel channel;
226
227 public WriteTask(Channel channel, Object message) {
228 this.message = message;
229 this.channel = channel;
230 }
231
232 @Override
233 public void run() {
234 channel.writeAndFlush(message);
235 }
236 }
237
238 private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
239
240 @Override
241 protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
242 String type = message.type();
243 if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
244 try {
245 AsyncResponse<?> futureResponse =
246 NettyMessagingService.this.responseFutures.getIfPresent(message.id());
247 if (futureResponse != null) {
248 futureResponse.setResponse(message.payload());
249 }
250 log.warn("Received a reply. But was unable to locate the request handle");
251 } finally {
252 NettyMessagingService.this.responseFutures.invalidate(message.id());
253 }
254 return;
255 }
256 MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
257 handler.handle(message);
258 }
259 }
260}