Migrating netty messaging into netty messaging manager.

Change-Id: I971db195c9dc155cdf76850f0427ef9b9210113c
diff --git a/utils/netty/src/main/java/org/onlab/netty/DecoderState.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
similarity index 89%
rename from utils/netty/src/main/java/org/onlab/netty/DecoderState.java
rename to core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
index c439301..e113a3f 100644
--- a/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014-2015 Open Networking Laboratory
+ * Copyright 2016 Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onlab.netty;
+package org.onosproject.store.cluster.messaging.impl;
 
 /**
  * State transitions a decoder goes through as it is decoding an incoming message.
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
similarity index 94%
rename from utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
rename to core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
index 102e2a2..9deec66 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014-2015 Open Networking Laboratory
+ * Copyright 2016 Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,12 +13,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onlab.netty;
-
-import org.onlab.util.ByteArraySizeHashPrinter;
-import org.onosproject.store.cluster.messaging.Endpoint;
+package org.onosproject.store.cluster.messaging.impl;
 
 import com.google.common.base.MoreObjects;
+import org.onlab.util.ByteArraySizeHashPrinter;
+import org.onosproject.store.cluster.messaging.Endpoint;
 
 /**
  * Internal message representation with additional attributes
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
similarity index 90%
rename from utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
rename to core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
index 1c582a1..149b706 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014-2015 Open Networking Laboratory
+ * Copyright 2016 Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,21 +13,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onlab.netty;
+package org.onosproject.store.cluster.messaging.impl;
 
+import com.google.common.base.Charsets;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ReplayingDecoder;
-
-import java.util.List;
-
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpAddress.Version;
 import org.onosproject.store.cluster.messaging.Endpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
+import java.util.List;
 
 import static com.google.common.base.Preconditions.checkState;
 
@@ -54,7 +52,7 @@
     }
 
     @Override
-    @java.lang.SuppressWarnings("squid:S128") // suppress switch fall through warning
+    @SuppressWarnings("squid:S128") // suppress switch fall through warning
     protected void decode(
             ChannelHandlerContext context,
             ByteBuf buffer,
@@ -97,9 +95,9 @@
             byte[] payload = new byte[contentLength];
             buffer.readBytes(payload);
             InternalMessage message = new InternalMessage(messageId,
-                    new Endpoint(senderIp, senderPort),
-                    messageType,
-                    payload);
+                                                          new Endpoint(senderIp, senderPort),
+                                                          messageType,
+                                                          payload);
             out.add(message);
             checkpoint(DecoderState.READ_MESSAGE_PREAMBLE);
             break;
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
similarity index 96%
rename from utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
rename to core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
index c74c1de..48c75dd 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014-2015 Open Networking Laboratory
+ * Copyright 2016 Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,22 +13,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onlab.netty;
+package org.onosproject.store.cluster.messaging.impl;
 
+import com.google.common.base.Charsets;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
-
-import java.io.IOException;
-
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpAddress.Version;
 import org.onosproject.store.cluster.messaging.Endpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
+import java.io.IOException;
 
 /**
  * Encode InternalMessage out into a byte buffer.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index ca6f9c1..3d4a4f7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -17,29 +17,114 @@
 
 import com.google.common.base.Strings;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.netty.NettyMessaging;
+import org.onlab.netty.InternalMessage;
+import org.onlab.netty.MessageDecoder;
+import org.onlab.netty.MessageEncoder;
+import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterMetadataService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.KeyStore;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+
 /**
  * Netty based MessagingService.
  */
 @Component(immediate = true, enabled = true)
 @Service
-public class NettyMessagingManager extends NettyMessaging {
+public class NettyMessagingManager implements MessagingService {
+
+    private static final short MIN_KS_LENGTH = 6;
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private static final short MIN_KS_LENGTH = 6;
+    private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
+
+    private Endpoint localEp;
+    private int preamble;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
+    private final AtomicLong messageIdGenerator = new AtomicLong(0);
+    private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder()
+            .expireAfterWrite(10, TimeUnit.SECONDS)
+            .removalListener(new RemovalListener<Long, Callback>() {
+                @Override
+                public void onRemoval(RemovalNotification<Long, Callback> entry) {
+                    if (entry.wasEvicted()) {
+                        entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
+                    }
+                }
+            })
+            .build();
+
+    private final GenericKeyedObjectPool<Endpoint, Connection> channels
+            = new GenericKeyedObjectPool<Endpoint, Connection>(new OnosCommunicationChannelFactory());
+
+    private EventLoopGroup serverGroup;
+    private EventLoopGroup clientGroup;
+    private Class<? extends ServerChannel> serverChannelClass;
+    private Class<? extends Channel> clientChannelClass;
+
+    protected static final boolean TLS_DISABLED = false;
+    protected boolean enableNettyTls = TLS_DISABLED;
+
+    protected String ksLocation;
+    protected String tsLocation;
+    protected char[] ksPwd;
+    protected char[] tsPwd;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterMetadataService clusterMetadataService;
@@ -48,14 +133,32 @@
     public void activate() throws Exception {
         ControllerNode localNode = clusterMetadataService.getLocalNode();
         getTlsParameters();
-        super.start(clusterMetadataService.getClusterMetadata().getName().hashCode(),
-                    new Endpoint(localNode.ip(), localNode.tcpPort()));
+
+        if (started.get()) {
+            log.warn("Already running at local endpoint: {}", localEp);
+            return;
+        }
+        this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
+        this.localEp = new Endpoint(localNode.ip(), localNode.tcpPort());
+        channels.setLifo(true);
+        channels.setTestOnBorrow(true);
+        channels.setTestOnReturn(true);
+        channels.setMinEvictableIdleTimeMillis(60_000L);
+        channels.setTimeBetweenEvictionRunsMillis(30_000L);
+        initEventLoopGroup();
+        startAcceptingConnections();
+        started.set(true);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() throws Exception {
-        super.stop();
+        if (started.get()) {
+            channels.close();
+            serverGroup.shutdownGracefully();
+            clientGroup.shutdownGracefully();
+            started.set(false);
+        }
         log.info("Stopped");
     }
 
@@ -86,4 +189,409 @@
             }
         }
     }
+    private void initEventLoopGroup() {
+        // try Epoll first and if that does work, use nio.
+        try {
+            clientGroup = new EpollEventLoopGroup();
+            serverGroup = new EpollEventLoopGroup();
+            serverChannelClass = EpollServerSocketChannel.class;
+            clientChannelClass = EpollSocketChannel.class;
+            return;
+        } catch (Throwable e) {
+            log.debug("Failed to initialize native (epoll) transport. "
+                              + "Reason: {}. Proceeding with nio.", e.getMessage());
+        }
+        clientGroup = new NioEventLoopGroup();
+        serverGroup = new NioEventLoopGroup();
+        serverChannelClass = NioServerSocketChannel.class;
+        clientChannelClass = NioSocketChannel.class;
+    }
+
+    @Override
+    public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
+        InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
+                                                      localEp,
+                                                      type,
+                                                      payload);
+        return sendAsync(ep, message);
+    }
+
+    protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
+        if (ep.equals(localEp)) {
+            try {
+                dispatchLocally(message);
+            } catch (IOException e) {
+                return Tools.exceptionalFuture(e);
+            }
+            return CompletableFuture.completedFuture(null);
+        }
+
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        try {
+            Connection connection = null;
+            try {
+                connection = channels.borrowObject(ep);
+                connection.send(message, future);
+            } finally {
+                channels.returnObject(ep, connection);
+            }
+        } catch (Exception e) {
+            future.completeExceptionally(e);
+        }
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
+        return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
+    }
+
+    @Override
+    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
+        CompletableFuture<byte[]> response = new CompletableFuture<>();
+        Callback callback = new Callback(response, executor);
+        Long messageId = messageIdGenerator.incrementAndGet();
+        callbacks.put(messageId, callback);
+        InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
+        return sendAsync(ep, message).whenComplete((r, e) -> {
+            if (e != null) {
+                callbacks.invalidate(messageId);
+            }
+        }).thenCompose(v -> response);
+    }
+
+    @Override
+    public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
+        handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
+    }
+
+    @Override
+    public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
+        handlers.put(type, message -> executor.execute(() -> {
+            byte[] responsePayload = handler.apply(message.sender(), message.payload());
+            if (responsePayload != null) {
+                InternalMessage response = new InternalMessage(message.id(),
+                                                               localEp,
+                                                               REPLY_MESSAGE_TYPE,
+                                                               responsePayload);
+                sendAsync(message.sender(), response).whenComplete((result, error) -> {
+                    if (error != null) {
+                        log.debug("Failed to respond", error);
+                    }
+                });
+            }
+        }));
+    }
+
+    @Override
+    public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
+        handlers.put(type, message -> {
+            handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
+                if (error == null) {
+                    InternalMessage response = new InternalMessage(message.id(),
+                                                                   localEp,
+                                                                   REPLY_MESSAGE_TYPE,
+                                                                   result);
+                    sendAsync(message.sender(), response).whenComplete((r, e) -> {
+                        if (e != null) {
+                            log.debug("Failed to respond", e);
+                        }
+                    });
+                }
+            });
+        });
+    }
+
+    @Override
+    public void unregisterHandler(String type) {
+        handlers.remove(type);
+    }
+
+    private void startAcceptingConnections() throws InterruptedException {
+        ServerBootstrap b = new ServerBootstrap();
+        b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
+        b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+        b.option(ChannelOption.SO_RCVBUF, 1048576);
+        b.option(ChannelOption.TCP_NODELAY, true);
+        b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        b.group(serverGroup, clientGroup);
+        b.channel(serverChannelClass);
+        if (enableNettyTls) {
+            b.childHandler(new SslServerCommunicationChannelInitializer());
+        } else {
+            b.childHandler(new OnosCommunicationChannelInitializer());
+        }
+        b.option(ChannelOption.SO_BACKLOG, 128);
+        b.childOption(ChannelOption.SO_KEEPALIVE, true);
+
+        // Bind and start to accept incoming connections.
+        b.bind(localEp.port()).sync().addListener(future -> {
+            if (future.isSuccess()) {
+                log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
+            } else {
+                log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
+            }
+        });
+    }
+
+    private class OnosCommunicationChannelFactory
+            implements KeyedPoolableObjectFactory<Endpoint, Connection> {
+
+        @Override
+        public void activateObject(Endpoint endpoint,  Connection connection)
+                throws Exception {
+        }
+
+        @Override
+        public void destroyObject(Endpoint ep, Connection connection) throws Exception {
+            log.debug("Closing connection to {}", ep);
+            //Is this the right way to destroy?
+            connection.destroy();
+        }
+
+        @Override
+        public Connection makeObject(Endpoint ep) throws Exception {
+            Bootstrap bootstrap = new Bootstrap();
+            bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+            bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
+            bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
+            bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
+            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
+            bootstrap.group(clientGroup);
+            // TODO: Make this faster:
+            // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
+            bootstrap.channel(clientChannelClass);
+            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+            if (enableNettyTls) {
+                bootstrap.handler(new SslClientCommunicationChannelInitializer());
+            } else {
+                bootstrap.handler(new OnosCommunicationChannelInitializer());
+            }
+            // Start the client.
+            CompletableFuture<Channel> retFuture = new CompletableFuture<>();
+            ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port());
+
+            f.addListener(future -> {
+                if (future.isSuccess()) {
+                    retFuture.complete(f.channel());
+                } else {
+                    retFuture.completeExceptionally(future.cause());
+                }
+            });
+            log.debug("Established a new connection to {}", ep);
+            return new Connection(retFuture);
+        }
+
+        @Override
+        public void passivateObject(Endpoint ep, Connection connection)
+                throws Exception {
+        }
+
+        @Override
+        public boolean validateObject(Endpoint ep, Connection connection) {
+            return connection.validate();
+        }
+    }
+
+    private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+        private final ChannelHandler dispatcher = new InboundMessageDispatcher();
+        private final ChannelHandler encoder = new MessageEncoder(preamble);
+
+        @Override
+        protected void initChannel(SocketChannel channel) throws Exception {
+            TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+            KeyStore ts = KeyStore.getInstance("JKS");
+            ts.load(new FileInputStream(tsLocation), tsPwd);
+            tmFactory.init(ts);
+
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+            KeyStore ks = KeyStore.getInstance("JKS");
+            ks.load(new FileInputStream(ksLocation), ksPwd);
+            kmf.init(ks, ksPwd);
+
+            SSLContext serverContext = SSLContext.getInstance("TLS");
+            serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
+
+            SSLEngine serverSslEngine = serverContext.createSSLEngine();
+
+            serverSslEngine.setNeedClientAuth(true);
+            serverSslEngine.setUseClientMode(false);
+            serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
+            serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
+            serverSslEngine.setEnableSessionCreation(true);
+
+            channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
+                    .addLast("encoder", encoder)
+                    .addLast("decoder", new MessageDecoder(preamble))
+                    .addLast("handler", dispatcher);
+        }
+    }
+
+    private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+        private final ChannelHandler dispatcher = new InboundMessageDispatcher();
+        private final ChannelHandler encoder = new MessageEncoder(preamble);
+
+        @Override
+        protected void initChannel(SocketChannel channel) throws Exception {
+            TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+            KeyStore ts = KeyStore.getInstance("JKS");
+            ts.load(new FileInputStream(tsLocation), tsPwd);
+            tmFactory.init(ts);
+
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+            KeyStore ks = KeyStore.getInstance("JKS");
+            ks.load(new FileInputStream(ksLocation), ksPwd);
+            kmf.init(ks, ksPwd);
+
+            SSLContext clientContext = SSLContext.getInstance("TLS");
+            clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
+
+            SSLEngine clientSslEngine = clientContext.createSSLEngine();
+
+            clientSslEngine.setUseClientMode(true);
+            clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
+            clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
+            clientSslEngine.setEnableSessionCreation(true);
+
+            channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
+                    .addLast("encoder", encoder)
+                    .addLast("decoder", new MessageDecoder(preamble))
+                    .addLast("handler", dispatcher);
+        }
+    }
+
+    private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+        private final ChannelHandler dispatcher = new InboundMessageDispatcher();
+        private final ChannelHandler encoder = new MessageEncoder(preamble);
+
+        @Override
+        protected void initChannel(SocketChannel channel) throws Exception {
+            channel.pipeline()
+                    .addLast("encoder", encoder)
+                    .addLast("decoder", new MessageDecoder(preamble))
+                    .addLast("handler", dispatcher);
+        }
+    }
+
+    @ChannelHandler.Sharable
+    private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
+
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
+            try {
+                dispatchLocally(message);
+            } catch (RejectedExecutionException e) {
+                log.warn("Unable to dispatch message due to {}", e.getMessage());
+            }
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+            log.error("Exception inside channel handling pipeline.", cause);
+            context.close();
+        }
+    }
+    private void dispatchLocally(InternalMessage message) throws IOException {
+        String type = message.type();
+        if (REPLY_MESSAGE_TYPE.equals(type)) {
+            try {
+                Callback callback =
+                        callbacks.getIfPresent(message.id());
+                if (callback != null) {
+                    callback.complete(message.payload());
+                } else {
+                    log.warn("Received a reply for message id:[{}]. "
+                                     + " from {}. But was unable to locate the"
+                                     + " request handle", message.id(), message.sender());
+                }
+            } finally {
+                callbacks.invalidate(message.id());
+            }
+            return;
+        }
+        Consumer<InternalMessage> handler = handlers.get(type);
+        if (handler != null) {
+            handler.accept(message);
+        } else {
+            log.debug("No handler registered for {}", type);
+        }
+    }
+
+    private final class Callback {
+        private final CompletableFuture<byte[]> future;
+        private final Executor executor;
+
+        public Callback(CompletableFuture<byte[]> future, Executor executor) {
+            this.future = future;
+            this.executor = executor;
+        }
+
+        public void complete(byte[] value) {
+            executor.execute(() -> future.complete(value));
+        }
+
+        public void completeExceptionally(Throwable error) {
+            executor.execute(() -> future.completeExceptionally(error));
+        }
+    }
+    private final class Connection {
+        private final CompletableFuture<Channel> internalFuture;
+
+        public Connection(CompletableFuture<Channel> internalFuture) {
+            this.internalFuture = internalFuture;
+        }
+
+        /**
+         * Sends a message out on its channel and associated the message with a
+         * completable future used for signaling.
+         * @param message the message to be sent
+         * @param future a future that is completed normally or exceptionally if
+         *               message sending succeeds or fails respectively
+         */
+        public void send(Object message, CompletableFuture<Void> future) {
+            internalFuture.whenComplete((channel, throwable) -> {
+                if (throwable == null) {
+                    channel.writeAndFlush(message).addListener(channelFuture -> {
+                        if (!channelFuture.isSuccess()) {
+                            future.completeExceptionally(channelFuture.cause());
+                        } else {
+                            future.complete(null);
+                        }
+                    });
+                } else {
+                    future.completeExceptionally(throwable);
+                }
+            });
+        }
+
+        /**
+         * Destroys a channel by closing its channel (if it exists) and
+         * cancelling its future.
+         */
+        public void destroy() {
+            Channel channel = internalFuture.getNow(null);
+            if (channel != null) {
+                channel.close();
+            }
+            internalFuture.cancel(false);
+        }
+
+        /**
+         * Determines whether the connection is valid meaning it is either
+         * complete with and active channel
+         * or it has not yet completed.
+         * @return true if the channel has an active connection or has not
+         * yet completed
+         */
+        public boolean validate() {
+            if (internalFuture.isCompletedExceptionally()) {
+                return false;
+            }
+            Channel channel = internalFuture.getNow(null);
+            return channel == null || channel.isActive();
+        }
+    }
 }
diff --git a/utils/netty/src/test/java/org/onlab/netty/NettyMessagingTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
similarity index 60%
rename from utils/netty/src/test/java/org/onlab/netty/NettyMessagingTest.java
rename to core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
index f7737d2..d04c777 100644
--- a/utils/netty/src/test/java/org/onlab/netty/NettyMessagingTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
@@ -1,4 +1,4 @@
-package org.onlab.netty;
+package org.onosproject.store.cluster.messaging.impl;
 
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
@@ -9,10 +9,17 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 
+import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.packet.IpAddress;
+import org.onosproject.cluster.ClusterMetadata;
+import org.onosproject.cluster.ClusterMetadataEventListener;
+import org.onosproject.cluster.ClusterMetadataService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.provider.ProviderId;
 import org.onosproject.store.cluster.messaging.Endpoint;
 
 import com.google.common.util.concurrent.MoreExecutors;
@@ -24,34 +31,39 @@
 /**
  * Unit tests for NettyMessaging.
  */
-public class NettyMessagingTest {
+public class NettyMessagingManagerTest {
 
-    NettyMessaging netty1;
-    NettyMessaging netty2;
+    NettyMessagingManager netty1;
+    NettyMessagingManager netty2;
 
-    Endpoint ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5001);
-    Endpoint ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5002);
-    Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5003);
+    private static final String DUMMY_NAME = "node";
+    private static final String IP_STRING = "127.0.0.1";
+
+    Endpoint ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
+    Endpoint ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
+    Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf(IP_STRING), 5003);
 
     @Before
     public void setUp() throws Exception {
         ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
-        netty1 = new NettyMessaging();
-        netty1.start(12, ep1);
+        netty1 = new NettyMessagingManager();
+        netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep1);
+        netty1.activate();
 
         ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
-        netty2 = new NettyMessaging();
-        netty2.start(12, ep2);
+        netty2 = new NettyMessagingManager();
+        netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep2);
+        netty2.activate();
     }
 
     @After
     public void tearDown() throws Exception {
         if (netty1 != null) {
-            netty1.stop();
+            netty1.deactivate();
         }
 
         if (netty2 != null) {
-            netty2.stop();
+            netty2.deactivate();
         }
     }
 
@@ -113,9 +125,9 @@
         netty2.registerHandler("test-subject", handler, handlerExecutor);
 
         CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
-                "test-subject",
-                "hello world".getBytes(),
-                completionExecutor);
+                                                                   "test-subject",
+                                                                   "hello world".getBytes(),
+                                                                   completionExecutor);
         response.whenComplete((r, e) -> {
             completionThreadName.set(Thread.currentThread().getName());
         });
@@ -125,4 +137,40 @@
         assertEquals("completion-thread", completionThreadName.get());
         assertEquals("handler-thread", handlerThreadName.get());
     }
-}
+
+    private ClusterMetadataService dummyMetadataService(String name, String ipAddress, Endpoint ep) {
+        return new ClusterMetadataService() {
+            @Override
+            public ClusterMetadata getClusterMetadata() {
+                return new ClusterMetadata(new ProviderId(DUMMY_NAME, DUMMY_NAME),
+                                           name, Sets.newHashSet(), Sets.newHashSet());
+            }
+
+            @Override
+            public ControllerNode getLocalNode() {
+                return new ControllerNode() {
+                    @Override
+                    public NodeId id() {
+                        return null;
+                    }
+
+                    @Override
+                    public IpAddress ip() {
+                        return IpAddress.valueOf(ipAddress);
+                    }
+
+                    @Override
+                    public int tcpPort() {
+                        return ep.port();
+                    }
+                };
+            }
+
+            @Override
+            public void addListener(ClusterMetadataEventListener listener) {}
+
+            @Override
+            public void removeListener(ClusterMetadataEventListener listener) {}
+        };
+    }
+}
\ No newline at end of file
diff --git a/utils/netty/pom.xml b/utils/netty/pom.xml
deleted file mode 100644
index e4ec42e..0000000
--- a/utils/netty/pom.xml
+++ /dev/null
@@ -1,83 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Copyright 2014 Open Networking Laboratory
-  ~
-  ~ Licensed under the Apache License, Version 2.0 (the "License");
-  ~ you may not use this file except in compliance with the License.
-  ~ You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.onosproject</groupId>
-        <artifactId>onlab-utils</artifactId>
-        <version>1.5.0-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>onlab-netty</artifactId>
-    <packaging>bundle</packaging>
-
-    <description>Network I/O using Netty framework</description>
-
-    <dependencies>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava-testlib</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onlab-misc</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onlab-junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>commons-pool</groupId>
-            <artifactId>commons-pool</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-common</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-buffer</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-transport</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-handler</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-codec</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-transport-native-epoll</artifactId>
-          <version>${netty4.version}</version>
-        </dependency>
-    </dependencies>
-</project>
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
deleted file mode 100644
index 8b003f0..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.ServerChannel;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-
-import org.apache.commons.pool.KeyedPoolableObjectFactory;
-import org.apache.commons.pool.impl.GenericKeyedObjectPool;
-import org.onlab.util.Tools;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.TrustManagerFactory;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.security.KeyStore;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-
-/**
- * Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
- */
-public class NettyMessaging implements MessagingService {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
-
-    private Endpoint localEp;
-    private int preamble;
-    private final AtomicBoolean started = new AtomicBoolean(false);
-    private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
-    private final AtomicLong messageIdGenerator = new AtomicLong(0);
-    private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder()
-            .expireAfterWrite(10, TimeUnit.SECONDS)
-            .removalListener(new RemovalListener<Long, Callback>() {
-                @Override
-                public void onRemoval(RemovalNotification<Long, Callback> entry) {
-                    if (entry.wasEvicted()) {
-                        entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
-                    }
-                }
-            })
-            .build();
-
-    private final GenericKeyedObjectPool<Endpoint, Connection> channels
-            = new GenericKeyedObjectPool<Endpoint, Connection>(new OnosCommunicationChannelFactory());
-
-    private EventLoopGroup serverGroup;
-    private EventLoopGroup clientGroup;
-    private Class<? extends ServerChannel> serverChannelClass;
-    private Class<? extends Channel> clientChannelClass;
-
-    protected static final boolean TLS_DISABLED = false;
-    protected boolean enableNettyTls = TLS_DISABLED;
-
-    protected String ksLocation;
-    protected String tsLocation;
-    protected char[] ksPwd;
-    protected char[] tsPwd;
-
-    @SuppressWarnings("squid:S1181")
-    // We really need to catch Throwable due to netty native epoll() handling
-    private void initEventLoopGroup() {
-        // try Epoll first and if that does work, use nio.
-        try {
-            clientGroup = new EpollEventLoopGroup();
-            serverGroup = new EpollEventLoopGroup();
-            serverChannelClass = EpollServerSocketChannel.class;
-            clientChannelClass = EpollSocketChannel.class;
-            return;
-        } catch (Throwable e) {
-            log.debug("Failed to initialize native (epoll) transport. "
-                    + "Reason: {}. Proceeding with nio.", e.getMessage());
-        }
-        clientGroup = new NioEventLoopGroup();
-        serverGroup = new NioEventLoopGroup();
-        serverChannelClass = NioServerSocketChannel.class;
-        clientChannelClass = NioSocketChannel.class;
-    }
-
-    public void start(int preamble, Endpoint localEp) throws Exception {
-        if (started.get()) {
-            log.warn("Already running at local endpoint: {}", localEp);
-            return;
-        }
-        this.preamble = preamble;
-        this.localEp = localEp;
-        channels.setLifo(true);
-        channels.setTestOnBorrow(true);
-        channels.setTestOnReturn(true);
-        channels.setMinEvictableIdleTimeMillis(60_000L);
-        channels.setTimeBetweenEvictionRunsMillis(30_000L);
-        initEventLoopGroup();
-        startAcceptingConnections();
-        started.set(true);
-    }
-
-    public void stop() throws Exception {
-        if (started.get()) {
-            channels.close();
-            serverGroup.shutdownGracefully();
-            clientGroup.shutdownGracefully();
-            started.set(false);
-        }
-    }
-
-    @Override
-    public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
-        InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
-                                                      localEp,
-                                                      type,
-                                                      payload);
-        return sendAsync(ep, message);
-    }
-
-    protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
-        if (ep.equals(localEp)) {
-            try {
-                dispatchLocally(message);
-            } catch (IOException e) {
-                return Tools.exceptionalFuture(e);
-            }
-            return CompletableFuture.completedFuture(null);
-        }
-
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        try {
-            Connection connection = null;
-            try {
-                connection = channels.borrowObject(ep);
-                connection.send(message, future);
-
-            } finally {
-                channels.returnObject(ep, connection);
-            }
-        } catch (Exception e) {
-            future.completeExceptionally(e);
-        }
-        return future;
-    }
-
-    @Override
-    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
-        return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
-    }
-
-    @Override
-    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
-        CompletableFuture<byte[]> response = new CompletableFuture<>();
-        Callback callback = new Callback(response, executor);
-        Long messageId = messageIdGenerator.incrementAndGet();
-        callbacks.put(messageId, callback);
-        InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
-        return sendAsync(ep, message).whenComplete((r, e) -> {
-            if (e != null) {
-                callbacks.invalidate(messageId);
-            }
-        }).thenCompose(v -> response);
-    }
-
-    @Override
-    public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
-        handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
-    }
-
-    @Override
-    public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
-        handlers.put(type, message -> executor.execute(() -> {
-            byte[] responsePayload = handler.apply(message.sender(), message.payload());
-            if (responsePayload != null) {
-                InternalMessage response = new InternalMessage(message.id(),
-                        localEp,
-                        REPLY_MESSAGE_TYPE,
-                        responsePayload);
-                sendAsync(message.sender(), response).whenComplete((result, error) -> {
-                    if (error != null) {
-                        log.debug("Failed to respond", error);
-                    }
-                });
-            }
-        }));
-    }
-
-    @Override
-    public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
-        handlers.put(type, message -> {
-            handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
-                if (error == null) {
-                    InternalMessage response = new InternalMessage(message.id(),
-                                                                   localEp,
-                                                                   REPLY_MESSAGE_TYPE,
-                                                                   result);
-                    sendAsync(message.sender(), response).whenComplete((r, e) -> {
-                        if (e != null) {
-                            log.debug("Failed to respond", e);
-                        }
-                    });
-                }
-            });
-        });
-    }
-
-    @Override
-    public void unregisterHandler(String type) {
-        handlers.remove(type);
-    }
-
-    private void startAcceptingConnections() throws InterruptedException {
-        ServerBootstrap b = new ServerBootstrap();
-        b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
-        b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
-        b.option(ChannelOption.SO_RCVBUF, 1048576);
-        b.option(ChannelOption.TCP_NODELAY, true);
-        b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-        b.group(serverGroup, clientGroup);
-        b.channel(serverChannelClass);
-        if (enableNettyTls) {
-            b.childHandler(new SslServerCommunicationChannelInitializer());
-        } else {
-            b.childHandler(new OnosCommunicationChannelInitializer());
-        }
-        b.option(ChannelOption.SO_BACKLOG, 128);
-        b.childOption(ChannelOption.SO_KEEPALIVE, true);
-
-        // Bind and start to accept incoming connections.
-        b.bind(localEp.port()).sync().addListener(future -> {
-            if (future.isSuccess()) {
-                log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
-            } else {
-                log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
-            }
-        });
-    }
-
-    private class OnosCommunicationChannelFactory
-        implements KeyedPoolableObjectFactory<Endpoint, Connection> {
-
-        @Override
-        public void activateObject(Endpoint endpoint,  Connection connection)
-                throws Exception {
-        }
-
-        @Override
-        public void destroyObject(Endpoint ep, Connection connection) throws Exception {
-            log.debug("Closing connection to {}", ep);
-            //Is this the right way to destroy?
-            connection.destroy();
-        }
-
-        @Override
-        public Connection makeObject(Endpoint ep) throws Exception {
-            Bootstrap bootstrap = new Bootstrap();
-            bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-            bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
-            bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
-            bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
-            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
-            bootstrap.group(clientGroup);
-            // TODO: Make this faster:
-            // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
-            bootstrap.channel(clientChannelClass);
-            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
-            if (enableNettyTls) {
-                bootstrap.handler(new SslClientCommunicationChannelInitializer());
-            } else {
-                bootstrap.handler(new OnosCommunicationChannelInitializer());
-            }
-            // Start the client.
-            CompletableFuture<Channel> retFuture = new CompletableFuture<>();
-            ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port());
-
-            f.addListener(future -> {
-                        if (future.isSuccess()) {
-                            retFuture.complete(f.channel());
-                        } else {
-                            retFuture.completeExceptionally(future.cause());
-                        }
-                    });
-            log.debug("Established a new connection to {}", ep);
-            return new Connection(retFuture);
-        }
-
-        @Override
-        public void passivateObject(Endpoint ep, Connection connection)
-                throws Exception {
-        }
-
-        @Override
-        public boolean validateObject(Endpoint ep, Connection connection) {
-            return connection.validate();
-        }
-    }
-
-    private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
-
-        private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-        private final ChannelHandler encoder = new MessageEncoder(preamble);
-
-        @Override
-        protected void initChannel(SocketChannel channel) throws Exception {
-            TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
-            KeyStore ts = KeyStore.getInstance("JKS");
-            ts.load(new FileInputStream(tsLocation), tsPwd);
-            tmFactory.init(ts);
-
-            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-            KeyStore ks = KeyStore.getInstance("JKS");
-            ks.load(new FileInputStream(ksLocation), ksPwd);
-            kmf.init(ks, ksPwd);
-
-            SSLContext serverContext = SSLContext.getInstance("TLS");
-            serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
-
-            SSLEngine serverSslEngine = serverContext.createSSLEngine();
-
-            serverSslEngine.setNeedClientAuth(true);
-            serverSslEngine.setUseClientMode(false);
-            serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
-            serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
-            serverSslEngine.setEnableSessionCreation(true);
-
-            channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
-                    .addLast("encoder", encoder)
-                    .addLast("decoder", new MessageDecoder(preamble))
-                    .addLast("handler", dispatcher);
-        }
-
-    }
-
-    private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
-
-        private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-        private final ChannelHandler encoder = new MessageEncoder(preamble);
-
-        @Override
-        protected void initChannel(SocketChannel channel) throws Exception {
-            TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
-            KeyStore ts = KeyStore.getInstance("JKS");
-            ts.load(new FileInputStream(tsLocation), tsPwd);
-            tmFactory.init(ts);
-
-            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-            KeyStore ks = KeyStore.getInstance("JKS");
-            ks.load(new FileInputStream(ksLocation), ksPwd);
-            kmf.init(ks, ksPwd);
-
-            SSLContext clientContext = SSLContext.getInstance("TLS");
-            clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
-
-            SSLEngine clientSslEngine = clientContext.createSSLEngine();
-
-            clientSslEngine.setUseClientMode(true);
-            clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
-            clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
-            clientSslEngine.setEnableSessionCreation(true);
-
-            channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
-                    .addLast("encoder", encoder)
-                    .addLast("decoder", new MessageDecoder(preamble))
-                    .addLast("handler", dispatcher);
-        }
-
-    }
-
-    private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
-
-        private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-        private final ChannelHandler encoder = new MessageEncoder(preamble);
-
-        @Override
-        protected void initChannel(SocketChannel channel) throws Exception {
-                channel.pipeline()
-                        .addLast("encoder", encoder)
-                        .addLast("decoder", new MessageDecoder(preamble))
-                        .addLast("handler", dispatcher);
-        }
-    }
-
-    @ChannelHandler.Sharable
-    private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
-
-        @Override
-        protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
-            try {
-                dispatchLocally(message);
-            } catch (RejectedExecutionException e) {
-                log.warn("Unable to dispatch message due to {}", e.getMessage());
-            }
-        }
-
-        @Override
-        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
-            log.error("Exception inside channel handling pipeline.", cause);
-            context.close();
-        }
-    }
-    private void dispatchLocally(InternalMessage message) throws IOException {
-        String type = message.type();
-        if (REPLY_MESSAGE_TYPE.equals(type)) {
-            try {
-                Callback callback =
-                    callbacks.getIfPresent(message.id());
-                if (callback != null) {
-                    callback.complete(message.payload());
-                } else {
-                    log.warn("Received a reply for message id:[{}]. "
-                            + " from {}. But was unable to locate the"
-                            + " request handle", message.id(), message.sender());
-                }
-            } finally {
-                callbacks.invalidate(message.id());
-            }
-            return;
-        }
-        Consumer<InternalMessage> handler = handlers.get(type);
-        if (handler != null) {
-            handler.accept(message);
-        } else {
-            log.debug("No handler registered for {}", type);
-        }
-    }
-
-    private final class Callback {
-        private final CompletableFuture<byte[]> future;
-        private final Executor executor;
-
-        public Callback(CompletableFuture<byte[]> future, Executor executor) {
-            this.future = future;
-            this.executor = executor;
-        }
-
-        public void complete(byte[] value) {
-            executor.execute(() -> future.complete(value));
-        }
-
-        public void completeExceptionally(Throwable error) {
-            executor.execute(() -> future.completeExceptionally(error));
-        }
-    }
-    private final class Connection {
-        private final CompletableFuture<Channel> internalFuture;
-
-        public Connection(CompletableFuture<Channel> internalFuture) {
-            this.internalFuture = internalFuture;
-        }
-
-        /**
-         * Sends a message out on its channel and associated the message with a
-         * completable future used for signaling.
-         * @param message the message to be sent
-         * @param future a future that is completed normally or exceptionally if
-         *               message sending succeeds or fails respectively
-         */
-        public void send(Object message, CompletableFuture<Void> future) {
-            internalFuture.whenComplete((channel, throwable) -> {
-                if (throwable == null) {
-                    channel.writeAndFlush(message).addListener(channelFuture -> {
-                        if (!channelFuture.isSuccess()) {
-                            future.completeExceptionally(channelFuture.cause());
-                        } else {
-                            future.complete(null);
-                        }
-                    });
-                } else {
-                    future.completeExceptionally(throwable);
-                }
-
-            });
-        }
-
-        /**
-         * Destroys a channel by closing its channel (if it exists) and
-         * cancelling its future.
-         */
-        public void destroy() {
-            Channel channel = internalFuture.getNow(null);
-            if (channel != null) {
-                channel.close();
-            }
-            internalFuture.cancel(false);
-        }
-
-        /**
-         * Determines whether the connection is valid meaning it is either
-         * complete with and active channel
-         * or it has not yet completed.
-         * @return true if the channel has an active connection or has not
-         * yet completed
-         */
-        public boolean validate() {
-            if (internalFuture.isCompletedExceptionally()) {
-                return false;
-            }
-            Channel channel = internalFuture.getNow(null);
-            return channel == null || channel.isActive();
-        }
-    }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/package-info.java b/utils/netty/src/main/java/org/onlab/netty/package-info.java
deleted file mode 100644
index 2d6257f..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Asynchronous messaging APIs implemented using the Netty framework.
- */
-package org.onlab.netty;
diff --git a/utils/pom.xml b/utils/pom.xml
index a407052..6c27a7d 100644
--- a/utils/pom.xml
+++ b/utils/pom.xml
@@ -34,7 +34,6 @@
     <modules>
         <module>junit</module>
         <module>misc</module>
-        <module>netty</module>
         <module>nio</module>
         <module>yangutils</module>
         <module>osgi</module>