sendAndReceive now returns a Future instead of Reponse
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index b2f679c..d8e5fab 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -4,9 +4,7 @@
import java.io.IOException;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -20,7 +18,6 @@
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.ClusterMessageSerializer;
import org.onlab.onos.store.serializers.KryoNamespaces;
@@ -32,10 +29,11 @@
import org.onlab.netty.MessageHandler;
import org.onlab.netty.MessagingService;
import org.onlab.netty.NettyMessagingService;
-import org.onlab.netty.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.ListenableFuture;
+
@Component(immediate = true)
@Service
public class ClusterCommunicationManager
@@ -133,14 +131,12 @@
}
@Override
- public ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
+ public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
- Response responseFuture =
- messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
- return new InternalClusterMessageResponse(toNodeId, responseFuture);
+ return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
} catch (IOException e) {
log.error("Failed interaction with remote nodeId: " + toNodeId, e);
@@ -188,60 +184,4 @@
rawMessage.respond(response);
}
}
-
- private static final class InternalClusterMessageResponse
- implements ClusterMessageResponse {
-
- private final NodeId sender;
- private final Response responseFuture;
- private volatile boolean isCancelled = false;
- private volatile boolean isDone = false;
-
- public InternalClusterMessageResponse(NodeId sender, Response responseFuture) {
- this.sender = sender;
- this.responseFuture = responseFuture;
- }
- @Override
- public NodeId sender() {
- return sender;
- }
-
- @Override
- public byte[] get(long timeout, TimeUnit timeunit)
- throws TimeoutException {
- final byte[] result = responseFuture.get(timeout, timeunit);
- isDone = true;
- return result;
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (isDone()) {
- return false;
- }
- // doing nothing for now
- // when onlab.netty Response support cancel, call them.
- isCancelled = true;
- return true;
- }
-
- @Override
- public boolean isCancelled() {
- return isCancelled;
- }
-
- @Override
- public boolean isDone() {
- return this.isDone || isCancelled();
- }
-
- @Override
- public byte[] get() throws InterruptedException, ExecutionException {
- // TODO: consider forbidding this call and force the use of timed get
- // to enforce handling of remote peer failure scenario
- final byte[] result = responseFuture.get();
- isDone = true;
- return result;
- }
- }
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index a737868..dbd2688 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -12,6 +12,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -49,7 +50,6 @@
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.flow.ReplicaInfo;
import org.onlab.onos.store.flow.ReplicaInfoService;
import org.onlab.onos.store.serializers.DistributedStoreSerializers;
@@ -57,6 +57,7 @@
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
+import com.google.common.base.Function;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -213,9 +214,9 @@
SERIALIZER.encode(rule));
try {
- ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException e) {
+ Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+ } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
@@ -247,9 +248,9 @@
SERIALIZER.encode(deviceId));
try {
- ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException e) {
+ Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+ } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
@@ -291,14 +292,17 @@
SERIALIZER.encode(operation));
try {
- ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- } catch (IOException | TimeoutException e) {
- // FIXME: throw a FlowStoreException
- throw new RuntimeException(e);
+ ListenableFuture<byte[]> responseFuture =
+ clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return Futures.transform(responseFuture, new Function<byte[], CompletedBatchOperation>() {
+ @Override
+ public CompletedBatchOperation apply(byte[] input) {
+ return SERIALIZER.decode(input);
+ }
+ });
+ } catch (IOException e) {
+ return Futures.immediateFailedFuture(e);
}
-
- return null;
}
private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
index 273e3cc..7106aef 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
@@ -4,6 +4,7 @@
import static org.slf4j.LoggerFactory.getLogger;
import com.google.common.collect.Sets;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -21,7 +22,6 @@
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.flow.ReplicaInfo;
import org.onlab.onos.store.flow.ReplicaInfoService;
import org.onlab.onos.store.serializers.KryoNamespaces;
@@ -34,6 +34,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -184,11 +186,11 @@
SERIALIZER.encode(connectPoint));
try {
- ClusterMessageResponse response =
+ Future<byte[]> response =
clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException e) {
+ } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
// FIXME: throw a StatsStoreException
throw new RuntimeException(e);
}
@@ -212,11 +214,11 @@
SERIALIZER.encode(connectPoint));
try {
- ClusterMessageResponse response =
+ Future<byte[]> response =
clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException e) {
+ } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
// FIXME: throw a StatsStoreException
throw new RuntimeException(e);
}