Use failure detection for Netty messages to improve ability to adapt to changing network conditions
Change-Id: I2928072ffd9c662e1817d67453f19626681c0aff
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 82ba95f..284251b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -103,13 +103,13 @@
@Component(immediate = true)
@Service
public class NettyMessagingManager implements MessagingService {
- private static final long DEFAULT_TIMEOUT_MILLIS = 500;
private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1).toMillis();
- private static final long MIN_TIMEOUT_MILLIS = 250;
- private static final long MAX_TIMEOUT_MILLIS = 5000;
private static final long TIMEOUT_INTERVAL = 50;
private static final int WINDOW_SIZE = 100;
- private static final double TIMEOUT_MULTIPLIER = 2.5;
+ private static final int MIN_SAMPLES = 25;
+ private static final double PHI_FACTOR = 1.0 / Math.log(10.0);
+ private static final int PHI_FAILURE_THRESHOLD = 5;
+ private static final long MAX_TIMEOUT_MILLIS = 15000;
private static final int CHANNEL_POOL_SIZE = 8;
private static final byte[] EMPTY_PAYLOAD = new byte[0];
@@ -741,14 +741,14 @@
private abstract class AbstractClientConnection implements ClientConnection {
private final Map<Long, Callback> futures = Maps.newConcurrentMap();
private final AtomicBoolean closed = new AtomicBoolean(false);
- private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
+ private final Cache<String, RequestMonitor> requestMonitors = CacheBuilder.newBuilder()
.expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
.build();
/**
* Times out callbacks for this connection.
*/
- protected void timeoutCallbacks() {
+ void timeoutCallbacks() {
// Store the current time.
long currentTime = System.currentTimeMillis();
@@ -758,12 +758,11 @@
while (iterator.hasNext()) {
Callback callback = iterator.next().getValue();
try {
- TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
- long currentTimeout = timeoutHistory.currentTimeout;
- if (currentTime - callback.time > currentTimeout) {
+ RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
+ long elapsedTime = currentTime - callback.time;
+ if (elapsedTime > MAX_TIMEOUT_MILLIS || requestMonitor.isTimedOut(elapsedTime)) {
iterator.remove();
- long elapsedTime = currentTime - callback.time;
- timeoutHistory.addReplyTime(elapsedTime);
+ requestMonitor.addReplyTime(elapsedTime);
callback.completeExceptionally(
new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
}
@@ -771,11 +770,6 @@
throw new AssertionError();
}
}
-
- // Iterate through all timeout histories and recompute the timeout.
- for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
- timeoutHistory.recomputeTimeoutMillis();
- }
}
protected void registerCallback(long id, String subject, CompletableFuture<byte[]> future) {
@@ -786,8 +780,8 @@
Callback callback = futures.remove(id);
if (callback != null) {
try {
- TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
- timeoutHistory.addReplyTime(System.currentTimeMillis() - callback.time);
+ RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
+ requestMonitor.addReplyTime(System.currentTimeMillis() - callback.time);
} catch (ExecutionException e) {
throw new AssertionError();
}
@@ -985,10 +979,8 @@
/**
* Request-reply timeout history tracker.
*/
- private static final class TimeoutHistory {
- private final DescriptiveStatistics timeoutHistory = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
- private final AtomicLong maxReplyTime = new AtomicLong();
- private volatile long currentTimeout = DEFAULT_TIMEOUT_MILLIS;
+ private static final class RequestMonitor {
+ private final DescriptiveStatistics samples = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
/**
* Adds a reply time to the history.
@@ -996,19 +988,41 @@
* @param replyTime the reply time to add to the history
*/
void addReplyTime(long replyTime) {
- maxReplyTime.getAndAccumulate(replyTime, Math::max);
+ samples.addValue(replyTime);
}
/**
- * Computes the current timeout.
+ * Returns a boolean indicating whether the given request should be timed out according to the elapsed time.
+ *
+ * @param elapsedTime the elapsed request time
+ * @return indicates whether the request should be timed out
*/
- private void recomputeTimeoutMillis() {
- double nextTimeout = maxReplyTime.getAndSet(0) * TIMEOUT_MULTIPLIER;
- timeoutHistory.addValue(
- Math.min(Math.max(nextTimeout, MIN_TIMEOUT_MILLIS), MAX_TIMEOUT_MILLIS));
- if (timeoutHistory.getN() == WINDOW_SIZE) {
- this.currentTimeout = (long) timeoutHistory.getMax();
+ boolean isTimedOut(long elapsedTime) {
+ return phi(elapsedTime) >= PHI_FAILURE_THRESHOLD;
+ }
+
+ /**
+ * Compute phi for the specified node id.
+ *
+ * @param elapsedTime the duration since the request was sent
+ * @return phi value
+ */
+ private double phi(long elapsedTime) {
+ if (samples.getN() < MIN_SAMPLES) {
+ return 0.0;
}
+ return computePhi(samples, elapsedTime);
+ }
+
+ /**
+ * Computes the phi value from the given samples.
+ *
+ * @param samples the samples from which to compute phi
+ * @param elapsedTime the duration since the request was sent
+ * @return phi
+ */
+ private double computePhi(DescriptiveStatistics samples, long elapsedTime) {
+ return samples.getN() > 0 ? PHI_FACTOR * elapsedTime / samples.getMean() : 100;
}
}
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
index 435f6bd..e48b225 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
@@ -43,6 +43,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
@@ -160,7 +161,7 @@
}
@Test
- public void testSendTimeout() {
+ public void testDefaultTimeout() {
String subject = nextSubject();
BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = (ep, payload) -> new CompletableFuture<>();
netty2.registerHandler(subject, handler);
@@ -173,6 +174,30 @@
}
}
+ @Test
+ public void testDynamicTimeout() {
+ String subject = nextSubject();
+ AtomicInteger counter = new AtomicInteger();
+ BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = (ep, payload) -> {
+ if (counter.incrementAndGet() <= 50) {
+ return CompletableFuture.completedFuture(new byte[0]);
+ } else {
+ return new CompletableFuture<>();
+ }
+ };
+ netty2.registerHandler(subject, handler);
+
+ for (int i = 0; i < 50; i++) {
+ netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
+ }
+ try {
+ netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
+ fail();
+ } catch (CompletionException e) {
+ assertTrue(e.getCause() instanceof TimeoutException);
+ }
+ }
+
/*
* Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
* and response completion occurs on the expected thread.