sendAndReceive now returns a Future instead of Reponse
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index a737868..dbd2688 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -12,6 +12,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -49,7 +50,6 @@
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.flow.ReplicaInfo;
import org.onlab.onos.store.flow.ReplicaInfoService;
import org.onlab.onos.store.serializers.DistributedStoreSerializers;
@@ -57,6 +57,7 @@
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
+import com.google.common.base.Function;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -213,9 +214,9 @@
SERIALIZER.encode(rule));
try {
- ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException e) {
+ Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+ } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
@@ -247,9 +248,9 @@
SERIALIZER.encode(deviceId));
try {
- ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException e) {
+ Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+ } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
@@ -291,14 +292,17 @@
SERIALIZER.encode(operation));
try {
- ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- } catch (IOException | TimeoutException e) {
- // FIXME: throw a FlowStoreException
- throw new RuntimeException(e);
+ ListenableFuture<byte[]> responseFuture =
+ clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return Futures.transform(responseFuture, new Function<byte[], CompletedBatchOperation>() {
+ @Override
+ public CompletedBatchOperation apply(byte[] input) {
+ return SERIALIZER.decode(input);
+ }
+ });
+ } catch (IOException e) {
+ return Futures.immediateFailedFuture(e);
}
-
- return null;
}
private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {