Added a messaging service implementation on top of IOLoop. Added ability to easily switch between netty and io loop (default is netty)
Change-Id: Id9af0756bf0a542f832f3611b486b2ac680b91e4
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index da72886..eeba05e 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -46,10 +46,14 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.onlab.packet.IpAddress;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,17 +63,18 @@
import com.google.common.cache.RemovalNotification;
/**
- * A Netty based implementation of MessagingService.
+ * Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
*/
public class NettyMessagingService implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
+
private final Endpoint localEp;
- private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
- .maximumSize(100000)
.expireAfterWrite(10, TimeUnit.SECONDS)
.removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
@Override
@@ -124,6 +129,7 @@
}
public void activate() throws InterruptedException {
+ channels.setLifo(false);
channels.setTestOnBorrow(true);
channels.setTestOnReturn(true);
initEventLoopGroup();
@@ -146,12 +152,10 @@
@Override
public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
- InternalMessage message = new InternalMessage.Builder(this)
- .withId(messageIdGenerator.incrementAndGet())
- .withSender(localEp)
- .withType(type)
- .withPayload(payload)
- .build();
+ InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
+ localEp,
+ type,
+ payload);
sendAsync(ep, message);
}
@@ -164,7 +168,7 @@
try {
try {
channel = channels.borrowObject(ep);
- channel.eventLoop().execute(new WriteTask(channel, message));
+ channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} finally {
channels.returnObject(ep, channel);
}
@@ -173,7 +177,6 @@
} catch (Exception e) {
throw new IOException(e);
}
-
}
@Override
@@ -181,12 +184,7 @@
CompletableFuture<byte[]> response = new CompletableFuture<>();
Long messageId = messageIdGenerator.incrementAndGet();
responseFutures.put(messageId, response);
- InternalMessage message = new InternalMessage.Builder(this)
- .withId(messageId)
- .withSender(localEp)
- .withType(type)
- .withPayload(payload)
- .build();
+ InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
try {
sendAsync(ep, message);
} catch (Exception e) {
@@ -197,24 +195,26 @@
}
@Override
- public void registerHandler(String type, MessageHandler handler) {
- handlers.put(type, handler);
+ public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
+ handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
}
@Override
- public void registerHandler(String type, MessageHandler handler, Executor executor) {
- handlers.put(type, new MessageHandler() {
- @Override
- public void handle(Message message) throws IOException {
- executor.execute(() -> {
- try {
- handler.handle(message);
- } catch (Exception e) {
- log.debug("Failed to process message of type {}", type, e);
- }
- });
+ public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
+ handlers.put(type, message -> executor.execute(() -> {
+ byte[] responsePayload = handler.apply(message.payload());
+ if (responsePayload != null) {
+ InternalMessage response = new InternalMessage(message.id(),
+ localEp,
+ REPLY_MESSAGE_TYPE,
+ responsePayload);
+ try {
+ sendAsync(message.sender(), response);
+ } catch (IOException e) {
+ log.debug("Failed to respond", e);
+ }
}
- });
+ }));
}
@Override
@@ -222,14 +222,12 @@
handlers.remove(type);
}
- private MessageHandler getMessageHandler(String type) {
- return handlers.get(type);
- }
-
private void startAcceptingConnections() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+ b.option(ChannelOption.SO_RCVBUF, 1048576);
+ b.option(ChannelOption.TCP_NODELAY, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(serverGroup, clientGroup)
.channel(serverChannelClass)
@@ -258,8 +256,9 @@
public Channel makeObject(Endpoint ep) throws Exception {
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
- bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+ bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
+ bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
+ bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
bootstrap.group(clientGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
@@ -268,6 +267,7 @@
bootstrap.handler(new OnosCommunicationChannelInitializer());
// Start the client.
ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
+ log.info("Established a new connection to {}", ep);
return f.channel();
}
@@ -291,27 +291,11 @@
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(NettyMessagingService.this))
+ .addLast("decoder", new MessageDecoder())
.addLast("handler", dispatcher);
}
}
- private static class WriteTask implements Runnable {
-
- private final InternalMessage message;
- private final Channel channel;
-
- public WriteTask(Channel channel, InternalMessage message) {
- this.channel = channel;
- this.message = message;
- }
-
- @Override
- public void run() {
- channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
- }
- }
-
@ChannelHandler.Sharable
private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
@@ -329,10 +313,10 @@
private void dispatchLocally(InternalMessage message) throws IOException {
String type = message.type();
- if (InternalMessage.REPLY_MESSAGE_TYPE.equals(type)) {
+ if (REPLY_MESSAGE_TYPE.equals(type)) {
try {
CompletableFuture<byte[]> futureResponse =
- NettyMessagingService.this.responseFutures.getIfPresent(message.id());
+ responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
futureResponse.complete(message.payload());
} else {
@@ -341,13 +325,13 @@
+ " request handle", message.id(), message.sender());
}
} finally {
- NettyMessagingService.this.responseFutures.invalidate(message.id());
+ responseFutures.invalidate(message.id());
}
return;
}
- MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
+ Consumer<InternalMessage> handler = handlers.get(type);
if (handler != null) {
- handler.handle(message);
+ handler.accept(message);
} else {
log.debug("No handler registered for {}", type);
}