[ONOS-6267] Detect and complete blocked futures on I/O threads.

Change-Id: I0488dc5096f9e610b97405ad05c02d0ff3854b5f
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 883a7c8..0c76a91 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -643,28 +643,41 @@
     }
 
     /**
-     * Returns a future that's completed using the given {@link Executor} once the given {@code future} is completed.
+     * Returns a future that's completed using the given {@code orderedExecutor} if the future is not blocked or the
+     * given {@code threadPoolExecutor} if the future is blocked.
      * <p>
-     * {@link CompletableFuture}'s async methods cannot be relied upon to complete futures on an executor thread. If a
-     * future is completed synchronously, {@code CompletableFuture} async methods will often complete the future on the
-     * current thread, ignoring the provided {@code Executor}. This method ensures a more reliable and consistent thread
-     * model by ensuring that futures are always completed using the provided {@code Executor}.
+     * This method allows futures to maintain single-thread semantics via the provided {@code orderedExecutor} while
+     * ensuring user code can block without blocking completion of futures. When the returned future or any of its
+     * descendants is blocked on a {@link CompletableFuture#get()} or {@link CompletableFuture#join()} call, completion
+     * of the returned future will be done using the provided {@code threadPoolExecutor}.
      *
      * @param future the future to convert into an asynchronous future
-     * @param executor the executor with which to complete the returned future
+     * @param orderedExecutor the ordered executor with which to attempt to complete the future
+     * @param threadPoolExecutor the backup executor with which to complete blocked futures
      * @param <T> future value type
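-     * @return a new completable future to be completed using the provided {@code executor} once the provided
-     * {@code future} is complete
+     * @return a new completable future to be completed using {@code orderedExecutor}, or {@code threadPoolExecutor}
+     * if the returned future is blocked, once the provided {@code future} is complete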
      */
-    public static <T> CompletableFuture<T> asyncFuture(CompletableFuture<T> future, Executor executor) {
-        CompletableFuture<T> newFuture = new CompletableFuture<T>();
-        future.whenComplete((result, error) -> executor.execute(() -> {
-            if (future.isCompletedExceptionally()) {
-                newFuture.completeExceptionally(error);
+    public static <T> CompletableFuture<T> orderedFuture(
+            CompletableFuture<T> future,
+            Executor orderedExecutor,
+            Executor threadPoolExecutor) {
+        BlockingAwareFuture<T> newFuture = new BlockingAwareFuture<T>();
+        future.whenComplete((result, error) -> {
+            Runnable completer = () -> {
+                if (future.isCompletedExceptionally()) {
+                    newFuture.completeExceptionally(error);
+                } else {
+                    newFuture.complete(result);
+                }
+            };
+
+            if (newFuture.isBlocked()) {
+                threadPoolExecutor.execute(completer);
             } else {
-                newFuture.complete(result);
+                orderedExecutor.execute(completer);
             }
-        }));
+        });
         return newFuture;
     }