Attemp to improve hasPendingUpdates()
- Create update list using Stream, so that
hasPendingUpdates() can short cut on first update found.
- Might improve performance when there is large number of updates in a Tx.
Change-Id: I20820b7212c642315a620d715b5c750e35d31dd0
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
index 9c8831e..469cf11 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
@@ -20,7 +20,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.HexString;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.service.AsyncConsistentMap;
@@ -189,41 +191,49 @@
return updates().size();
}
- protected List<MapUpdate<K, V>> updates() {
- List<MapUpdate<K, V>> updates = Lists.newLinkedList();
- deleteSet.forEach(key -> {
- Versioned<V> original = readCache.get(key);
- if (original != null) {
- updates.add(MapUpdate.<K, V>newBuilder()
- .withMapName(name)
- .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
- .withKey(key)
- .withCurrentVersion(original.version())
- .build());
- }
- });
- writeCache.forEach((key, value) -> {
- Versioned<V> original = readCache.get(key);
- if (original == null) {
- updates.add(MapUpdate.<K, V>newBuilder()
- .withMapName(name)
- .withType(MapUpdate.Type.PUT_IF_ABSENT)
- .withKey(key)
- .withValue(value)
- .build());
- } else {
- updates.add(MapUpdate.<K, V>newBuilder()
- .withMapName(name)
- .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
- .withKey(key)
- .withCurrentVersion(original.version())
- .withValue(value)
- .build());
- }
- });
- return updates;
+ @Override
+ public boolean hasPendingUpdates() {
+ return updatesStream().findAny().isPresent();
}
+ protected Stream<MapUpdate<K, V>> updatesStream() {
+ return Stream.concat(
+ // 1st stream: delete ops
+ deleteSet.stream()
+ .map(key -> Pair.of(key, readCache.get(key)))
+ .filter(e -> e.getValue() != null)
+ .map(e -> MapUpdate.<K, V>newBuilder()
+ .withMapName(name)
+ .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+ .withKey(e.getKey())
+ .withCurrentVersion(e.getValue().version())
+ .build()),
+ // 2nd stream: write ops
+ writeCache.entrySet().stream()
+ .map(e -> {
+ Versioned<V> original = readCache.get(e.getKey());
+ if (original == null) {
+ return MapUpdate.<K, V>newBuilder()
+ .withMapName(name)
+ .withType(MapUpdate.Type.PUT_IF_ABSENT)
+ .withKey(e.getKey())
+ .withValue(e.getValue())
+ .build();
+ } else {
+ return MapUpdate.<K, V>newBuilder()
+ .withMapName(name)
+ .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+ .withKey(e.getKey())
+ .withCurrentVersion(original.version())
+ .withValue(e.getValue())
+ .build();
+ }
+ }));
+ }
+
+ protected List<MapUpdate<K, V>> updates() {
+ return updatesStream().collect(Collectors.toList());
+ }
protected List<MapUpdate<String, byte[]>> toMapUpdates() {
List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();