Support user-provided timeouts in intra-cluster communication service

Change-Id: I4ed9cd2e84df83b45ae17af24b9780b9ac97a95d
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index 7aa5ac6..bb17cbc 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.cluster.messaging;
 
+import java.time.Duration;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -105,11 +106,33 @@
      * @param <R> reply type
      * @return reply future
      */
+    default <M, R> CompletableFuture<R> sendAndReceive(M message,
+            MessageSubject subject,
+            Function<M, byte[]> encoder,
+            Function<byte[], R> decoder,
+            NodeId toNodeId) {
+        return sendAndReceive(message, subject, encoder, decoder, toNodeId, Duration.ZERO);
+    }
+
+    /**
+     * Sends a message and expects a reply.
+     *
+     * @param message message to send
+     * @param subject message subject
+     * @param encoder function for encoding request to byte[]
+     * @param decoder function for decoding response from byte[]
+     * @param toNodeId recipient node identifier
+     * @param timeout the message timeout
+     * @param <M> request type
+     * @param <R> reply type
+     * @return reply future
+     */
     <M, R> CompletableFuture<R> sendAndReceive(M message,
             MessageSubject subject,
             Function<M, byte[]> encoder,
             Function<byte[], R> decoder,
-            NodeId toNodeId);
+            NodeId toNodeId,
+            Duration timeout);
 
     /**
      * Adds a new subscriber for the specified message subject.
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
index a810a69..8927e51 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
@@ -15,11 +15,14 @@
  */
 package org.onosproject.store.cluster.messaging;
 
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 
+import com.google.common.util.concurrent.MoreExecutors;
+
 /**
  * Interface for low level messaging primitives.
  */
@@ -42,7 +45,9 @@
      * @param payload message payload.
      * @return a response future
      */
-    CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
+    default CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
+        return sendAndReceive(ep, type, payload, Duration.ZERO, MoreExecutors.directExecutor());
+    }
 
     /**
      * Sends a message synchronously and expects a response.
@@ -52,7 +57,33 @@
      * @param executor executor over which any follow up actions after completion will be executed.
      * @return a response future
      */
-    CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor);
+    default CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
+        return sendAndReceive(ep, type, payload, Duration.ZERO, executor);
+    }
+
+    /**
+     * Sends a message asynchronously and expects a response.
+     * @param ep end point to send the message to.
+     * @param type type of message.
+     * @param payload message payload.
+     * @param timeout operation timeout
+     * @return a response future
+     */
+    default CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Duration timeout) {
+        return sendAndReceive(ep, type, payload, timeout, MoreExecutors.directExecutor());
+    }
+
+    /**
+     * Sends a message synchronously and expects a response.
+     * @param ep end point to send the message to.
+     * @param type type of message.
+     * @param payload message payload.
+     * @param executor executor over which any follow up actions after completion will be executed.
+     * @param timeout operation timeout
+     * @return a response future
+     */
+    CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Duration timeout,
+                                             Executor executor);
 
     /**
      * Registers a new message handler for message type.
diff --git a/core/api/src/test/java/org/onosproject/store/cluster/messaging/ClusterCommunicationServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/cluster/messaging/ClusterCommunicationServiceAdapter.java
index fda20fc..0cf805b 100644
--- a/core/api/src/test/java/org/onosproject/store/cluster/messaging/ClusterCommunicationServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/cluster/messaging/ClusterCommunicationServiceAdapter.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.cluster.messaging;
 
+import java.time.Duration;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -63,7 +64,7 @@
     @Override
     public <M, R> CompletableFuture<R> sendAndReceive(M message,
                                                       MessageSubject subject, Function<M, byte[]> encoder,
-                                                      Function<byte[], R> decoder, NodeId toNodeId) {
+                                                      Function<byte[], R> decoder, NodeId toNodeId, Duration timeout) {
         return null;
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 868006b..829c474 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.cluster.messaging.impl;
 
+import java.time.Duration;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -156,7 +157,8 @@
                                                       MessageSubject subject,
                                                       Function<M, byte[]> encoder,
                                                       Function<byte[], R> decoder,
-                                                      NodeId toNodeId) {
+                                                      NodeId toNodeId,
+                                                      Duration timeout) {
         checkPermission(CLUSTER_WRITE);
         try {
             ClusterMessage envelope = new ClusterMessage(
@@ -164,7 +166,7 @@
                     subject,
                     timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
                             apply(message));
-            return sendAndReceive(subject, envelope.getBytes(), toNodeId).
+            return sendAndReceive(subject, envelope.getBytes(), toNodeId, timeout).
                     thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
         } catch (Exception e) {
             return Tools.exceptionalFuture(e);
@@ -179,7 +181,8 @@
         return messagingService.sendAsync(nodeEp, subject.toString(), payload).whenComplete((r, e) -> context.stop(e));
     }
 
-    private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+    private CompletableFuture<byte[]> sendAndReceive(
+        MessageSubject subject, byte[] payload, NodeId toNodeId, Duration timeout) {
         ControllerNode node = clusterService.getNode(toNodeId);
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
@@ -187,7 +190,7 @@
                 startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
         MeteringAgent.Context subjectContext = subjectMeteringAgent.
                 startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
-        return messagingService.sendAndReceive(nodeEp, subject.toString(), payload).
+        return messagingService.sendAndReceive(nodeEp, subject.toString(), payload, timeout).
                 whenComplete((bytes, throwable) -> {
                     subjectContext.stop(throwable);
                     epContext.stop(throwable);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index f1ad59a..e5b362e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -360,13 +360,8 @@
     }
 
     @Override
-    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
-        checkPermission(CLUSTER_WRITE);
-        return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
-    }
-
-    @Override
-    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
+    public CompletableFuture<byte[]> sendAndReceive(
+        Endpoint ep, String type, byte[] payload, Duration timeout, Executor executor) {
         checkPermission(CLUSTER_WRITE);
         long messageId = messageIdGenerator.incrementAndGet();
         InternalRequest message = new InternalRequest(preamble,
@@ -375,7 +370,7 @@
                 localEndpoint,
                 type,
                 payload);
-        return executeOnPooledConnection(ep, type, c -> c.sendAndReceive(message), executor);
+        return executeOnPooledConnection(ep, type, c -> c.sendAndReceive(message, timeout), executor);
     }
 
     private List<CompletableFuture<Channel>> getChannelPool(Endpoint endpoint) {
@@ -754,11 +749,13 @@
      */
     private final class Callback {
         private final String type;
+        private final long timeout;
         private final CompletableFuture<byte[]> future;
         private final long time = System.currentTimeMillis();
 
-        Callback(String type, CompletableFuture<byte[]> future) {
+        Callback(String type, long timeout, CompletableFuture<byte[]> future) {
             this.type = type;
+            this.timeout = timeout;
             this.future = future;
         }
 
@@ -788,9 +785,10 @@
          * Sends a message to the other side of the connection, awaiting a reply.
          *
          * @param message the message to send
+         * @param timeout the response timeout
          * @return a completable future to be completed once a reply is received or the request times out
          */
-        CompletableFuture<byte[]> sendAndReceive(InternalRequest message);
+        CompletableFuture<byte[]> sendAndReceive(InternalRequest message, Duration timeout);
 
         /**
          * Closes the connection.
@@ -844,7 +842,7 @@
                 Callback callback = iterator.next().getValue();
                 try {
                     TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
-                    long currentTimeout = timeoutHistory.currentTimeout;
+                    long currentTimeout = callback.timeout > 0 ? callback.timeout : timeoutHistory.currentTimeout;
                     if (currentTime - callback.time > currentTimeout) {
                         iterator.remove();
                         long elapsedTime = currentTime - callback.time;
@@ -863,8 +861,8 @@
             }
         }
 
-        protected void registerCallback(long id, String subject, CompletableFuture<byte[]> future) {
-            futures.put(id, new Callback(subject, future));
+        protected void registerCallback(long id, String subject, long timeout, CompletableFuture<byte[]> future) {
+            futures.put(id, new Callback(subject, timeout, future));
         }
 
         protected Callback completeCallback(long id) {
@@ -910,10 +908,10 @@
         }
 
         @Override
-        public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
+        public CompletableFuture<byte[]> sendAndReceive(InternalRequest message, Duration timeout) {
             CompletableFuture<byte[]> future = new CompletableFuture<>();
             future.whenComplete((r, e) -> completeCallback(message.id()));
-            registerCallback(message.id(), message.subject(), future);
+            registerCallback(message.id(), message.subject(), timeout.toMillis(), future);
             BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
             if (handler != null) {
                 handler.accept(message, new LocalServerConnection(future));
@@ -976,9 +974,9 @@
         }
 
         @Override
-        public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
+        public CompletableFuture<byte[]> sendAndReceive(InternalRequest message, Duration timeout) {
             CompletableFuture<byte[]> future = new CompletableFuture<>();
-            registerCallback(message.id(), message.subject(), future);
+            registerCallback(message.id(), message.subject(), timeout.toMillis(), future);
             channel.writeAndFlush(message).addListener(channelFuture -> {
                 if (!channelFuture.isSuccess()) {
                     Callback callback = failCallback(message.id());
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index eab42cb..8d6147a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.flow.impl;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -73,6 +74,7 @@
         .register(LogicalTimestamp.class)
         .register(Timestamped.class)
         .build());
+    private static final int GET_FLOW_ENTRIES_TIMEOUT = 15; // seconds
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -243,7 +245,8 @@
                 getFlowsSubject,
                 SERIALIZER::encode,
                 SERIALIZER::decode,
-                replicaInfo.master());
+                replicaInfo.master(),
+                Duration.ofSeconds(GET_FLOW_ENTRIES_TIMEOUT));
         } else {
             return CompletableFuture.completedFuture(Collections.emptySet());
         }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 3471e8e..754dca4 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -131,6 +131,7 @@
     private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
     private static final int DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS = 5000;
     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
+    private static final int GET_FLOW_ENTRIES_TIMEOUT = 30; //seconds
 
     @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
         label = "Number of threads in the message handler pool")
@@ -775,7 +776,7 @@
         public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
             try {
                 return getFlowTable(deviceId).getFlowEntries()
-                    .get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+                    .get(GET_FLOW_ENTRIES_TIMEOUT, TimeUnit.SECONDS);
             } catch (ExecutionException e) {
                 throw new RuntimeException(e.getCause());
             } catch (TimeoutException | InterruptedException e) {
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/TestClusterCommunicationService.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/TestClusterCommunicationService.java
index b7a1b89..dfe16d5 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/TestClusterCommunicationService.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/TestClusterCommunicationService.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
+import java.time.Duration;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -83,7 +84,8 @@
             MessageSubject subject,
             Function<M, byte[]> encoder,
             Function<byte[], R> decoder,
-            NodeId toNodeId) {
+            NodeId toNodeId,
+            Duration duration) {
         TestClusterCommunicationService node = nodes.get(toNodeId);
         if (node == null) {
             return Futures.exceptionalFuture(new MessagingException.NoRemoteHandler());