MessagingService API enhancements

Change-Id: Iabfe15d4f08d7c53bd6575c5d94d0ac9f4e1a38e
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 8a237ef..df4ac5c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -35,10 +35,13 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
+
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -210,7 +213,7 @@
                 executor);
     }
 
-    private class InternalClusterMessageHandler implements Function<byte[], byte[]> {
+    private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
         private ClusterMessageHandler handler;
 
         public InternalClusterMessageHandler(ClusterMessageHandler handler) {
@@ -218,14 +221,14 @@
         }
 
         @Override
-        public byte[] apply(byte[] bytes) {
+        public byte[] apply(Endpoint sender, byte[] bytes) {
             ClusterMessage message = ClusterMessage.fromBytes(bytes);
             handler.handle(message);
             return message.response();
         }
     }
 
-    private class InternalMessageResponder<M, R> implements Function<byte[], CompletableFuture<byte[]>> {
+    private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
         private final Function<byte[], M> decoder;
         private final Function<R, byte[]> encoder;
         private final Function<M, CompletableFuture<R>> handler;
@@ -239,12 +242,12 @@
         }
 
         @Override
-        public CompletableFuture<byte[]> apply(byte[] bytes) {
+        public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
             return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
         }
     }
 
-    private class InternalMessageConsumer<M> implements Consumer<byte[]> {
+    private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
         private final Function<byte[], M> decoder;
         private final Consumer<M> consumer;
 
@@ -254,7 +257,7 @@
         }
 
         @Override
-        public void accept(byte[] bytes) {
+        public void accept(Endpoint sender, byte[] bytes) {
             consumer.accept(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
         }
     }