sendAndReceive now returns a Future instead of Reponse
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java
index 6f77924..a14c568 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java
@@ -4,6 +4,7 @@
import java.io.IOException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -12,7 +13,6 @@
import org.onlab.metrics.MetricsManager;
import org.onlab.netty.Endpoint;
import org.onlab.netty.NettyMessagingService;
-import org.onlab.netty.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,10 +74,10 @@
for (int i = 0; i < warmup; i++) {
messaging.sendAsync(endpoint, "simple", "Hello World".getBytes());
- Response response = messaging
+ Future<byte[]> responseFuture = messaging
.sendAndReceive(endpoint, "echo",
"Hello World".getBytes());
- response.get(100000, TimeUnit.MILLISECONDS);
+ responseFuture.get(100000, TimeUnit.MILLISECONDS);
}
log.info("measuring round-trip send & receive");
@@ -85,13 +85,13 @@
int timeouts = 0;
for (int i = 0; i < iterations; i++) {
- Response response;
+ Future<byte[]> responseFuture;
Timer.Context context = sendAndReceiveTimer.time();
try {
- response = messaging
+ responseFuture = messaging
.sendAndReceive(endpoint, "echo",
"Hello World".getBytes());
- response.get(10000, TimeUnit.MILLISECONDS);
+ responseFuture.get(10000, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
timeouts++;
log.info("timeout:" + timeouts + " at iteration:" + i);
diff --git a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
index 6fc150c..2cff64a 100644
--- a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
@@ -5,6 +5,8 @@
import org.onlab.onos.cluster.NodeId;
+import com.google.common.util.concurrent.ListenableFuture;
+
// TODO: remove IOExceptions?
/**
* Service for assisting communications between controller cluster nodes.
@@ -40,10 +42,10 @@
* Sends a message synchronously.
* @param message message to send
* @param toNodeId recipient node identifier
- * @return ClusterMessageResponse which is reply future.
+ * @return reply future.
* @throws IOException
*/
- ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException;
+ ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException;
/**
* Adds a new subscriber for the specified message subject.
diff --git a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java
deleted file mode 100644
index d2a0039..0000000
--- a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.onlab.onos.store.cluster.messaging;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.onlab.onos.cluster.NodeId;
-
-public interface ClusterMessageResponse extends Future<byte[]> {
-
- public NodeId sender();
-
- // TODO InterruptedException, ExecutionException removed from original
- // Future declaration. Revisit if we ever need those.
- @Override
- public byte[] get(long timeout, TimeUnit unit) throws TimeoutException;
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index b2f679c..d8e5fab 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -4,9 +4,7 @@
import java.io.IOException;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -20,7 +18,6 @@
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.ClusterMessageSerializer;
import org.onlab.onos.store.serializers.KryoNamespaces;
@@ -32,10 +29,11 @@
import org.onlab.netty.MessageHandler;
import org.onlab.netty.MessagingService;
import org.onlab.netty.NettyMessagingService;
-import org.onlab.netty.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.ListenableFuture;
+
@Component(immediate = true)
@Service
public class ClusterCommunicationManager
@@ -133,14 +131,12 @@
}
@Override
- public ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
+ public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
- Response responseFuture =
- messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
- return new InternalClusterMessageResponse(toNodeId, responseFuture);
+ return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
} catch (IOException e) {
log.error("Failed interaction with remote nodeId: " + toNodeId, e);
@@ -188,60 +184,4 @@
rawMessage.respond(response);
}
}
-
- private static final class InternalClusterMessageResponse
- implements ClusterMessageResponse {
-
- private final NodeId sender;
- private final Response responseFuture;
- private volatile boolean isCancelled = false;
- private volatile boolean isDone = false;
-
- public InternalClusterMessageResponse(NodeId sender, Response responseFuture) {
- this.sender = sender;
- this.responseFuture = responseFuture;
- }
- @Override
- public NodeId sender() {
- return sender;
- }
-
- @Override
- public byte[] get(long timeout, TimeUnit timeunit)
- throws TimeoutException {
- final byte[] result = responseFuture.get(timeout, timeunit);
- isDone = true;
- return result;
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (isDone()) {
- return false;
- }
- // doing nothing for now
- // when onlab.netty Response support cancel, call them.
- isCancelled = true;
- return true;
- }
-
- @Override
- public boolean isCancelled() {
- return isCancelled;
- }
-
- @Override
- public boolean isDone() {
- return this.isDone || isCancelled();
- }
-
- @Override
- public byte[] get() throws InterruptedException, ExecutionException {
- // TODO: consider forbidding this call and force the use of timed get
- // to enforce handling of remote peer failure scenario
- final byte[] result = responseFuture.get();
- isDone = true;
- return result;
- }
- }
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index a737868..dbd2688 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -12,6 +12,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -49,7 +50,6 @@
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.flow.ReplicaInfo;
import org.onlab.onos.store.flow.ReplicaInfoService;
import org.onlab.onos.store.serializers.DistributedStoreSerializers;
@@ -57,6 +57,7 @@
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
+import com.google.common.base.Function;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -213,9 +214,9 @@
SERIALIZER.encode(rule));
try {
- ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException e) {
+ Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+ } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
@@ -247,9 +248,9 @@
SERIALIZER.encode(deviceId));
try {
- ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException e) {
+ Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+ } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
@@ -291,14 +292,17 @@
SERIALIZER.encode(operation));
try {
- ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- } catch (IOException | TimeoutException e) {
- // FIXME: throw a FlowStoreException
- throw new RuntimeException(e);
+ ListenableFuture<byte[]> responseFuture =
+ clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return Futures.transform(responseFuture, new Function<byte[], CompletedBatchOperation>() {
+ @Override
+ public CompletedBatchOperation apply(byte[] input) {
+ return SERIALIZER.decode(input);
+ }
+ });
+ } catch (IOException e) {
+ return Futures.immediateFailedFuture(e);
}
-
- return null;
}
private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
index 273e3cc..7106aef 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
@@ -4,6 +4,7 @@
import static org.slf4j.LoggerFactory.getLogger;
import com.google.common.collect.Sets;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -21,7 +22,6 @@
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.flow.ReplicaInfo;
import org.onlab.onos.store.flow.ReplicaInfoService;
import org.onlab.onos.store.serializers.KryoNamespaces;
@@ -34,6 +34,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -184,11 +186,11 @@
SERIALIZER.encode(connectPoint));
try {
- ClusterMessageResponse response =
+ Future<byte[]> response =
clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException e) {
+ } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
// FIXME: throw a StatsStoreException
throw new RuntimeException(e);
}
@@ -212,11 +214,11 @@
SERIALIZER.encode(connectPoint));
try {
- ClusterMessageResponse response =
+ Future<byte[]> response =
clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException e) {
+ } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
// FIXME: throw a StatsStoreException
throw new RuntimeException(e);
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
deleted file mode 100644
index 1772a3c..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package org.onlab.netty;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * An asynchronous response.
- * This class provides a base implementation of Response, with methods to retrieve the
- * result and query to see if the result is ready. The result can only be retrieved when
- * it is ready and the get methods will block if the result is not ready yet.
- */
-public class AsyncResponse implements Response {
-
- private byte[] value;
- private boolean done = false;
- private final long start = System.nanoTime();
-
- @Override
- public byte[] get(long timeout, TimeUnit timeUnit) throws TimeoutException {
- timeout = timeUnit.toNanos(timeout);
- boolean interrupted = false;
- try {
- synchronized (this) {
- while (!done) {
- try {
- long timeRemaining = timeout - (System.nanoTime() - start);
- if (timeRemaining <= 0) {
- throw new TimeoutException("Operation timed out.");
- }
- TimeUnit.NANOSECONDS.timedWait(this, timeRemaining);
- } catch (InterruptedException e) {
- interrupted = true;
- }
- }
- }
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- return value;
- }
-
- @Override
- public byte[] get() throws InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isReady() {
- return done;
- }
-
- /**
- * Sets response value and unblocks any thread blocking on the response to become
- * available.
- * @param data response data.
- */
- public synchronized void setResponse(byte[] data) {
- if (!done) {
- done = true;
- value = data;
- this.notifyAll();
- }
- }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
index 08676ac..bf93331 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -2,6 +2,8 @@
import java.io.IOException;
+import com.google.common.util.concurrent.ListenableFuture;
+
/**
* Interface for low level messaging primitives.
*/
@@ -24,7 +26,7 @@
* @return a response future
* @throws IOException
*/
- public Response sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
+ public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
/**
* Registers a new message handler for message type.
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 26d835d..6e5aa89 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -5,6 +5,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
@@ -26,7 +27,6 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.slf4j.Logger;
@@ -34,6 +34,8 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
/**
* A Netty based implementation of MessagingService.
@@ -44,7 +46,8 @@
private final Endpoint localEp;
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
- private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
+ private final AtomicLong messageIdGenerator = new AtomicLong(0);
+ private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.weakValues()
// TODO: Once the entry expires, notify blocking threads (if any).
@@ -119,7 +122,7 @@
@Override
public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
InternalMessage message = new InternalMessage.Builder(this)
- .withId(RandomUtils.nextLong())
+ .withId(messageIdGenerator.incrementAndGet())
.withSender(localEp)
.withType(type)
.withPayload(payload)
@@ -142,10 +145,10 @@
}
@Override
- public Response sendAndReceive(Endpoint ep, String type, byte[] payload)
+ public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload)
throws IOException {
- AsyncResponse futureResponse = new AsyncResponse();
- Long messageId = RandomUtils.nextLong();
+ SettableFuture<byte[]> futureResponse = SettableFuture.create();
+ Long messageId = messageIdGenerator.incrementAndGet();
responseFutures.put(messageId, futureResponse);
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageId)
@@ -267,10 +270,10 @@
String type = message.type();
if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
try {
- AsyncResponse futureResponse =
+ SettableFuture<byte[]> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
- futureResponse.setResponse(message.payload());
+ futureResponse.set(message.payload());
} else {
log.warn("Received a reply. But was unable to locate the request handle");
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Response.java b/utils/netty/src/main/java/org/onlab/netty/Response.java
deleted file mode 100644
index 150755e..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/Response.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.onlab.netty;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Response object returned when making synchronous requests.
- * Can you used to check is a response is ready and/or wait for a response
- * to become available.
- */
-public interface Response {
-
- /**
- * Gets the response waiting for a designated timeout period.
- * @param timeout timeout period (since request was sent out)
- * @param tu unit of time.
- * @return response payload
- * @throws TimeoutException if the timeout expires before the response arrives.
- */
- public byte[] get(long timeout, TimeUnit tu) throws TimeoutException;
-
- /**
- * Gets the response waiting for indefinite timeout period.
- * @return response payload
- * @throws InterruptedException if the thread is interrupted before the response arrives.
- */
- public byte[] get() throws InterruptedException;
-
- /**
- * Checks if the response is ready without blocking.
- * @return true if response is ready, false otherwise.
- */
- public boolean isReady();
-}
diff --git a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
index 36d2a1e..ddcdd6f 100644
--- a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
+++ b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
@@ -1,9 +1,12 @@
package org.onlab.netty;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomUtils;
+
import static org.junit.Assert.*;
+
import org.junit.Test;
/**
@@ -20,8 +23,8 @@
ponger.activate();
ponger.registerHandler("echo", new EchoHandler());
byte[] payload = RandomUtils.nextBytes(100);
- Response response = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", payload);
- assertArrayEquals(payload, response.get(10000, TimeUnit.MILLISECONDS));
+ Future<byte[]> responseFuture = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", payload);
+ assertArrayEquals(payload, responseFuture.get(10000, TimeUnit.MILLISECONDS));
} finally {
pinger.deactivate();
ponger.deactivate();