sendAndReceive now returns a ListenableFuture instead of a Response
diff --git a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
deleted file mode 100644
index 1772a3c..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package org.onlab.netty;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * An asynchronous response.
- * This class provides a base implementation of Response, with methods to retrieve the
- * result and query to see if the result is ready. The result can only be retrieved when
- * it is ready and the get methods will block if the result is not ready yet.
- */
-public class AsyncResponse implements Response {
-
- private byte[] value;
- private boolean done = false;
- private final long start = System.nanoTime();
-
- @Override
- public byte[] get(long timeout, TimeUnit timeUnit) throws TimeoutException {
- timeout = timeUnit.toNanos(timeout);
- boolean interrupted = false;
- try {
- synchronized (this) {
- while (!done) {
- try {
- long timeRemaining = timeout - (System.nanoTime() - start);
- if (timeRemaining <= 0) {
- throw new TimeoutException("Operation timed out.");
- }
- TimeUnit.NANOSECONDS.timedWait(this, timeRemaining);
- } catch (InterruptedException e) {
- interrupted = true;
- }
- }
- }
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- return value;
- }
-
- @Override
- public byte[] get() throws InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isReady() {
- return done;
- }
-
- /**
- * Sets response value and unblocks any thread blocking on the response to become
- * available.
- * @param data response data.
- */
- public synchronized void setResponse(byte[] data) {
- if (!done) {
- done = true;
- value = data;
- this.notifyAll();
- }
- }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
index 08676ac..bf93331 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -2,6 +2,8 @@
import java.io.IOException;
+import com.google.common.util.concurrent.ListenableFuture;
+
/**
* Interface for low level messaging primitives.
*/
@@ -24,7 +26,7 @@
* @return a response future
* @throws IOException
*/
- public Response sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
+ public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
/**
* Registers a new message handler for message type.
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 26d835d..6e5aa89 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -5,6 +5,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
@@ -26,7 +27,6 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.slf4j.Logger;
@@ -34,6 +34,8 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
/**
* A Netty based implementation of MessagingService.
@@ -44,7 +46,8 @@
private final Endpoint localEp;
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
- private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
+ private final AtomicLong messageIdGenerator = new AtomicLong(0);
+ private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.weakValues()
// TODO: Once the entry expires, notify blocking threads (if any).
@@ -119,7 +122,7 @@
@Override
public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
InternalMessage message = new InternalMessage.Builder(this)
- .withId(RandomUtils.nextLong())
+ .withId(messageIdGenerator.incrementAndGet())
.withSender(localEp)
.withType(type)
.withPayload(payload)
@@ -142,10 +145,10 @@
}
@Override
- public Response sendAndReceive(Endpoint ep, String type, byte[] payload)
+ public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload)
throws IOException {
- AsyncResponse futureResponse = new AsyncResponse();
- Long messageId = RandomUtils.nextLong();
+ SettableFuture<byte[]> futureResponse = SettableFuture.create();
+ Long messageId = messageIdGenerator.incrementAndGet();
responseFutures.put(messageId, futureResponse);
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageId)
@@ -267,10 +270,10 @@
String type = message.type();
if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
try {
- AsyncResponse futureResponse =
+ SettableFuture<byte[]> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
- futureResponse.setResponse(message.payload());
+ futureResponse.set(message.payload());
} else {
log.warn("Received a reply. But was unable to locate the request handle");
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Response.java b/utils/netty/src/main/java/org/onlab/netty/Response.java
deleted file mode 100644
index 150755e..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/Response.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.onlab.netty;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Response object returned when making synchronous requests.
- * Can you used to check is a response is ready and/or wait for a response
- * to become available.
- */
-public interface Response {
-
- /**
- * Gets the response waiting for a designated timeout period.
- * @param timeout timeout period (since request was sent out)
- * @param tu unit of time.
- * @return response payload
- * @throws TimeoutException if the timeout expires before the response arrives.
- */
- public byte[] get(long timeout, TimeUnit tu) throws TimeoutException;
-
- /**
- * Gets the response waiting for indefinite timeout period.
- * @return response payload
- * @throws InterruptedException if the thread is interrupted before the response arrives.
- */
- public byte[] get() throws InterruptedException;
-
- /**
- * Checks if the response is ready without blocking.
- * @return true if response is ready, false otherwise.
- */
- public boolean isReady();
-}