MessagingService API enchancements
Change-Id: Iabfe15d4f08d7c53bd6575c5d94d0ac9f4e1a38e
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
index 6ccd483..73f4258 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
@@ -17,8 +17,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import java.util.function.Consumer;
-import java.util.function.Function;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
/**
* Interface for low level messaging primitives.
@@ -36,7 +36,7 @@
CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload);
/**
- * Sends a message synchronously and waits for a response.
+ * Sends a message asynchronously and expects a response.
* @param ep end point to send the message to.
* @param type type of message.
* @param payload message payload.
@@ -45,12 +45,14 @@
CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
/**
- * Registers a new message handler for message type.
- * @param type message type.
- * @param handler message handler
- * @param executor executor to use for running message handler logic.
+ * Sends a message synchronously and expects a response.
+ * @param ep end point to send the message to.
+ * @param type type of message.
+ * @param payload message payload.
+ * @param executor executor over which any follow up actions after completion will be executed.
+ * @return a response future
*/
- void registerHandler(String type, Consumer<byte[]> handler, Executor executor);
+ CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor);
/**
* Registers a new message handler for message type.
@@ -58,14 +60,22 @@
* @param handler message handler
* @param executor executor to use for running message handler logic.
*/
- void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor);
+ void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor);
+
+ /**
+ * Registers a new message handler for message type.
+ * @param type message type.
+ * @param handler message handler
+ * @param executor executor to use for running message handler logic.
+ */
+ void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor);
/**
* Registers a new message handler for message type.
* @param type message type.
* @param handler message handler
*/
- void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler);
+ void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler);
/**
* Unregister current handler, if one exists for message type.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index b2ee832..b537517 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -49,7 +49,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -241,9 +241,9 @@
});
}
- private class HeartbeatMessageHandler implements Consumer<byte[]> {
+ private class HeartbeatMessageHandler implements BiConsumer<Endpoint, byte[]> {
@Override
- public void accept(byte[] message) {
+ public void accept(Endpoint sender, byte[] message) {
HeartbeatMessage hb = SERIALIZER.decode(message);
failureDetector.report(hb.source().id());
hb.knownPeers().forEach(node -> {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 8a237ef..df4ac5c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -35,10 +35,13 @@
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
+
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -210,7 +213,7 @@
executor);
}
- private class InternalClusterMessageHandler implements Function<byte[], byte[]> {
+ private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
private ClusterMessageHandler handler;
public InternalClusterMessageHandler(ClusterMessageHandler handler) {
@@ -218,14 +221,14 @@
}
@Override
- public byte[] apply(byte[] bytes) {
+ public byte[] apply(Endpoint sender, byte[] bytes) {
ClusterMessage message = ClusterMessage.fromBytes(bytes);
handler.handle(message);
return message.response();
}
}
- private class InternalMessageResponder<M, R> implements Function<byte[], CompletableFuture<byte[]>> {
+ private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
private final Function<byte[], M> decoder;
private final Function<R, byte[]> encoder;
private final Function<M, CompletableFuture<R>> handler;
@@ -239,12 +242,12 @@
}
@Override
- public CompletableFuture<byte[]> apply(byte[] bytes) {
+ public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
}
}
- private class InternalMessageConsumer<M> implements Consumer<byte[]> {
+ private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
private final Function<byte[], M> decoder;
private final Consumer<M> consumer;
@@ -254,7 +257,7 @@
}
@Override
- public void accept(byte[] bytes) {
+ public void accept(Endpoint sender, byte[] bytes) {
consumer.accept(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
}
}