[ONOS-6267] Detect and complete blocked futures on I/O threads.
Change-Id: I0488dc5096f9e610b97405ad05c02d0ff3854b5f
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 883a7c8..0c76a91 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -643,28 +643,41 @@
}
/**
- * Returns a future that's completed using the given {@link Executor} once the given {@code future} is completed.
+ * Returns a future that's completed using the given {@code orderedExecutor} if the future is not blocked or the
+ * given {@code threadPoolExecutor} if the future is blocked.
* <p>
- * {@link CompletableFuture}'s async methods cannot be relied upon to complete futures on an executor thread. If a
- * future is completed synchronously, {@code CompletableFuture} async methods will often complete the future on the
- * current thread, ignoring the provided {@code Executor}. This method ensures a more reliable and consistent thread
- * model by ensuring that futures are always completed using the provided {@code Executor}.
+ * This method allows futures to maintain single-thread semantics via the provided {@code orderedExecutor} while
+ * ensuring user code can block without blocking completion of futures. When the returned future or any of its
+ * descendants is blocked on a {@link CompletableFuture#get()} or {@link CompletableFuture#join()} call, completion
+ * of the returned future will be done using the provided {@code threadPoolExecutor}.
*
* @param future the future to convert into an asynchronous future
- * @param executor the executor with which to complete the returned future
+ * @param orderedExecutor the ordered executor with which to attempt to complete the future
+ * @param threadPoolExecutor the backup executor with which to complete blocked futures
* @param <T> future value type
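- * @return a new completable future to be completed using the provided {@code executor} once the provided
- * {@code future} is complete
+ * @return a new completable future to be completed using the provided {@code orderedExecutor}, or the
+ * {@code threadPoolExecutor} if the returned future is blocked, once the provided {@code future} is complete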
*/
- public static <T> CompletableFuture<T> asyncFuture(CompletableFuture<T> future, Executor executor) {
- CompletableFuture<T> newFuture = new CompletableFuture<T>();
- future.whenComplete((result, error) -> executor.execute(() -> {
- if (future.isCompletedExceptionally()) {
- newFuture.completeExceptionally(error);
+ public static <T> CompletableFuture<T> orderedFuture(
+ CompletableFuture<T> future,
+ Executor orderedExecutor,
+ Executor threadPoolExecutor) {
+ BlockingAwareFuture<T> newFuture = new BlockingAwareFuture<T>();
+ future.whenComplete((result, error) -> {
+ Runnable completer = () -> {
+ if (future.isCompletedExceptionally()) {
+ newFuture.completeExceptionally(error);
+ } else {
+ newFuture.complete(result);
+ }
+ };
+
+ if (newFuture.isBlocked()) {
+ threadPoolExecutor.execute(completer);
} else {
- newFuture.complete(result);
+ orderedExecutor.execute(completer);
}
- }));
+ });
return newFuture;
}