sendAndReceive now returns a Future instead of Response
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index b2f679c..d8e5fab 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -4,9 +4,7 @@
 
 import java.io.IOException;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -20,7 +18,6 @@
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.serializers.ClusterMessageSerializer;
 import org.onlab.onos.store.serializers.KryoNamespaces;
@@ -32,10 +29,11 @@
 import org.onlab.netty.MessageHandler;
 import org.onlab.netty.MessagingService;
 import org.onlab.netty.NettyMessagingService;
-import org.onlab.netty.Response;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 @Component(immediate = true)
 @Service
 public class ClusterCommunicationManager
@@ -133,14 +131,12 @@
     }
 
     @Override
-    public ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
+    public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
         ControllerNode node = clusterService.getNode(toNodeId);
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
         try {
-            Response responseFuture =
-                    messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
-            return new InternalClusterMessageResponse(toNodeId, responseFuture);
+            return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
 
         } catch (IOException e) {
             log.error("Failed interaction with remote nodeId: " + toNodeId, e);
@@ -188,60 +184,4 @@
             rawMessage.respond(response);
         }
     }
-
-    private static final class InternalClusterMessageResponse
-        implements ClusterMessageResponse {
-
-        private final NodeId sender;
-        private final Response responseFuture;
-        private volatile boolean isCancelled = false;
-        private volatile boolean isDone = false;
-
-        public InternalClusterMessageResponse(NodeId sender, Response responseFuture) {
-            this.sender = sender;
-            this.responseFuture = responseFuture;
-        }
-        @Override
-        public NodeId sender() {
-            return sender;
-        }
-
-        @Override
-        public byte[] get(long timeout, TimeUnit timeunit)
-                throws TimeoutException {
-            final byte[] result = responseFuture.get(timeout, timeunit);
-            isDone = true;
-            return result;
-        }
-
-        @Override
-        public boolean cancel(boolean mayInterruptIfRunning) {
-            if (isDone()) {
-                return false;
-            }
-            // doing nothing for now
-            // when onlab.netty Response support cancel, call them.
-            isCancelled = true;
-            return true;
-        }
-
-        @Override
-        public boolean isCancelled() {
-            return isCancelled;
-        }
-
-        @Override
-        public boolean isDone() {
-            return this.isDone || isCancelled();
-        }
-
-        @Override
-        public byte[] get() throws InterruptedException, ExecutionException {
-            // TODO: consider forbidding this call and force the use of timed get
-            //       to enforce handling of remote peer failure scenario
-            final byte[] result = responseFuture.get();
-            isDone = true;
-            return result;
-        }
-    }
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index a737868..dbd2688 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -12,6 +12,7 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -49,7 +50,6 @@
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
 import org.onlab.onos.store.flow.ReplicaInfo;
 import org.onlab.onos.store.flow.ReplicaInfoService;
 import org.onlab.onos.store.serializers.DistributedStoreSerializers;
@@ -57,6 +57,7 @@
 import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 
+import com.google.common.base.Function;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -213,9 +214,9 @@
                 SERIALIZER.encode(rule));
 
         try {
-            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException e) {
+            Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+        } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
             // FIXME: throw a FlowStoreException
             throw new RuntimeException(e);
         }
@@ -247,9 +248,9 @@
                 SERIALIZER.encode(deviceId));
 
         try {
-            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException e) {
+            Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+        } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
             // FIXME: throw a FlowStoreException
             throw new RuntimeException(e);
         }
@@ -291,14 +292,17 @@
                 SERIALIZER.encode(operation));
 
         try {
-            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-        } catch (IOException | TimeoutException e) {
-            // FIXME: throw a FlowStoreException
-            throw new RuntimeException(e);
+            ListenableFuture<byte[]> responseFuture =
+                    clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            return Futures.transform(responseFuture, new Function<byte[], CompletedBatchOperation>() {
+                @Override
+                public CompletedBatchOperation apply(byte[] input) {
+                    return SERIALIZER.decode(input);
+                }
+            });
+        } catch (IOException e) {
+            return Futures.immediateFailedFuture(e);
         }
-
-        return null;
     }
 
     private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
index 273e3cc..7106aef 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
@@ -4,6 +4,7 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 import com.google.common.collect.Sets;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -21,7 +22,6 @@
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
 import org.onlab.onos.store.flow.ReplicaInfo;
 import org.onlab.onos.store.flow.ReplicaInfoService;
 import org.onlab.onos.store.serializers.KryoNamespaces;
@@ -34,6 +34,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -184,11 +186,11 @@
                     SERIALIZER.encode(connectPoint));
 
             try {
-                ClusterMessageResponse response =
+                Future<byte[]> response =
                         clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
                 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
                                                       TimeUnit.MILLISECONDS));
-            } catch (IOException | TimeoutException e) {
+            } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
                 // FIXME: throw a StatsStoreException
                 throw new RuntimeException(e);
             }
@@ -212,11 +214,11 @@
                     SERIALIZER.encode(connectPoint));
 
             try {
-                ClusterMessageResponse response =
+                Future<byte[]> response =
                         clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
                 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
                                                       TimeUnit.MILLISECONDS));
-            } catch (IOException | TimeoutException e) {
+            } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
                 // FIXME: throw a StatsStoreException
                 throw new RuntimeException(e);
             }