Support user-provided timeouts in intra-cluster communication service
Change-Id: I4ed9cd2e84df83b45ae17af24b9780b9ac97a95d
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index 7aa5ac6..bb17cbc 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.cluster.messaging;
+import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -105,11 +106,33 @@
* @param <R> reply type
* @return reply future
*/
+ default <M, R> CompletableFuture<R> sendAndReceive(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Function<byte[], R> decoder,
+ NodeId toNodeId) {
+ return sendAndReceive(message, subject, encoder, decoder, toNodeId, Duration.ZERO);
+ }
+
+ /**
+ * Sends a message and expects a reply.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding request to byte[]
+ * @param decoder function for decoding response from byte[]
+ * @param toNodeId recipient node identifier
+ * @param timeout the message timeout
+ * @param <M> request type
+ * @param <R> reply type
+ * @return reply future
+ */
<M, R> CompletableFuture<R> sendAndReceive(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
Function<byte[], R> decoder,
- NodeId toNodeId);
+ NodeId toNodeId,
+ Duration timeout);
/**
* Adds a new subscriber for the specified message subject.
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
index a810a69..8927e51 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
@@ -15,11 +15,14 @@
*/
package org.onosproject.store.cluster.messaging;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
+import com.google.common.util.concurrent.MoreExecutors;
+
/**
* Interface for low level messaging primitives.
*/
@@ -42,7 +45,9 @@
* @param payload message payload.
* @return a response future
*/
- CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
+ default CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
+ return sendAndReceive(ep, type, payload, Duration.ZERO, MoreExecutors.directExecutor());
+ }
/**
* Sends a message synchronously and expects a response.
@@ -52,7 +57,33 @@
* @param executor executor over which any follow up actions after completion will be executed.
* @return a response future
*/
- CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor);
+ default CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
+ return sendAndReceive(ep, type, payload, Duration.ZERO, executor);
+ }
+
+ /**
+ * Sends a message asynchronously and expects a response.
+ * @param ep end point to send the message to.
+ * @param type type of message.
+ * @param payload message payload.
+ * @param timeout operation timeout
+ * @return a response future
+ */
+ default CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Duration timeout) {
+ return sendAndReceive(ep, type, payload, timeout, MoreExecutors.directExecutor());
+ }
+
+ /**
+ * Sends a message synchronously and expects a response.
+ * @param ep end point to send the message to.
+ * @param type type of message.
+ * @param payload message payload.
+ * @param executor executor over which any follow up actions after completion will be executed.
+ * @param timeout operation timeout
+ * @return a response future
+ */
+ CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Duration timeout,
+ Executor executor);
/**
* Registers a new message handler for message type.