Revamped ClusterCommunicationService API

Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 099880d..f96bb0b 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -39,6 +39,7 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -56,8 +57,6 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 
 /**
  * A Netty based implementation of MessagingService.
@@ -69,14 +68,14 @@
     private final Endpoint localEp;
     private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
-    private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
+    private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
             .maximumSize(100000)
             .expireAfterWrite(10, TimeUnit.SECONDS)
-            .removalListener(new RemovalListener<Long, SettableFuture<byte[]>>() {
+            .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
                 @Override
-                public void onRemoval(RemovalNotification<Long, SettableFuture<byte[]>> entry) {
+                public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
                     if (entry.wasEvicted()) {
-                        entry.getValue().setException(new TimeoutException("Timedout waiting for reply"));
+                        entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
                     }
                 }
             })
@@ -178,11 +177,10 @@
     }
 
     @Override
-    public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload)
-            throws IOException {
-        SettableFuture<byte[]> futureResponse = SettableFuture.create();
+    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
+        CompletableFuture<byte[]> response = new CompletableFuture<>();
         Long messageId = messageIdGenerator.incrementAndGet();
-        responseFutures.put(messageId, futureResponse);
+        responseFutures.put(messageId, response);
         InternalMessage message = new InternalMessage.Builder(this)
             .withId(messageId)
             .withSender(localEp)
@@ -193,9 +191,9 @@
             sendAsync(ep, message);
         } catch (Exception e) {
             responseFutures.invalidate(messageId);
-            throw e;
+            response.completeExceptionally(e);
         }
-        return futureResponse;
+        return response;
     }
 
     @Override
@@ -333,10 +331,10 @@
         String type = message.type();
         if (InternalMessage.REPLY_MESSAGE_TYPE.equals(type)) {
             try {
-                SettableFuture<byte[]> futureResponse =
+                CompletableFuture<byte[]> futureResponse =
                     NettyMessagingService.this.responseFutures.getIfPresent(message.id());
                 if (futureResponse != null) {
-                    futureResponse.set(message.payload());
+                    futureResponse.complete(message.payload());
                 } else {
                     log.warn("Received a reply for message id:[{}]. "
                             + " from {}. But was unable to locate the"