Implement lazy iterators/streams for ConsistentMap
Change-Id: Id643726441c99186667c104cfdd1dab95b06cf87
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index 917f44b..193a250 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -30,6 +30,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Version;
@@ -150,6 +151,11 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Entry<K, Versioned<V>>>> iterator() {
+ return delegateMap.iterator();
+ }
+
+ @Override
public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
return delegateMap.addListener(listener, executor);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
index 2bee5eb..b3a1f95 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
@@ -30,6 +30,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Version;
@@ -239,6 +240,11 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Map.Entry<String, Versioned<V>>>> iterator() {
+ return delegateMap.iterator();
+ }
+
+ @Override
public CompletableFuture<Void> addListener(
MapEventListener<String, V> listener, Executor executor) {
return delegateMap.addListener(listener, executor);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index d5d4c73..38ba0f3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -18,8 +18,11 @@
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
@@ -35,6 +38,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Version;
@@ -185,6 +189,12 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Entry<K, Versioned<V>>>> iterator() {
+ return Tools.allOf(getMaps().stream().map(m -> m.iterator()).collect(Collectors.toList()))
+ .thenApply(PartitionedMultimapIterator::new);
+ }
+
+ @Override
public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
return CompletableFuture.allOf(getMaps().stream()
.map(map -> map.addListener(listener, executor))
@@ -254,4 +264,42 @@
private Collection<AsyncConsistentMap<K, V>> getMaps() {
return partitions.values();
}
+
+ private class PartitionedMultimapIterator<K, V> implements AsyncIterator<Map.Entry<K, Versioned<V>>> {
+ private final Iterator<AsyncIterator<Entry<K, Versioned<V>>>> iterators;
+ private volatile AsyncIterator<Entry<K, Versioned<V>>> iterator;
+
+ public PartitionedMultimapIterator(List<AsyncIterator<Entry<K, Versioned<V>>>> iterators) {
+ this.iterators = iterators.iterator();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> hasNext() {
+ if (iterator == null && iterators.hasNext()) {
+ iterator = iterators.next();
+ }
+ if (iterator == null) {
+ return CompletableFuture.completedFuture(false);
+ }
+ return iterator.hasNext()
+ .thenCompose(hasNext -> {
+ if (!hasNext) {
+ iterator = null;
+ return hasNext();
+ }
+ return CompletableFuture.completedFuture(true);
+ });
+ }
+
+ @Override
+ public CompletableFuture<Entry<K, Versioned<V>>> next() {
+ if (iterator == null && iterators.hasNext()) {
+ iterator = iterators.next();
+ }
+ if (iterator == null) {
+ return Tools.exceptionalFuture(new NoSuchElementException());
+ }
+ return iterator.next();
+ }
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index 4e662b0..d68ada5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -21,6 +21,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
@@ -248,6 +249,11 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Entry<K1, Versioned<V1>>>> iterator() {
+ return backingMap.iterator().thenApply(TranscodingIterator::new);
+ }
+
+ @Override
public CompletableFuture<Void> addListener(MapEventListener<K1, V1> listener, Executor executor) {
synchronized (listeners) {
InternalBackingMapEventListener backingMapListener =
@@ -328,6 +334,25 @@
return backingMap.statusChangeListeners();
}
+ private class TranscodingIterator implements AsyncIterator<Entry<K1, Versioned<V1>>> {
+ private final AsyncIterator<Map.Entry<K2, Versioned<V2>>> iterator;
+
+ public TranscodingIterator(AsyncIterator<Map.Entry<K2, Versioned<V2>>> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public CompletableFuture<Map.Entry<K1, Versioned<V1>>> next() {
+ return iterator.next().thenApply(entry ->
+ Maps.immutableEntry(keyDecoder.apply(entry.getKey()), entry.getValue().map(valueDecoder)));
+ }
+ }
+
private class InternalBackingMapEventListener implements MapEventListener<K2, V2> {
private final MapEventListener<K1, V1> listener;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
index fcaca0e..d8e686e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
@@ -21,6 +21,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
@@ -340,6 +341,11 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Map.Entry<String, Versioned<V1>>>> iterator() {
+ return backingMap.iterator().thenApply(TranscodingIterator::new);
+ }
+
+ @Override
public CompletableFuture<Void> addListener(
MapEventListener<String, V1> listener,
Executor executor) {
@@ -387,6 +393,25 @@
throw new UnsupportedOperationException("This operation is not yet supported.");
}
+ private class TranscodingIterator implements AsyncIterator<Map.Entry<String, Versioned<V1>>> {
+ private final AsyncIterator<Map.Entry<String, Versioned<V2>>> iterator;
+
+ public TranscodingIterator(AsyncIterator<Map.Entry<String, Versioned<V2>>> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public CompletableFuture<Map.Entry<String, Versioned<V1>>> next() {
+ return iterator.next().thenApply(entry ->
+ Maps.immutableEntry(entry.getKey(), entry.getValue().map(valueDecoder)));
+ }
+ }
+
private class InternalBackingMapEventListener
implements MapEventListener<String, V2> {