sendAndReceive now returns a Future instead of Reponse
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 26d835d..6e5aa89 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -5,6 +5,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
@@ -26,7 +27,6 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.slf4j.Logger;
@@ -34,6 +34,8 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
/**
* A Netty based implementation of MessagingService.
@@ -44,7 +46,8 @@
private final Endpoint localEp;
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
- private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
+ private final AtomicLong messageIdGenerator = new AtomicLong(0);
+ private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.weakValues()
// TODO: Once the entry expires, notify blocking threads (if any).
@@ -119,7 +122,7 @@
@Override
public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
InternalMessage message = new InternalMessage.Builder(this)
- .withId(RandomUtils.nextLong())
+ .withId(messageIdGenerator.incrementAndGet())
.withSender(localEp)
.withType(type)
.withPayload(payload)
@@ -142,10 +145,10 @@
}
@Override
- public Response sendAndReceive(Endpoint ep, String type, byte[] payload)
+ public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload)
throws IOException {
- AsyncResponse futureResponse = new AsyncResponse();
- Long messageId = RandomUtils.nextLong();
+ SettableFuture<byte[]> futureResponse = SettableFuture.create();
+ Long messageId = messageIdGenerator.incrementAndGet();
responseFutures.put(messageId, futureResponse);
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageId)
@@ -267,10 +270,10 @@
String type = message.type();
if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
try {
- AsyncResponse futureResponse =
+ SettableFuture<byte[]> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
- futureResponse.setResponse(message.payload());
+ futureResponse.set(message.payload());
} else {
log.warn("Received a reply. But was unable to locate the request handle");
}