sendAndReceive now returns a Future instead of Response
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index a737868..dbd2688 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -12,6 +12,7 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -49,7 +50,6 @@
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
 import org.onlab.onos.store.flow.ReplicaInfo;
 import org.onlab.onos.store.flow.ReplicaInfoService;
 import org.onlab.onos.store.serializers.DistributedStoreSerializers;
@@ -57,6 +57,7 @@
 import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 
+import com.google.common.base.Function;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -213,9 +214,9 @@
                 SERIALIZER.encode(rule));
 
         try {
-            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException e) {
+            Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+        } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
             // FIXME: throw a FlowStoreException
             throw new RuntimeException(e);
         }
@@ -247,9 +248,9 @@
                 SERIALIZER.encode(deviceId));
 
         try {
-            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException e) {
+            Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+        } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
             // FIXME: throw a FlowStoreException
             throw new RuntimeException(e);
         }
@@ -291,14 +292,17 @@
                 SERIALIZER.encode(operation));
 
         try {
-            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-        } catch (IOException | TimeoutException e) {
-            // FIXME: throw a FlowStoreException
-            throw new RuntimeException(e);
+            ListenableFuture<byte[]> responseFuture =
+                    clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            return Futures.transform(responseFuture, new Function<byte[], CompletedBatchOperation>() {
+                @Override
+                public CompletedBatchOperation apply(byte[] input) {
+                    return SERIALIZER.decode(input);
+                }
+            });
+        } catch (IOException e) {
+            return Futures.immediateFailedFuture(e);
         }
-
-        return null;
     }
 
     private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {