Utilities for composing future results

Change-Id: Ie2ecfdedb69638fe7131879caa3b3708c4746006
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index 2a6b2c7..6378024 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -25,13 +25,12 @@
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import org.onlab.util.Match;
 import org.onlab.util.Tools;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.primitives.MapUpdate;
@@ -41,9 +40,10 @@
 import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 /**
  * {@link AsyncConsistentMap} that has its entries partitioned horizontally across
@@ -73,12 +73,9 @@
 
     @Override
     public CompletableFuture<Integer> size() {
-        AtomicInteger totalSize = new AtomicInteger(0);
-        return CompletableFuture.allOf(getMaps()
-                                      .stream()
-                                      .map(map -> map.size().thenAccept(totalSize::addAndGet))
-                                      .toArray(CompletableFuture[]::new))
-                                .thenApply(v -> totalSize.get());
+        return Tools.allOf(getMaps().stream().map(m -> m.size()).collect(Collectors.toList()),
+                            Math::addExact,
+                            0);
     }
 
     @Override
@@ -93,12 +90,9 @@
 
     @Override
     public CompletableFuture<Boolean> containsValue(V value) {
-        AtomicBoolean contains = new AtomicBoolean(false);
-        return CompletableFuture.allOf(getMaps().stream()
-                                                .map(map -> map.containsValue(value)
-                                                               .thenAccept(v -> contains.set(contains.get() || v)))
-                                                .toArray(CompletableFuture[]::new))
-                                .thenApply(v -> contains.get());
+        return Tools.firstOf(getMaps().stream().map(m -> m.containsValue(value)).collect(Collectors.toList()),
+                            Match.ifValue(true),
+                            false);
     }
     @Override
     public CompletableFuture<Versioned<V>> get(K key) {
@@ -136,29 +130,23 @@
 
     @Override
     public CompletableFuture<Set<K>> keySet() {
-        Set<K> allKeys = Sets.newConcurrentHashSet();
-        return CompletableFuture.allOf(getMaps().stream()
-                                                .map(map -> map.keySet().thenAccept(allKeys::addAll))
-                                                .toArray(CompletableFuture[]::new))
-                                .thenApply(v -> allKeys);
+        return Tools.allOf(getMaps().stream().map(m -> m.keySet()).collect(Collectors.toList()),
+                    (s1, s2) -> ImmutableSet.<K>builder().addAll(s1).addAll(s2).build(),
+                    ImmutableSet.of());
     }
 
     @Override
     public CompletableFuture<Collection<Versioned<V>>> values() {
-        List<Versioned<V>> allValues = Lists.newCopyOnWriteArrayList();
-        return CompletableFuture.allOf(getMaps().stream()
-                                                .map(map -> map.values().thenAccept(allValues::addAll))
-                                                .toArray(CompletableFuture[]::new))
-                                .thenApply(v -> allValues);
+        return Tools.allOf(getMaps().stream().map(m -> m.values()).collect(Collectors.toList()),
+                    (c1, c2) -> ImmutableList.<Versioned<V>>builder().addAll(c1).addAll(c2).build(),
+                    ImmutableList.of());
     }
 
     @Override
     public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
-        Set<Entry<K, Versioned<V>>> allEntries = Sets.newConcurrentHashSet();
-        return CompletableFuture.allOf(getMaps().stream()
-                                                .map(map -> map.entrySet().thenAccept(allEntries::addAll))
-                                                .toArray(CompletableFuture[]::new))
-                                .thenApply(v -> allEntries);
+        return Tools.allOf(getMaps().stream().map(m -> m.entrySet()).collect(Collectors.toList()),
+                (s1, s2) -> ImmutableSet.<Entry<K, Versioned<V>>>builder().addAll(s1).addAll(s2).build(),
+                ImmutableSet.of());
     }
 
     @Override
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 73d2a36..60a80c1 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -15,11 +15,10 @@
  */
 package org.onlab.util;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Strings;
-import com.google.common.primitives.UnsignedLongs;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.slf4j.Logger;
+import static java.nio.file.Files.delete;
+import static java.nio.file.Files.walkFileTree;
+import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
+import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.File;
 import java.io.IOException;
@@ -44,16 +43,20 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.BinaryOperator;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
-import static java.nio.file.Files.delete;
-import static java.nio.file.Files.walkFileTree;
-import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
-import static org.slf4j.LoggerFactory.getLogger;
+import org.slf4j.Logger;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.UnsignedLongs;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Miscellaneous utility methods.
@@ -622,6 +625,53 @@
     }
 
     /**
+     * Returns a new CompletableFuture completed by reducing a list of computed values
+     * when all of the given CompletableFuture complete.
+     *
+     * @param futures the CompletableFutures
+     * @param reducer reducer for computing the result
+     * @param emptyValue zero value to be returned if the input future list is empty
+     * @param <T> value type of CompletableFuture
+     * @return a new CompletableFuture that is completed when all of the given CompletableFutures complete
+     */
+    public static <T> CompletableFuture<T> allOf(List<CompletableFuture<T>> futures,
+                                                 BinaryOperator<T> reducer,
+                                                 T emptyValue) {
+        return Tools.allOf(futures)
+                    .thenApply(resultList -> resultList.stream().reduce(reducer).orElse(emptyValue));
+    }
+
+    /**
+     * Returns a new CompletableFuture completed by with the first positive result from a list of
+     * input CompletableFutures.
+     *
+     * @param futures the input list of CompletableFutures
+     * @param positiveResultMatcher matcher to identify a positive result
+     * @param negativeResult value to complete with if none of the futures complete with a positive result
+     * @param <T> value type of CompletableFuture
+     * @return a new CompletableFuture
+     */
+    public static <T> CompletableFuture<T> firstOf(List<CompletableFuture<T>> futures,
+                                                   Match<T> positiveResultMatcher,
+                                                   T negativeResult) {
+        CompletableFuture<T> responseFuture = new CompletableFuture<>();
+        Tools.allOf(Lists.transform(futures, future -> future.thenAccept(r -> {
+            if (positiveResultMatcher.matches(r)) {
+                responseFuture.complete(r);
+            }
+        }))).whenComplete((r, e) -> {
+            if (!responseFuture.isDone()) {
+                if (e != null) {
+                    responseFuture.completeExceptionally(e);
+                } else {
+                    responseFuture.complete(negativeResult);
+                }
+            }
+        });
+        return responseFuture;
+    }
+
+    /**
      * Returns the contents of {@code ByteBuffer} as byte array.
      * <p>
      * WARNING: There is a performance cost due to array copy