diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
index 6ccd483..73f4258 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
@@ -17,8 +17,8 @@
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
-import java.util.function.Consumer;
-import java.util.function.Function;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 
 /**
  * Interface for low level messaging primitives.
@@ -36,7 +36,7 @@
     CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload);
 
     /**
-     * Sends a message synchronously and waits for a response.
+     * Sends a message asynchronously and expects a response.
      * @param ep end point to send the message to.
      * @param type type of message.
      * @param payload message payload.
@@ -45,12 +45,14 @@
     CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
 
     /**
-     * Registers a new message handler for message type.
-     * @param type message type.
-     * @param handler message handler
-     * @param executor executor to use for running message handler logic.
+     * Sends a message synchronously and expects a response.
+     * @param ep end point to send the message to.
+     * @param type type of message.
+     * @param payload message payload.
+     * @param executor executor over which any follow up actions after completion will be executed.
+     * @return a response future
      */
-    void registerHandler(String type, Consumer<byte[]> handler, Executor executor);
+    CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor);
 
     /**
      * Registers a new message handler for message type.
@@ -58,14 +60,22 @@
      * @param handler message handler
      * @param executor executor to use for running message handler logic.
      */
-    void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor);
+    void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor);
+
+    /**
+     * Registers a new message handler for message type.
+     * @param type message type.
+     * @param handler message handler
+     * @param executor executor to use for running message handler logic.
+     */
+    void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor);
 
     /**
      * Registers a new message handler for message type.
      * @param type message type.
      * @param handler message handler
      */
-    void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler);
+    void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler);
 
     /**
      * Unregister current handler, if one exists for message type.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index b2ee832..b537517 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -49,7 +49,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -241,9 +241,9 @@
         });
     }
 
-    private class HeartbeatMessageHandler implements Consumer<byte[]> {
+    private class HeartbeatMessageHandler implements BiConsumer<Endpoint, byte[]> {
         @Override
-        public void accept(byte[] message) {
+        public void accept(Endpoint sender, byte[] message) {
             HeartbeatMessage hb = SERIALIZER.decode(message);
             failureDetector.report(hb.source().id());
             hb.knownPeers().forEach(node -> {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 8a237ef..df4ac5c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -35,10 +35,13 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
+
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -210,7 +213,7 @@
                 executor);
     }
 
-    private class InternalClusterMessageHandler implements Function<byte[], byte[]> {
+    private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
         private ClusterMessageHandler handler;
 
         public InternalClusterMessageHandler(ClusterMessageHandler handler) {
@@ -218,14 +221,14 @@
         }
 
         @Override
-        public byte[] apply(byte[] bytes) {
+        public byte[] apply(Endpoint sender, byte[] bytes) {
             ClusterMessage message = ClusterMessage.fromBytes(bytes);
             handler.handle(message);
             return message.response();
         }
     }
 
-    private class InternalMessageResponder<M, R> implements Function<byte[], CompletableFuture<byte[]>> {
+    private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
         private final Function<byte[], M> decoder;
         private final Function<R, byte[]> encoder;
         private final Function<M, CompletableFuture<R>> handler;
@@ -239,12 +242,12 @@
         }
 
         @Override
-        public CompletableFuture<byte[]> apply(byte[] bytes) {
+        public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
             return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
         }
     }
 
-    private class InternalMessageConsumer<M> implements Consumer<byte[]> {
+    private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
         private final Function<byte[], M> decoder;
         private final Consumer<M> consumer;
 
@@ -254,7 +257,7 @@
         }
 
         @Override
-        public void accept(byte[] bytes) {
+        public void accept(Endpoint sender, byte[] bytes) {
             consumer.accept(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
         }
     }
