Revamped ClusterCommunicationService API
Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 01dda9e..62330fb 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -15,10 +15,10 @@
*/
package org.onlab.util;
-import com.google.common.base.Strings;
-import com.google.common.primitives.UnsignedLongs;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.slf4j.Logger;
+import static java.nio.file.Files.delete;
+import static java.nio.file.Files.walkFileTree;
+import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
+import static org.slf4j.LoggerFactory.getLogger;
import java.io.BufferedReader;
import java.io.File;
@@ -37,12 +37,17 @@
import java.util.Collection;
import java.util.Dictionary;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
-import static java.nio.file.Files.delete;
-import static java.nio.file.Files.walkFileTree;
-import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
-import static org.slf4j.LoggerFactory.getLogger;
+import org.slf4j.Logger;
+
+import com.google.common.base.Strings;
+import com.google.common.primitives.UnsignedLongs;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Miscellaneous utility methods.
@@ -324,6 +329,51 @@
dst.getAbsolutePath()));
}
+ /**
+ * Returns the future value when complete or if future
+ * completes exceptionally returns the defaultValue.
+ * @param future future
+ * @param defaultValue default value
+ * @param <T> future value type
+ * @return future value when complete or if future
+ * completes exceptionally returns the defaultValue.
+ */
+ public static <T> T futureGetOrElse(Future<T> future, T defaultValue) {
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return defaultValue;
+ } catch (ExecutionException e) {
+ return defaultValue;
+ }
+ }
+
+ /**
+ * Returns the future value when complete or if future
+ * completes exceptionally returns the defaultValue.
+ * @param future future
+ * @param timeout time to wait for successful completion
+ * @param timeUnit time unit
+ * @param defaultValue default value
+ * @param <T> future value type
+ * @return future value when complete or if future
+ * completes exceptionally returns the defaultValue.
+ */
+ public static <T> T futureGetOrElse(Future<T> future,
+ long timeout,
+ TimeUnit timeUnit,
+ T defaultValue) {
+ try {
+ return future.get(timeout, timeUnit);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return defaultValue;
+ } catch (ExecutionException | TimeoutException e) {
+ return defaultValue;
+ }
+ }
+
// Auxiliary path visitor for recursive directory structure copying.
private static class DirectoryCopier extends SimpleFileVisitor<Path> {
private Path src;
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
index 12e1d87..d7823d4 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -16,10 +16,9 @@
package org.onlab.netty;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import com.google.common.util.concurrent.ListenableFuture;
-
/**
* Interface for low level messaging primitives.
*/
@@ -40,9 +39,8 @@
* @param type type of message.
* @param payload message payload.
* @return a response future
- * @throws IOException when I/O exception of some sort has occurred
*/
- public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
+ public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
/**
* Registers a new message handler for message type.
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 099880d..f96bb0b 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -39,6 +39,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -56,8 +57,6 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
/**
* A Netty based implementation of MessagingService.
@@ -69,14 +68,14 @@
private final Endpoint localEp;
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
- private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
+ private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.expireAfterWrite(10, TimeUnit.SECONDS)
- .removalListener(new RemovalListener<Long, SettableFuture<byte[]>>() {
+ .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
@Override
- public void onRemoval(RemovalNotification<Long, SettableFuture<byte[]>> entry) {
+ public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
if (entry.wasEvicted()) {
- entry.getValue().setException(new TimeoutException("Timedout waiting for reply"));
+ entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
}
}
})
@@ -178,11 +177,10 @@
}
@Override
- public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload)
- throws IOException {
- SettableFuture<byte[]> futureResponse = SettableFuture.create();
+ public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
+ CompletableFuture<byte[]> response = new CompletableFuture<>();
Long messageId = messageIdGenerator.incrementAndGet();
- responseFutures.put(messageId, futureResponse);
+ responseFutures.put(messageId, response);
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageId)
.withSender(localEp)
@@ -193,9 +191,9 @@
sendAsync(ep, message);
} catch (Exception e) {
responseFutures.invalidate(messageId);
- throw e;
+ response.completeExceptionally(e);
}
- return futureResponse;
+ return response;
}
@Override
@@ -333,10 +331,10 @@
String type = message.type();
if (InternalMessage.REPLY_MESSAGE_TYPE.equals(type)) {
try {
- SettableFuture<byte[]> futureResponse =
+ CompletableFuture<byte[]> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
- futureResponse.set(message.payload());
+ futureResponse.complete(message.payload());
} else {
log.warn("Received a reply for message id:[{}]. "
+ " from {}. But was unable to locate the"