Support user-provided timeouts in intra-cluster communication service

Change-Id: I4ed9cd2e84df83b45ae17af24b9780b9ac97a95d
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 4b7a80a..576e0b7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.cluster.messaging.impl;
 
+import java.time.Duration;
 import com.google.common.base.Objects;
 import com.google.common.base.Throwables;
 import org.onlab.util.Tools;
@@ -154,7 +155,8 @@
                                                       MessageSubject subject,
                                                       Function<M, byte[]> encoder,
                                                       Function<byte[], R> decoder,
-                                                      NodeId toNodeId) {
+                                                      NodeId toNodeId,
+                                                      Duration timeout) {
         checkPermission(CLUSTER_WRITE);
         try {
             ClusterMessage envelope = new ClusterMessage(
@@ -162,7 +164,7 @@
                     subject,
                     timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
                             apply(message));
-            return sendAndReceive(subject, envelope.getBytes(), toNodeId).
+            return sendAndReceive(subject, envelope.getBytes(), toNodeId, timeout).
                     thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
         } catch (Exception e) {
             return Tools.exceptionalFuture(e);
@@ -177,7 +179,8 @@
         return messagingService.sendAsync(nodeEp, subject.toString(), payload).whenComplete((r, e) -> context.stop(e));
     }
 
-    private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+    private CompletableFuture<byte[]> sendAndReceive(
+        MessageSubject subject, byte[] payload, NodeId toNodeId, Duration timeout) {
         ControllerNode node = clusterService.getNode(toNodeId);
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
@@ -185,7 +188,7 @@
                 startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
         MeteringAgent.Context subjectContext = subjectMeteringAgent.
                 startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
-        return messagingService.sendAndReceive(nodeEp, subject.toString(), payload).
+        return messagingService.sendAndReceive(nodeEp, subject.toString(), payload, timeout).
                 whenComplete((bytes, throwable) -> {
                     subjectContext.stop(throwable);
                     epContext.stop(throwable);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index 0ffd59c..f3206c3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.flow.impl;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -73,6 +74,7 @@
         .register(LogicalTimestamp.class)
         .register(Timestamped.class)
         .build());
+    private static final int GET_FLOW_ENTRIES_TIMEOUT = 15; // seconds; timeout for the request sent to the master
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -243,7 +245,8 @@
                 getFlowsSubject,
                 SERIALIZER::encode,
                 SERIALIZER::decode,
-                replicaInfo.master());
+                replicaInfo.master(),
+                Duration.ofSeconds(GET_FLOW_ENTRIES_TIMEOUT));
         } else {
             return CompletableFuture.completedFuture(Collections.emptySet());
         }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 9a68356..b4e80a8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -136,6 +136,7 @@
     private final Logger log = getLogger(getClass());
 
     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
+    private static final int GET_FLOW_ENTRIES_TIMEOUT = 30; // seconds; bound on the blocking get() in getFlowEntries
 
     /** Number of threads in the message handler pool. */
     private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
@@ -783,7 +784,7 @@
         public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
             try {
                 return getFlowTable(deviceId).getFlowEntries()
-                    .get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+                    .get(GET_FLOW_ENTRIES_TIMEOUT, TimeUnit.SECONDS);
             } catch (ExecutionException e) {
                 throw new FlowRuleStoreException(e.getCause());
             } catch (TimeoutException e) {