Revamped ClusterCommunicationService API

Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 01dda9e..62330fb 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -15,10 +15,10 @@
  */
 package org.onlab.util;
 
-import com.google.common.base.Strings;
-import com.google.common.primitives.UnsignedLongs;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.slf4j.Logger;
+import static java.nio.file.Files.delete;
+import static java.nio.file.Files.walkFileTree;
+import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
+import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -37,12 +37,17 @@
 import java.util.Collection;
 import java.util.Dictionary;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
-import static java.nio.file.Files.delete;
-import static java.nio.file.Files.walkFileTree;
-import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
-import static org.slf4j.LoggerFactory.getLogger;
+import org.slf4j.Logger;
+
+import com.google.common.base.Strings;
+import com.google.common.primitives.UnsignedLongs;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Miscellaneous utility methods.
@@ -324,6 +329,51 @@
                                          dst.getAbsolutePath()));
     }
 
+    /**
+     * Blocks until the future completes and returns its value; returns
+     * defaultValue if the wait is interrupted or the future fails.
+     * @param future future
+     * @param defaultValue default value
+     * @param <T> future value type
+     * @return future value, or defaultValue on interruption (interrupt
+     * status is restored) or on an ExecutionException
+     */
+    public static <T> T futureGetOrElse(Future<T> future, T defaultValue) {
+        try {
+            return future.get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
+            return defaultValue;
+        } catch (ExecutionException e) {
+            return defaultValue; // failure cause is intentionally discarded
+        }
+    }
+
+    /**
+     * Waits up to the given timeout for the future to complete and returns
+     * its value; returns defaultValue on interruption, failure or timeout.
+     * @param future future
+     * @param timeout time to wait for successful completion
+     * @param timeUnit time unit
+     * @param defaultValue default value
+     * @param <T> future value type
+     * @return future value, or defaultValue if the wait is interrupted,
+     * the future fails with an ExecutionException, or the timeout elapses
+     */
+    public static <T> T futureGetOrElse(Future<T> future,
+                                        long timeout,
+                                        TimeUnit timeUnit,
+                                        T defaultValue) {
+        try {
+            return future.get(timeout, timeUnit);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
+            return defaultValue;
+        } catch (ExecutionException | TimeoutException e) {
+            return defaultValue; // failure cause or timeout is intentionally discarded
+        }
+    }
+
     // Auxiliary path visitor for recursive directory structure copying.
     private static class DirectoryCopier extends SimpleFileVisitor<Path> {
         private Path src;
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
index 12e1d87..d7823d4 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -16,10 +16,9 @@
 package org.onlab.netty;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
 /**
  * Interface for low level messaging primitives.
  */
@@ -40,9 +39,8 @@
      * @param type type of message.
      * @param payload message payload.
      * @return a response future
-     * @throws IOException when I/O exception of some sort has occurred
      */
-    public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
+    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
 
     /**
      * Registers a new message handler for message type.
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 099880d..f96bb0b 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -39,6 +39,7 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -56,8 +57,6 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 
 /**
  * A Netty based implementation of MessagingService.
@@ -69,14 +68,14 @@
     private final Endpoint localEp;
     private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
-    private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
+    private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
             .maximumSize(100000)
             .expireAfterWrite(10, TimeUnit.SECONDS)
-            .removalListener(new RemovalListener<Long, SettableFuture<byte[]>>() {
+            .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
                 @Override
-                public void onRemoval(RemovalNotification<Long, SettableFuture<byte[]>> entry) {
+                public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
                     if (entry.wasEvicted()) {
-                        entry.getValue().setException(new TimeoutException("Timedout waiting for reply"));
+                        entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
                     }
                 }
             })
@@ -178,11 +177,10 @@
     }
 
     @Override
-    public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload)
-            throws IOException {
-        SettableFuture<byte[]> futureResponse = SettableFuture.create();
+    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
+        CompletableFuture<byte[]> response = new CompletableFuture<>();
         Long messageId = messageIdGenerator.incrementAndGet();
-        responseFutures.put(messageId, futureResponse);
+        responseFutures.put(messageId, response);
         InternalMessage message = new InternalMessage.Builder(this)
             .withId(messageId)
             .withSender(localEp)
@@ -193,9 +191,9 @@
             sendAsync(ep, message);
         } catch (Exception e) {
             responseFutures.invalidate(messageId);
-            throw e;
+            response.completeExceptionally(e);
         }
-        return futureResponse;
+        return response;
     }
 
     @Override
@@ -333,10 +331,10 @@
         String type = message.type();
         if (InternalMessage.REPLY_MESSAGE_TYPE.equals(type)) {
             try {
-                SettableFuture<byte[]> futureResponse =
+                CompletableFuture<byte[]> futureResponse =
                     NettyMessagingService.this.responseFutures.getIfPresent(message.id());
                 if (futureResponse != null) {
-                    futureResponse.set(message.payload());
+                    futureResponse.complete(message.payload());
                 } else {
                     log.warn("Received a reply for message id:[{}]. "
                             + " from {}. But was unable to locate the"