Utilities for composing future results
Change-Id: Ie2ecfdedb69638fe7131879caa3b3708c4746006
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 73d2a36..60a80c1 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -15,11 +15,10 @@
*/
package org.onlab.util;
-import com.google.common.base.Charsets;
-import com.google.common.base.Strings;
-import com.google.common.primitives.UnsignedLongs;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.slf4j.Logger;
+import static java.nio.file.Files.delete;
+import static java.nio.file.Files.walkFileTree;
+import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
+import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.io.IOException;
@@ -44,16 +43,20 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-import static java.nio.file.Files.delete;
-import static java.nio.file.Files.walkFileTree;
-import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
-import static org.slf4j.LoggerFactory.getLogger;
+import org.slf4j.Logger;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.UnsignedLongs;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Miscellaneous utility methods.
@@ -622,6 +625,53 @@
}
/**
+ * Returns a new CompletableFuture completed by reducing a list of computed values
+ * when all of the given CompletableFuture complete.
+ *
+ * @param futures the CompletableFutures
+ * @param reducer reducer for computing the result
+ * @param emptyValue zero value to be returned if the input future list is empty
+ * @param <T> value type of CompletableFuture
+ * @return a new CompletableFuture that is completed when all of the given CompletableFutures complete
+ */
+ public static <T> CompletableFuture<T> allOf(List<CompletableFuture<T>> futures,
+ BinaryOperator<T> reducer,
+ T emptyValue) {
+ return Tools.allOf(futures)
+ .thenApply(resultList -> resultList.stream().reduce(reducer).orElse(emptyValue));
+ }
+
+ /**
+ * Returns a new CompletableFuture completed by with the first positive result from a list of
+ * input CompletableFutures.
+ *
+ * @param futures the input list of CompletableFutures
+ * @param positiveResultMatcher matcher to identify a positive result
+ * @param negativeResult value to complete with if none of the futures complete with a positive result
+ * @param <T> value type of CompletableFuture
+ * @return a new CompletableFuture
+ */
+ public static <T> CompletableFuture<T> firstOf(List<CompletableFuture<T>> futures,
+ Match<T> positiveResultMatcher,
+ T negativeResult) {
+ CompletableFuture<T> responseFuture = new CompletableFuture<>();
+ Tools.allOf(Lists.transform(futures, future -> future.thenAccept(r -> {
+ if (positiveResultMatcher.matches(r)) {
+ responseFuture.complete(r);
+ }
+ }))).whenComplete((r, e) -> {
+ if (!responseFuture.isDone()) {
+ if (e != null) {
+ responseFuture.completeExceptionally(e);
+ } else {
+ responseFuture.complete(negativeResult);
+ }
+ }
+ });
+ return responseFuture;
+ }
+
+ /**
* Returns the contents of {@code ByteBuffer} as byte array.
* <p>
* WARNING: There is a performance cost due to array copy