Revamped ClusterCommunicationService API
Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
index 12e1d87..d7823d4 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -16,10 +16,9 @@
package org.onlab.netty;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import com.google.common.util.concurrent.ListenableFuture;
-
/**
* Interface for low level messaging primitives.
*/
@@ -40,9 +39,8 @@
* @param type type of message.
* @param payload message payload.
* @return a response future
- * @throws IOException when I/O exception of some sort has occurred
*/
- public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
+ public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
/**
* Registers a new message handler for message type.
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 099880d..f96bb0b 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -39,6 +39,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -56,8 +57,6 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
/**
* A Netty based implementation of MessagingService.
@@ -69,14 +68,14 @@
private final Endpoint localEp;
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
- private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
+ private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.expireAfterWrite(10, TimeUnit.SECONDS)
- .removalListener(new RemovalListener<Long, SettableFuture<byte[]>>() {
+ .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
@Override
- public void onRemoval(RemovalNotification<Long, SettableFuture<byte[]>> entry) {
+ public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
if (entry.wasEvicted()) {
- entry.getValue().setException(new TimeoutException("Timedout waiting for reply"));
+ entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
}
}
})
@@ -178,11 +177,10 @@
}
@Override
- public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload)
- throws IOException {
- SettableFuture<byte[]> futureResponse = SettableFuture.create();
+ public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
+ CompletableFuture<byte[]> response = new CompletableFuture<>();
Long messageId = messageIdGenerator.incrementAndGet();
- responseFutures.put(messageId, futureResponse);
+ responseFutures.put(messageId, response);
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageId)
.withSender(localEp)
@@ -193,9 +191,9 @@
sendAsync(ep, message);
} catch (Exception e) {
responseFutures.invalidate(messageId);
- throw e;
+ response.completeExceptionally(e);
}
- return futureResponse;
+ return response;
}
@Override
@@ -333,10 +331,10 @@
String type = message.type();
if (InternalMessage.REPLY_MESSAGE_TYPE.equals(type)) {
try {
- SettableFuture<byte[]> futureResponse =
+ CompletableFuture<byte[]> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
- futureResponse.set(message.payload());
+ futureResponse.complete(message.payload());
} else {
log.warn("Received a reply for message id:[{}]. "
+ " from {}. But was unable to locate the"