sendAndReceive now returns a Future instead of Reponse
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index b2f679c..d8e5fab 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -4,9 +4,7 @@
import java.io.IOException;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -20,7 +18,6 @@
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.ClusterMessageSerializer;
import org.onlab.onos.store.serializers.KryoNamespaces;
@@ -32,10 +29,11 @@
import org.onlab.netty.MessageHandler;
import org.onlab.netty.MessagingService;
import org.onlab.netty.NettyMessagingService;
-import org.onlab.netty.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.ListenableFuture;
+
@Component(immediate = true)
@Service
public class ClusterCommunicationManager
@@ -133,14 +131,12 @@
}
@Override
- public ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
+ public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
- Response responseFuture =
- messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
- return new InternalClusterMessageResponse(toNodeId, responseFuture);
+ return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
} catch (IOException e) {
log.error("Failed interaction with remote nodeId: " + toNodeId, e);
@@ -188,60 +184,4 @@
rawMessage.respond(response);
}
}
-
- private static final class InternalClusterMessageResponse
- implements ClusterMessageResponse {
-
- private final NodeId sender;
- private final Response responseFuture;
- private volatile boolean isCancelled = false;
- private volatile boolean isDone = false;
-
- public InternalClusterMessageResponse(NodeId sender, Response responseFuture) {
- this.sender = sender;
- this.responseFuture = responseFuture;
- }
- @Override
- public NodeId sender() {
- return sender;
- }
-
- @Override
- public byte[] get(long timeout, TimeUnit timeunit)
- throws TimeoutException {
- final byte[] result = responseFuture.get(timeout, timeunit);
- isDone = true;
- return result;
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (isDone()) {
- return false;
- }
- // doing nothing for now
- // when onlab.netty Response support cancel, call them.
- isCancelled = true;
- return true;
- }
-
- @Override
- public boolean isCancelled() {
- return isCancelled;
- }
-
- @Override
- public boolean isDone() {
- return this.isDone || isCancelled();
- }
-
- @Override
- public byte[] get() throws InterruptedException, ExecutionException {
- // TODO: consider forbidding this call and force the use of timed get
- // to enforce handling of remote peer failure scenario
- final byte[] result = responseFuture.get();
- isDone = true;
- return result;
- }
- }
-}
+}
\ No newline at end of file