Added a messaging service implementation on top of IOLoop. Added ability to easily switch between netty and io loop (default is netty)

Change-Id: Id9af0756bf0a542f832f3611b486b2ac680b91e4
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index da72886..eeba05e 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -46,10 +46,14 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.onlab.packet.IpAddress;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,17 +63,18 @@
 import com.google.common.cache.RemovalNotification;
 
 /**
- * A Netty based implementation of MessagingService.
+ * Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
  */
 public class NettyMessagingService implements MessagingService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
+
     private final Endpoint localEp;
-    private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
     private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
-            .maximumSize(100000)
             .expireAfterWrite(10, TimeUnit.SECONDS)
             .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
                 @Override
@@ -124,6 +129,7 @@
     }
 
     public void activate() throws InterruptedException {
+        channels.setLifo(false);
         channels.setTestOnBorrow(true);
         channels.setTestOnReturn(true);
         initEventLoopGroup();
@@ -146,12 +152,10 @@
 
     @Override
     public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
-        InternalMessage message = new InternalMessage.Builder(this)
-            .withId(messageIdGenerator.incrementAndGet())
-            .withSender(localEp)
-            .withType(type)
-            .withPayload(payload)
-            .build();
+        InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
+                                                      localEp,
+                                                      type,
+                                                      payload);
         sendAsync(ep, message);
     }
 
@@ -164,7 +168,7 @@
         try {
             try {
                 channel = channels.borrowObject(ep);
-                channel.eventLoop().execute(new WriteTask(channel, message));
+                channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
             } finally {
                 channels.returnObject(ep, channel);
             }
@@ -173,7 +177,6 @@
         } catch (Exception e) {
             throw new IOException(e);
         }
-
     }
 
     @Override
@@ -181,12 +184,7 @@
         CompletableFuture<byte[]> response = new CompletableFuture<>();
         Long messageId = messageIdGenerator.incrementAndGet();
         responseFutures.put(messageId, response);
-        InternalMessage message = new InternalMessage.Builder(this)
-            .withId(messageId)
-            .withSender(localEp)
-            .withType(type)
-            .withPayload(payload)
-            .build();
+        InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
         try {
             sendAsync(ep, message);
         } catch (Exception e) {
@@ -197,24 +195,26 @@
     }
 
     @Override
-    public void registerHandler(String type, MessageHandler handler) {
-        handlers.put(type, handler);
+    public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
+        handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
     }
 
     @Override
-    public void registerHandler(String type, MessageHandler handler, Executor executor) {
-        handlers.put(type, new MessageHandler() {
-            @Override
-            public void handle(Message message) throws IOException {
-                executor.execute(() -> {
-                    try {
-                        handler.handle(message);
-                    } catch (Exception e) {
-                        log.debug("Failed to process message of type {}", type, e);
-                    }
-                });
+    public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
+        handlers.put(type, message -> executor.execute(() -> {
+            byte[] responsePayload = handler.apply(message.payload());
+            if (responsePayload != null) {
+                InternalMessage response = new InternalMessage(message.id(),
+                        localEp,
+                        REPLY_MESSAGE_TYPE,
+                        responsePayload);
+                try {
+                    sendAsync(message.sender(), response);
+                } catch (IOException e) {
+                    log.debug("Failed to respond", e);
+                }
             }
-        });
+        }));
     }
 
     @Override
@@ -222,14 +222,12 @@
         handlers.remove(type);
     }
 
-    private MessageHandler getMessageHandler(String type) {
-        return handlers.get(type);
-    }
-
     private void startAcceptingConnections() throws InterruptedException {
         ServerBootstrap b = new ServerBootstrap();
         b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
         b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+        b.option(ChannelOption.SO_RCVBUF, 1048576);
+        b.option(ChannelOption.TCP_NODELAY, true);
         b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
         b.group(serverGroup, clientGroup)
             .channel(serverChannelClass)
@@ -258,8 +256,9 @@
         public Channel makeObject(Endpoint ep) throws Exception {
             Bootstrap bootstrap = new Bootstrap();
             bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-            bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
-            bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+            bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
+            bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
+            bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
             bootstrap.group(clientGroup);
             // TODO: Make this faster:
             // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
@@ -268,6 +267,7 @@
             bootstrap.handler(new OnosCommunicationChannelInitializer());
             // Start the client.
             ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
+            log.info("Established a new connection to {}", ep);
             return f.channel();
         }
 
@@ -291,27 +291,11 @@
         protected void initChannel(SocketChannel channel) throws Exception {
             channel.pipeline()
                 .addLast("encoder", encoder)
-                .addLast("decoder", new MessageDecoder(NettyMessagingService.this))
+                .addLast("decoder", new MessageDecoder())
                 .addLast("handler", dispatcher);
         }
     }
 
-    private static class WriteTask implements Runnable {
-
-        private final InternalMessage message;
-        private final Channel channel;
-
-        public WriteTask(Channel channel, InternalMessage message) {
-            this.channel = channel;
-            this.message = message;
-        }
-
-        @Override
-        public void run() {
-            channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
-        }
-    }
-
     @ChannelHandler.Sharable
     private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
 
@@ -329,10 +313,10 @@
 
     private void dispatchLocally(InternalMessage message) throws IOException {
         String type = message.type();
-        if (InternalMessage.REPLY_MESSAGE_TYPE.equals(type)) {
+        if (REPLY_MESSAGE_TYPE.equals(type)) {
             try {
                 CompletableFuture<byte[]> futureResponse =
-                    NettyMessagingService.this.responseFutures.getIfPresent(message.id());
+                    responseFutures.getIfPresent(message.id());
                 if (futureResponse != null) {
                     futureResponse.complete(message.payload());
                 } else {
@@ -341,13 +325,13 @@
                             + " request handle", message.id(), message.sender());
                 }
             } finally {
-                NettyMessagingService.this.responseFutures.invalidate(message.id());
+                responseFutures.invalidate(message.id());
             }
             return;
         }
-        MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
+        Consumer<InternalMessage> handler = handlers.get(type);
         if (handler != null) {
-            handler.handle(message);
+            handler.accept(message);
         } else {
             log.debug("No handler registered for {}", type);
         }