Removed deprecated ClusterCommunicationService APIs
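MessagingService::sendAsync now returns a CompletableFuture<Void> instead of returning void and throwing IOException;
send failures are reported by completing the returned future exceptionally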

Change-Id: I98134c4c0ea65b9c7e9ba705eebd1669067324ef
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
index c19dc59..9c63d84 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
@@ -20,7 +20,6 @@
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
@@ -136,32 +135,39 @@
     }
 
     @Override
-    public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
+    public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
         InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
                                                       localEp,
                                                       type,
                                                       payload);
-        sendAsync(ep, message);
+        return sendAsync(ep, message);
     }
 
-    protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
-        if (ep.equals(localEp)) {
-            dispatchLocally(message);
-            return;
-        }
-        Channel channel = null;
+    protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
         try {
-            try {
-                channel = channels.borrowObject(ep);
-                channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
-            } finally {
-                channels.returnObject(ep, channel);
+            if (ep.equals(localEp)) {
+                dispatchLocally(message);
+                future.complete(null);
+            } else {
+                Channel channel = null;
+                try {
+                    channel = channels.borrowObject(ep);
+                    channel.writeAndFlush(message).addListener(channelFuture -> {
+                        if (!channelFuture.isSuccess()) {
+                            future.completeExceptionally(channelFuture.cause());
+                        } else {
+                            future.complete(null);
+                        }
+                    });
+                } finally {
+                    channels.returnObject(ep, channel);
+                }
             }
-        } catch (IOException e) {
-            throw e;
         } catch (Exception e) {
-            throw new IOException(e);
+            future.completeExceptionally(e);
         }
+        return future;
     }
 
     @Override
@@ -193,11 +199,11 @@
                         localEp,
                         REPLY_MESSAGE_TYPE,
                         responsePayload);
-                try {
-                    sendAsync(message.sender(), response);
-                } catch (IOException e) {
-                    log.debug("Failed to respond", e);
-                }
+                sendAsync(message.sender(), response).whenComplete((result, error) -> {
+                    if (error != null) {
+                        log.debug("Failed to respond", error);
+                    }
+                });
             }
         }));
     }
@@ -206,17 +212,17 @@
     public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
         handlers.put(type, message -> {
             handler.apply(message.payload()).whenComplete((result, error) -> {
-            if (error == null) {
-                InternalMessage response = new InternalMessage(message.id(),
-                        localEp,
-                        REPLY_MESSAGE_TYPE,
-                        result);
-                try {
-                    sendAsync(message.sender(), response);
-                } catch (IOException e) {
-                    log.debug("Failed to respond", e);
+                if (error == null) {
+                    InternalMessage response = new InternalMessage(message.id(),
+                            localEp,
+                            REPLY_MESSAGE_TYPE,
+                            result);
+                    sendAsync(message.sender(), response).whenComplete((r, e) -> {
+                        if (e != null) {
+                            log.debug("Failed to respond", e);
+                        }
+                    });
                 }
-            }
             });
         });
     }