Support user-provided timeouts in intra-cluster communication service
Change-Id: I4ed9cd2e84df83b45ae17af24b9780b9ac97a95d
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index 7aa5ac6..bb17cbc 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.cluster.messaging;
+import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -105,11 +106,33 @@
* @param <R> reply type
* @return reply future
*/
+ default <M, R> CompletableFuture<R> sendAndReceive(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Function<byte[], R> decoder,
+ NodeId toNodeId) {
+ return sendAndReceive(message, subject, encoder, decoder, toNodeId, Duration.ZERO);
+ }
+
+ /**
+ * Sends a message and expects a reply.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding request to byte[]
+ * @param decoder function for decoding response from byte[]
+ * @param toNodeId recipient node identifier
+ * @param timeout the message timeout
+ * @param <M> request type
+ * @param <R> reply type
+ * @return reply future
+ */
<M, R> CompletableFuture<R> sendAndReceive(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
Function<byte[], R> decoder,
- NodeId toNodeId);
+ NodeId toNodeId,
+ Duration timeout);
/**
* Adds a new subscriber for the specified message subject.
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
index a810a69..8927e51 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
@@ -15,11 +15,14 @@
*/
package org.onosproject.store.cluster.messaging;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
+import com.google.common.util.concurrent.MoreExecutors;
+
/**
* Interface for low level messaging primitives.
*/
@@ -42,7 +45,9 @@
* @param payload message payload.
* @return a response future
*/
- CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
+ default CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
+ return sendAndReceive(ep, type, payload, Duration.ZERO, MoreExecutors.directExecutor());
+ }
/**
* Sends a message synchronously and expects a response.
@@ -52,7 +57,33 @@
* @param executor executor over which any follow up actions after completion will be executed.
* @return a response future
*/
- CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor);
+ default CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
+ return sendAndReceive(ep, type, payload, Duration.ZERO, executor);
+ }
+
+ /**
+ * Sends a message asynchronously and expects a response.
+ * @param ep end point to send the message to.
+ * @param type type of message.
+ * @param payload message payload.
+ * @param timeout operation timeout
+ * @return a response future
+ */
+ default CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Duration timeout) {
+ return sendAndReceive(ep, type, payload, timeout, MoreExecutors.directExecutor());
+ }
+
+ /**
+ * Sends a message synchronously and expects a response.
+ * @param ep end point to send the message to.
+ * @param type type of message.
+ * @param payload message payload.
+ * @param executor executor over which any follow up actions after completion will be executed.
+ * @param timeout operation timeout
+ * @return a response future
+ */
+ CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Duration timeout,
+ Executor executor);
/**
* Registers a new message handler for message type.
diff --git a/core/api/src/test/java/org/onosproject/store/cluster/messaging/ClusterCommunicationServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/cluster/messaging/ClusterCommunicationServiceAdapter.java
index fda20fc..0cf805b 100644
--- a/core/api/src/test/java/org/onosproject/store/cluster/messaging/ClusterCommunicationServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/cluster/messaging/ClusterCommunicationServiceAdapter.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.cluster.messaging;
+import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -63,7 +64,7 @@
@Override
public <M, R> CompletableFuture<R> sendAndReceive(M message,
MessageSubject subject, Function<M, byte[]> encoder,
- Function<byte[], R> decoder, NodeId toNodeId) {
+ Function<byte[], R> decoder, NodeId toNodeId, Duration timeout) {
return null;
}
diff --git a/core/net/src/test/java/org/onosproject/cluster/impl/TestClusterCommunicationService.java b/core/net/src/test/java/org/onosproject/cluster/impl/TestClusterCommunicationService.java
index 680ddd0..c273f43 100644
--- a/core/net/src/test/java/org/onosproject/cluster/impl/TestClusterCommunicationService.java
+++ b/core/net/src/test/java/org/onosproject/cluster/impl/TestClusterCommunicationService.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.cluster.impl;
+import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -83,7 +84,8 @@
MessageSubject subject,
Function<M, byte[]> encoder,
Function<byte[], R> decoder,
- NodeId toNodeId) {
+ NodeId toNodeId,
+ Duration duration) {
TestClusterCommunicationService node = nodes.get(toNodeId);
if (node == null) {
return Tools.exceptionalFuture(new MessagingException.NoRemoteHandler());
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 4b7a80a..576e0b7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.cluster.messaging.impl;
+import java.time.Duration;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import org.onlab.util.Tools;
@@ -154,7 +155,8 @@
MessageSubject subject,
Function<M, byte[]> encoder,
Function<byte[], R> decoder,
- NodeId toNodeId) {
+ NodeId toNodeId,
+ Duration timeout) {
checkPermission(CLUSTER_WRITE);
try {
ClusterMessage envelope = new ClusterMessage(
@@ -162,7 +164,7 @@
subject,
timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
apply(message));
- return sendAndReceive(subject, envelope.getBytes(), toNodeId).
+ return sendAndReceive(subject, envelope.getBytes(), toNodeId, timeout).
thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
} catch (Exception e) {
return Tools.exceptionalFuture(e);
@@ -177,7 +179,8 @@
return messagingService.sendAsync(nodeEp, subject.toString(), payload).whenComplete((r, e) -> context.stop(e));
}
- private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+ private CompletableFuture<byte[]> sendAndReceive(
+ MessageSubject subject, byte[] payload, NodeId toNodeId, Duration timeout) {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
@@ -185,7 +188,7 @@
startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
MeteringAgent.Context subjectContext = subjectMeteringAgent.
startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
- return messagingService.sendAndReceive(nodeEp, subject.toString(), payload).
+ return messagingService.sendAndReceive(nodeEp, subject.toString(), payload, timeout).
whenComplete((bytes, throwable) -> {
subjectContext.stop(throwable);
epContext.stop(throwable);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index 0ffd59c..f3206c3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.flow.impl;
+import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
@@ -73,6 +74,7 @@
.register(LogicalTimestamp.class)
.register(Timestamped.class)
.build());
+ private static final int GET_FLOW_ENTRIES_TIMEOUT = 15; // seconds
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -243,7 +245,8 @@
getFlowsSubject,
SERIALIZER::encode,
SERIALIZER::decode,
- replicaInfo.master());
+ replicaInfo.master(),
+ Duration.ofSeconds(GET_FLOW_ENTRIES_TIMEOUT));
} else {
return CompletableFuture.completedFuture(Collections.emptySet());
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 9a68356..b4e80a8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -136,6 +136,7 @@
private final Logger log = getLogger(getClass());
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
+ private static final int GET_FLOW_ENTRIES_TIMEOUT = 30; //seconds
/** Number of threads in the message handler pool. */
private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
@@ -783,7 +784,7 @@
public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
try {
return getFlowTable(deviceId).getFlowEntries()
- .get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ .get(GET_FLOW_ENTRIES_TIMEOUT, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw new FlowRuleStoreException(e.getCause());
} catch (TimeoutException e) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/cluster/messaging/impl/AtomixMessagingManager.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/cluster/messaging/impl/AtomixMessagingManager.java
index 54d49d1..0f9a5d1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/cluster/messaging/impl/AtomixMessagingManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/cluster/messaging/impl/AtomixMessagingManager.java
@@ -28,6 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
@@ -71,13 +72,9 @@
}
@Override
- public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
- return messagingService.sendAndReceive(toAddress(ep), type, payload);
- }
-
- @Override
- public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
- return messagingService.sendAndReceive(toAddress(ep), type, payload, executor);
+ public CompletableFuture<byte[]> sendAndReceive(
+ Endpoint ep, String type, byte[] payload, Duration timeout, Executor executor) {
+ return messagingService.sendAndReceive(toAddress(ep), type, payload, timeout, executor);
}
@Override