Use failure detection for Netty messages to improve ability to adapt to changing network conditions

Change-Id: I2928072ffd9c662e1817d67453f19626681c0aff
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 82ba95f..284251b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -103,13 +103,13 @@
 @Component(immediate = true)
 @Service
 public class NettyMessagingManager implements MessagingService {
-    private static final long DEFAULT_TIMEOUT_MILLIS = 500;
     private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1).toMillis();
-    private static final long MIN_TIMEOUT_MILLIS = 250;
-    private static final long MAX_TIMEOUT_MILLIS = 5000;
     private static final long TIMEOUT_INTERVAL = 50;
     private static final int WINDOW_SIZE = 100;
-    private static final double TIMEOUT_MULTIPLIER = 2.5;
+    private static final int MIN_SAMPLES = 25;
+    private static final double PHI_FACTOR = 1.0 / Math.log(10.0);
+    private static final int PHI_FAILURE_THRESHOLD = 5;
+    private static final long MAX_TIMEOUT_MILLIS = 15000;
     private static final int CHANNEL_POOL_SIZE = 8;
 
     private static final byte[] EMPTY_PAYLOAD = new byte[0];
@@ -741,14 +741,14 @@
     private abstract class AbstractClientConnection implements ClientConnection {
         private final Map<Long, Callback> futures = Maps.newConcurrentMap();
         private final AtomicBoolean closed = new AtomicBoolean(false);
-        private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
+        private final Cache<String, RequestMonitor> requestMonitors = CacheBuilder.newBuilder()
                 .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
                 .build();
 
         /**
          * Times out callbacks for this connection.
          */
-        protected void timeoutCallbacks() {
+        void timeoutCallbacks() {
             // Store the current time.
             long currentTime = System.currentTimeMillis();
 
@@ -758,12 +758,11 @@
             while (iterator.hasNext()) {
                 Callback callback = iterator.next().getValue();
                 try {
-                    TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
-                    long currentTimeout = timeoutHistory.currentTimeout;
-                    if (currentTime - callback.time > currentTimeout) {
+                    RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
+                    long elapsedTime = currentTime - callback.time;
+                    if (elapsedTime > MAX_TIMEOUT_MILLIS || requestMonitor.isTimedOut(elapsedTime)) {
                         iterator.remove();
-                        long elapsedTime = currentTime - callback.time;
-                        timeoutHistory.addReplyTime(elapsedTime);
+                        requestMonitor.addReplyTime(elapsedTime);
                         callback.completeExceptionally(
                                 new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
                     }
@@ -771,11 +770,6 @@
                     throw new AssertionError();
                 }
             }
-
-            // Iterate through all timeout histories and recompute the timeout.
-            for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
-                timeoutHistory.recomputeTimeoutMillis();
-            }
         }
 
         protected void registerCallback(long id, String subject, CompletableFuture<byte[]> future) {
@@ -786,8 +780,8 @@
             Callback callback = futures.remove(id);
             if (callback != null) {
                 try {
-                    TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
-                    timeoutHistory.addReplyTime(System.currentTimeMillis() - callback.time);
+                    RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
+                    requestMonitor.addReplyTime(System.currentTimeMillis() - callback.time);
                 } catch (ExecutionException e) {
                     throw new AssertionError();
                 }
@@ -985,10 +979,8 @@
     /**
      * Request-reply timeout history tracker.
      */
-    private static final class TimeoutHistory {
-        private final DescriptiveStatistics timeoutHistory = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
-        private final AtomicLong maxReplyTime = new AtomicLong();
-        private volatile long currentTimeout = DEFAULT_TIMEOUT_MILLIS;
+    private static final class RequestMonitor {
+        private final DescriptiveStatistics samples = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
 
         /**
          * Adds a reply time to the history.
@@ -996,19 +988,41 @@
          * @param replyTime the reply time to add to the history
          */
         void addReplyTime(long replyTime) {
-            maxReplyTime.getAndAccumulate(replyTime, Math::max);
+            samples.addValue(replyTime); // kept in a sliding window of WINDOW_SIZE samples
         }
 
         /**
-         * Computes the current timeout.
+         * Reports whether a request that has been outstanding for the given time should be timed out.
+         *
+         * @param elapsedTime the time since the request was sent, in milliseconds
+         * @return {@code true} if phi for {@code elapsedTime} is at least {@code PHI_FAILURE_THRESHOLD}
          */
-        private void recomputeTimeoutMillis() {
-            double nextTimeout = maxReplyTime.getAndSet(0) * TIMEOUT_MULTIPLIER;
-            timeoutHistory.addValue(
-                    Math.min(Math.max(nextTimeout, MIN_TIMEOUT_MILLIS), MAX_TIMEOUT_MILLIS));
-            if (timeoutHistory.getN() == WINDOW_SIZE) {
-                this.currentTimeout = (long) timeoutHistory.getMax();
+        boolean isTimedOut(long elapsedTime) {
+            return phi(elapsedTime) >= PHI_FAILURE_THRESHOLD;
+        }
+
+        /**
+         * Computes phi for the given elapsed time; yields 0.0 until {@code MIN_SAMPLES} reply times are recorded.
+         *
+         * @param elapsedTime the duration since the request was sent
+         * @return phi value
+         */
+        private double phi(long elapsedTime) {
+            if (samples.getN() < MIN_SAMPLES) {
+                return 0.0;
             }
+            return computePhi(samples, elapsedTime);
+        }
+
+        /**
+         * Computes phi from the sample mean, flooring the mean at 1 ms to avoid dividing by zero.
+         *
+         * @param samples     the reply-time samples from which to compute phi
+         * @param elapsedTime the duration since the request was sent, in milliseconds
+         * @return the phi value, or 100 when there are no samples
+         */
+        private double computePhi(DescriptiveStatistics samples, long elapsedTime) {
+            return samples.getN() > 0 ? PHI_FACTOR * elapsedTime / Math.max(samples.getMean(), 1.0) : 100;
         }
     }
 }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
index 435f6bd..e48b225 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
@@ -43,6 +43,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
@@ -160,7 +161,7 @@
     }
 
     @Test
-    public void testSendTimeout() {
+    public void testDefaultTimeout() {
         String subject = nextSubject();
         BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = (ep, payload) -> new CompletableFuture<>();
         netty2.registerHandler(subject, handler);
@@ -173,6 +174,30 @@
         }
     }
 
+    @Test
+    public void testDynamicTimeout() {
+        final int answeredRequests = 50;
+        String subject = nextSubject();
+        AtomicInteger requestCount = new AtomicInteger();
+        BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = (ep, payload) ->
+                requestCount.incrementAndGet() <= answeredRequests
+                        ? CompletableFuture.completedFuture(new byte[0])
+                        : new CompletableFuture<>();
+        netty2.registerHandler(subject, handler);
+
+        // Send the requests that the handler answers immediately.
+        for (int sent = 0; sent < answeredRequests; sent++) {
+            netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
+        }
+        try {
+            // The handler never completes this reply, so the request is expected to time out.
+            netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
+            fail();
+        } catch (CompletionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+    }
+
     /*
      * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
      * and response completion occurs on the expected thread.