Utilities for composing future results
Change-Id: Ie2ecfdedb69638fe7131879caa3b3708c4746006
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index 2a6b2c7..6378024 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -25,13 +25,12 @@
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import org.onlab.util.Match;
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.primitives.MapUpdate;
@@ -41,9 +40,10 @@
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
/**
* {@link AsyncConsistentMap} that has its entries partitioned horizontally across
@@ -73,12 +73,9 @@
@Override
public CompletableFuture<Integer> size() {
- AtomicInteger totalSize = new AtomicInteger(0);
- return CompletableFuture.allOf(getMaps()
- .stream()
- .map(map -> map.size().thenAccept(totalSize::addAndGet))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> totalSize.get());
+ return Tools.allOf(getMaps().stream().map(m -> m.size()).collect(Collectors.toList()),
+ Math::addExact,
+ 0);
}
@Override
@@ -93,12 +90,9 @@
@Override
public CompletableFuture<Boolean> containsValue(V value) {
- AtomicBoolean contains = new AtomicBoolean(false);
- return CompletableFuture.allOf(getMaps().stream()
- .map(map -> map.containsValue(value)
- .thenAccept(v -> contains.set(contains.get() || v)))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> contains.get());
+ return Tools.firstOf(getMaps().stream().map(m -> m.containsValue(value)).collect(Collectors.toList()),
+ Match.ifValue(true),
+ false);
}
@Override
public CompletableFuture<Versioned<V>> get(K key) {
@@ -136,29 +130,23 @@
@Override
public CompletableFuture<Set<K>> keySet() {
- Set<K> allKeys = Sets.newConcurrentHashSet();
- return CompletableFuture.allOf(getMaps().stream()
- .map(map -> map.keySet().thenAccept(allKeys::addAll))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> allKeys);
+ return Tools.allOf(getMaps().stream().map(m -> m.keySet()).collect(Collectors.toList()),
+ (s1, s2) -> ImmutableSet.<K>builder().addAll(s1).addAll(s2).build(),
+ ImmutableSet.of());
}
@Override
public CompletableFuture<Collection<Versioned<V>>> values() {
- List<Versioned<V>> allValues = Lists.newCopyOnWriteArrayList();
- return CompletableFuture.allOf(getMaps().stream()
- .map(map -> map.values().thenAccept(allValues::addAll))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> allValues);
+ return Tools.allOf(getMaps().stream().map(m -> m.values()).collect(Collectors.toList()),
+ (c1, c2) -> ImmutableList.<Versioned<V>>builder().addAll(c1).addAll(c2).build(),
+ ImmutableList.of());
}
@Override
public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
- Set<Entry<K, Versioned<V>>> allEntries = Sets.newConcurrentHashSet();
- return CompletableFuture.allOf(getMaps().stream()
- .map(map -> map.entrySet().thenAccept(allEntries::addAll))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> allEntries);
+ return Tools.allOf(getMaps().stream().map(m -> m.entrySet()).collect(Collectors.toList()),
+ (s1, s2) -> ImmutableSet.<Entry<K, Versioned<V>>>builder().addAll(s1).addAll(s2).build(),
+ ImmutableSet.of());
}
@Override
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 73d2a36..60a80c1 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -15,11 +15,10 @@
*/
package org.onlab.util;
-import com.google.common.base.Charsets;
-import com.google.common.base.Strings;
-import com.google.common.primitives.UnsignedLongs;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.slf4j.Logger;
+import static java.nio.file.Files.delete;
+import static java.nio.file.Files.walkFileTree;
+import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
+import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.io.IOException;
@@ -44,16 +43,20 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-import static java.nio.file.Files.delete;
-import static java.nio.file.Files.walkFileTree;
-import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
-import static org.slf4j.LoggerFactory.getLogger;
+import org.slf4j.Logger;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.UnsignedLongs;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Miscellaneous utility methods.
@@ -622,6 +625,53 @@
}
/**
+ * Returns a new CompletableFuture completed by reducing a list of computed values
+ * when all of the given CompletableFuture complete.
+ *
+ * @param futures the CompletableFutures
+ * @param reducer reducer for computing the result
+ * @param emptyValue zero value to be returned if the input future list is empty
+ * @param <T> value type of CompletableFuture
+ * @return a new CompletableFuture that is completed when all of the given CompletableFutures complete
+ */
+ public static <T> CompletableFuture<T> allOf(List<CompletableFuture<T>> futures,
+ BinaryOperator<T> reducer,
+ T emptyValue) {
+ return Tools.allOf(futures)
+ .thenApply(resultList -> resultList.stream().reduce(reducer).orElse(emptyValue));
+ }
+
+ /**
+ * Returns a new CompletableFuture completed by with the first positive result from a list of
+ * input CompletableFutures.
+ *
+ * @param futures the input list of CompletableFutures
+ * @param positiveResultMatcher matcher to identify a positive result
+ * @param negativeResult value to complete with if none of the futures complete with a positive result
+ * @param <T> value type of CompletableFuture
+ * @return a new CompletableFuture
+ */
+ public static <T> CompletableFuture<T> firstOf(List<CompletableFuture<T>> futures,
+ Match<T> positiveResultMatcher,
+ T negativeResult) {
+ CompletableFuture<T> responseFuture = new CompletableFuture<>();
+ Tools.allOf(Lists.transform(futures, future -> future.thenAccept(r -> {
+ if (positiveResultMatcher.matches(r)) {
+ responseFuture.complete(r);
+ }
+ }))).whenComplete((r, e) -> {
+ if (!responseFuture.isDone()) {
+ if (e != null) {
+ responseFuture.completeExceptionally(e);
+ } else {
+ responseFuture.complete(negativeResult);
+ }
+ }
+ });
+ return responseFuture;
+ }
+
+ /**
* Returns the contents of {@code ByteBuffer} as byte array.
* <p>
* WARNING: There is a performance cost due to array copy