Added a messaging service implementation on top of IOLoop. Added the ability to easily switch between Netty and IOLoop (default is Netty).

Change-Id: Id9af0756bf0a542f832f3611b486b2ac680b91e4
diff --git a/utils/netty/pom.xml b/utils/netty/pom.xml
index c95412b..9e5677c 100644
--- a/utils/netty/pom.xml
+++ b/utils/netty/pom.xml
@@ -39,6 +39,10 @@
         </dependency>
         <dependency>
             <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
             <artifactId>onlab-misc</artifactId>
         </dependency>
         <dependency>
diff --git a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
deleted file mode 100644
index db38c19..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-//FIXME: Should be move out to test or app
-/**
- * Message handler that echos the message back to the sender.
- */
-public class EchoHandler implements MessageHandler {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    @Override
-    public void handle(Message message) throws IOException {
-        log.info("Received message. Echoing it back to the sender.");
-        message.respond(message.payload());
-    }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
deleted file mode 100644
index af97763..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import static com.google.common.base.Preconditions.*;
-
-import java.util.Objects;
-
-import org.onlab.packet.IpAddress;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * Representation of a TCP/UDP communication end point.
- */
-public final class Endpoint {
-
-    private final int port;
-    private final IpAddress ip;
-
-    public Endpoint(IpAddress host, int port) {
-        this.ip = checkNotNull(host);
-        this.port = port;
-    }
-
-    public IpAddress host() {
-        return ip;
-    }
-
-    public int port() {
-        return port;
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(getClass())
-                .add("ip", ip)
-                .add("port", port)
-                .toString();
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(ip, port);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (getClass() != obj.getClass()) {
-            return false;
-        }
-        Endpoint that = (Endpoint) obj;
-        return Objects.equals(this.port, that.port) &&
-               Objects.equals(this.ip, that.ip);
-    }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
index 9d42b6d..102e2a2 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -15,9 +15,8 @@
  */
 package org.onlab.netty;
 
-import java.io.IOException;
-
 import org.onlab.util.ByteArraySizeHashPrinter;
+import org.onosproject.store.cluster.messaging.Endpoint;
 
 import com.google.common.base.MoreObjects;
 
@@ -25,20 +24,14 @@
  * Internal message representation with additional attributes
  * for supporting, synchronous request/reply behavior.
  */
-public final class InternalMessage implements Message {
+public final class InternalMessage {
 
-    public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
+    private final long id;
+    private final Endpoint sender;
+    private final String type;
+    private final byte[] payload;
 
-    private long id;
-    private Endpoint sender;
-    private String type;
-    private byte[] payload;
-    private transient NettyMessagingService messagingService;
-
-    // Must be created using the Builder.
-    private InternalMessage() {}
-
-    InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
+    public InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
         this.id = id;
         this.sender = sender;
         this.type = type;
@@ -57,26 +50,10 @@
         return sender;
     }
 
-    @Override
     public byte[] payload() {
         return payload;
     }
 
-    protected void setMessagingService(NettyMessagingService messagingService) {
-        this.messagingService = messagingService;
-    }
-
-    @Override
-    public void respond(byte[] data) throws IOException {
-        Builder builder = new Builder(messagingService);
-        InternalMessage message = builder.withId(this.id)
-            .withSender(messagingService.localEp())
-            .withPayload(data)
-            .withType(REPLY_MESSAGE_TYPE)
-            .build();
-        messagingService.sendAsync(sender, message);
-    }
-
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
@@ -86,39 +63,4 @@
                 .add("payload", ByteArraySizeHashPrinter.of(payload))
                 .toString();
     }
-
-    /**
-     * Builder for InternalMessages.
-     */
-    public static final class Builder {
-        private InternalMessage message;
-
-        public Builder(NettyMessagingService messagingService) {
-            message = new InternalMessage();
-            message.messagingService = messagingService;
-        }
-
-        public Builder withId(long id) {
-            message.id = id;
-            return this;
-        }
-
-        public Builder withType(String type) {
-            message.type = type;
-            return this;
-        }
-
-        public Builder withSender(Endpoint sender) {
-            message.sender = sender;
-            return this;
-        }
-        public Builder withPayload(byte[] payload) {
-            message.payload = payload;
-            return this;
-        }
-
-        public InternalMessage build() {
-            return message;
-        }
-    }
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
deleted file mode 100644
index d91e8ae..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A MessageHandler that simply logs the information.
- */
-public class LoggingHandler implements MessageHandler {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    @Override
-    public void handle(Message message) {
-        log.info("Received message. Payload has {} bytes", message.payload().length);
-    }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Message.java b/utils/netty/src/main/java/org/onlab/netty/Message.java
deleted file mode 100644
index 2baea96..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/Message.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import java.io.IOException;
-
-/**
- * A unit of communication.
- * Has a payload. Also supports a feature to respond back to the sender.
- */
-public interface Message {
-
-    /**
-     * Returns the payload of this message.
-     * @return message payload.
-     */
-    public byte[] payload();
-
-    /**
-     * Sends a reply back to the sender of this message.
-     * @param data payload of the response.
-     * @throws IOException if there is a communication error.
-     */
-    public void respond(byte[] data) throws IOException;
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index 440fc52..c34d3cc 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -24,6 +24,7 @@
 
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpAddress.Version;
+import org.onosproject.store.cluster.messaging.Endpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,8 +37,6 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private final NettyMessagingService messagingService;
-
     private long messageId;
     private Version ipVersion;
     private IpAddress senderIp;
@@ -46,9 +45,8 @@
     private String messageType;
     private int contentLength;
 
-    public MessageDecoder(NettyMessagingService messagingService) {
+    public MessageDecoder() {
         super(DecoderState.READ_MESSAGE_ID);
-        this.messagingService = messagingService;
     }
 
     @Override
@@ -91,7 +89,6 @@
                     new Endpoint(senderIp, senderPort),
                     messageType,
                     payload);
-            message.setMessagingService(messagingService);
             out.add(message);
             checkpoint(DecoderState.READ_MESSAGE_ID);
             break;
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index f5a8d2e..2b7784f 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -24,6 +24,7 @@
 
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpAddress.Version;
+import org.onosproject.store.cluster.messaging.Endpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java b/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java
deleted file mode 100644
index a87d8fc..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import java.io.IOException;
-
-/**
- * Handler for a message.
- */
-public interface MessageHandler {
-
-    /**
-     * Handles the message.
-     *
-     * @param message message.
-     * @throws IOException if an error is encountered handling the message
-     */
-    public void handle(Message message) throws IOException;
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
deleted file mode 100644
index 1b42b04..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Interface for low level messaging primitives.
- */
-public interface MessagingService {
-    /**
-     * Sends a message asynchronously to the specified communication end point.
-     * The message is specified using the type and payload.
-     * @param ep end point to send the message to.
-     * @param type type of message.
-     * @param payload message payload bytes.
-     * @throws IOException when I/O exception of some sort has occurred
-     */
-    public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException;
-
-    /**
-     * Sends a message synchronously and waits for a response.
-     * @param ep end point to send the message to.
-     * @param type type of message.
-     * @param payload message payload.
-     * @return a response future
-     */
-    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
-
-    /**
-     * Registers a new message handler for message type.
-     * @param type message type.
-     * @param handler message handler
-     * @param executor executor to use for running message handler logic.
-     */
-    public void registerHandler(String type, MessageHandler handler, Executor executor);
-
-    /**
-     * Registers a new message handler for message type.
-     * @param type message type.
-     * @param handler message handler
-     */
-    @Deprecated
-    public void registerHandler(String type, MessageHandler handler);
-
-    /**
-     * Unregister current handler, if one exists for message type.
-     * @param type message type
-     */
-    public void unregisterHandler(String type);
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index da72886..eeba05e 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -46,10 +46,14 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.onlab.packet.IpAddress;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,17 +63,18 @@
 import com.google.common.cache.RemovalNotification;
 
 /**
- * A Netty based implementation of MessagingService.
+ * Implementation of MessagingService based on the <a href="http://netty.io/">Netty</a> framework.
  */
 public class NettyMessagingService implements MessagingService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
+
     private final Endpoint localEp;
-    private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
     private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
-            .maximumSize(100000)
             .expireAfterWrite(10, TimeUnit.SECONDS)
             .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
                 @Override
@@ -124,6 +129,7 @@
     }
 
     public void activate() throws InterruptedException {
+        channels.setLifo(false);
         channels.setTestOnBorrow(true);
         channels.setTestOnReturn(true);
         initEventLoopGroup();
@@ -146,12 +152,10 @@
 
     @Override
     public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
-        InternalMessage message = new InternalMessage.Builder(this)
-            .withId(messageIdGenerator.incrementAndGet())
-            .withSender(localEp)
-            .withType(type)
-            .withPayload(payload)
-            .build();
+        InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
+                                                      localEp,
+                                                      type,
+                                                      payload);
         sendAsync(ep, message);
     }
 
@@ -164,7 +168,7 @@
         try {
             try {
                 channel = channels.borrowObject(ep);
-                channel.eventLoop().execute(new WriteTask(channel, message));
+                channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
             } finally {
                 channels.returnObject(ep, channel);
             }
@@ -173,7 +177,6 @@
         } catch (Exception e) {
             throw new IOException(e);
         }
-
     }
 
     @Override
@@ -181,12 +184,7 @@
         CompletableFuture<byte[]> response = new CompletableFuture<>();
         Long messageId = messageIdGenerator.incrementAndGet();
         responseFutures.put(messageId, response);
-        InternalMessage message = new InternalMessage.Builder(this)
-            .withId(messageId)
-            .withSender(localEp)
-            .withType(type)
-            .withPayload(payload)
-            .build();
+        InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
         try {
             sendAsync(ep, message);
         } catch (Exception e) {
@@ -197,24 +195,26 @@
     }
 
     @Override
-    public void registerHandler(String type, MessageHandler handler) {
-        handlers.put(type, handler);
+    public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
+        handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
     }
 
     @Override
-    public void registerHandler(String type, MessageHandler handler, Executor executor) {
-        handlers.put(type, new MessageHandler() {
-            @Override
-            public void handle(Message message) throws IOException {
-                executor.execute(() -> {
-                    try {
-                        handler.handle(message);
-                    } catch (Exception e) {
-                        log.debug("Failed to process message of type {}", type, e);
-                    }
-                });
+    public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
+        handlers.put(type, message -> executor.execute(() -> {
+            byte[] responsePayload = handler.apply(message.payload());
+            if (responsePayload != null) {
+                InternalMessage response = new InternalMessage(message.id(),
+                        localEp,
+                        REPLY_MESSAGE_TYPE,
+                        responsePayload);
+                try {
+                    sendAsync(message.sender(), response);
+                } catch (IOException e) {
+                    log.debug("Failed to respond", e);
+                }
             }
-        });
+        }));
     }
 
     @Override
@@ -222,14 +222,12 @@
         handlers.remove(type);
     }
 
-    private MessageHandler getMessageHandler(String type) {
-        return handlers.get(type);
-    }
-
     private void startAcceptingConnections() throws InterruptedException {
         ServerBootstrap b = new ServerBootstrap();
         b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
         b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+        b.option(ChannelOption.SO_RCVBUF, 1048576);
+        b.option(ChannelOption.TCP_NODELAY, true);
         b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
         b.group(serverGroup, clientGroup)
             .channel(serverChannelClass)
@@ -258,8 +256,9 @@
         public Channel makeObject(Endpoint ep) throws Exception {
             Bootstrap bootstrap = new Bootstrap();
             bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-            bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
-            bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+            bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
+            bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
+            bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
             bootstrap.group(clientGroup);
             // TODO: Make this faster:
             // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
@@ -268,6 +267,7 @@
             bootstrap.handler(new OnosCommunicationChannelInitializer());
             // Start the client.
             ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
+            log.info("Established a new connection to {}", ep);
             return f.channel();
         }
 
@@ -291,27 +291,11 @@
         protected void initChannel(SocketChannel channel) throws Exception {
             channel.pipeline()
                 .addLast("encoder", encoder)
-                .addLast("decoder", new MessageDecoder(NettyMessagingService.this))
+                .addLast("decoder", new MessageDecoder())
                 .addLast("handler", dispatcher);
         }
     }
 
-    private static class WriteTask implements Runnable {
-
-        private final InternalMessage message;
-        private final Channel channel;
-
-        public WriteTask(Channel channel, InternalMessage message) {
-            this.channel = channel;
-            this.message = message;
-        }
-
-        @Override
-        public void run() {
-            channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
-        }
-    }
-
     @ChannelHandler.Sharable
     private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
 
@@ -329,10 +313,10 @@
 
     private void dispatchLocally(InternalMessage message) throws IOException {
         String type = message.type();
-        if (InternalMessage.REPLY_MESSAGE_TYPE.equals(type)) {
+        if (REPLY_MESSAGE_TYPE.equals(type)) {
             try {
                 CompletableFuture<byte[]> futureResponse =
-                    NettyMessagingService.this.responseFutures.getIfPresent(message.id());
+                    responseFutures.getIfPresent(message.id());
                 if (futureResponse != null) {
                     futureResponse.complete(message.payload());
                 } else {
@@ -341,13 +325,13 @@
                             + " request handle", message.id(), message.sender());
                 }
             } finally {
-                NettyMessagingService.this.responseFutures.invalidate(message.id());
+                responseFutures.invalidate(message.id());
             }
             return;
         }
-        MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
+        Consumer<InternalMessage> handler = handlers.get(type);
         if (handler != null) {
-            handler.handle(message);
+            handler.accept(message);
         } else {
             log.debug("No handler registered for {}", type);
         }
diff --git a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
index 3eb0d39..61d8541 100644
--- a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
+++ b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
@@ -18,13 +18,17 @@
 import static org.junit.Assert.assertArrayEquals;
 
 import java.net.InetAddress;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import org.apache.commons.lang3.RandomUtils;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.onlab.packet.IpAddress;
+import org.onosproject.store.cluster.messaging.Endpoint;
+
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * Simple ping-pong test that exercises NettyMessagingService.
@@ -39,9 +43,9 @@
         try {
             pinger.activate();
             ponger.activate();
-            ponger.registerHandler("echo", new EchoHandler());
+            ponger.registerHandler("echo", Function.identity(), MoreExecutors.directExecutor());
             byte[] payload = RandomUtils.nextBytes(100);
-            Future<byte[]> responseFuture =
+            CompletableFuture<byte[]> responseFuture =
                     pinger.sendAndReceive(
                             new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), 9086), "echo", payload);
             assertArrayEquals(payload, responseFuture.get(10000, TimeUnit.MILLISECONDS));