Add a close listener for Client Connections

This is a part of [ONOS-6075]

Also
 - Add some more logging
 - Change depericated netty options to the new option

Change-Id: I8abb87b53e9c3d3c706af2c3b9da1e54d55e9b07
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 2e8682b..e93e63b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -34,6 +34,7 @@
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.ServerChannel;
 import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.epoll.EpollSocketChannel;
@@ -324,8 +325,8 @@
 
     private void startAcceptingConnections() throws InterruptedException {
         ServerBootstrap b = new ServerBootstrap();
-        b.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
-        b.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+        b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
+                      new WriteBufferWaterMark(8 * 1024, 32 * 1024));
         b.option(ChannelOption.SO_RCVBUF, 1048576);
         b.option(ChannelOption.TCP_NODELAY, true);
         b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
@@ -344,7 +345,7 @@
             if (future.isSuccess()) {
                 log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
             } else {
-                log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
+                log.warn("{} failed to bind to port {} due to {}", localEp.host(), localEp.port(), future.cause());
             }
         });
     }
@@ -353,13 +354,13 @@
             implements KeyedPoolableObjectFactory<Endpoint, Connection> {
 
         @Override
-        public void activateObject(Endpoint endpoint,  Connection connection)
+        public void activateObject(Endpoint endpoint, Connection connection)
                 throws Exception {
         }
 
         @Override
         public void destroyObject(Endpoint ep, Connection connection) throws Exception {
-            log.debug("Closing connection to {}", ep);
+            log.debug("Closing connection {} to {}", connection, ep);
             //Is this the right way to destroy?
             connection.destroy();
         }
@@ -368,8 +369,8 @@
         public Connection makeObject(Endpoint ep) throws Exception {
             Bootstrap bootstrap = new Bootstrap();
             bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-            bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
-            bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
+            bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
+                          new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
             bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
             bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
             bootstrap.group(clientGroup);
@@ -561,7 +562,7 @@
         if (handler != null) {
             handler.accept(message);
         } else {
-            log.debug("No handler for message type {}", message.type(), message.sender());
+            log.debug("No handler for message type {} from {}", message.type(), message.sender());
             sendReply(message, Status.ERROR_NO_HANDLER, Optional.empty());
         }
     }