Implement lazy iterators/streams for ConsistentMap
Change-Id: Id643726441c99186667c104cfdd1dab95b06cf87
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
index 1bc62b3..b692bf3 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
@@ -16,6 +16,7 @@
package org.onosproject.store.primitives;
import java.util.Collection;
+import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@@ -32,6 +33,7 @@
import org.onlab.util.Tools;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.ConsistentMapException.ConcurrentModification;
@@ -184,6 +186,11 @@
}
@Override
+ public Iterator<Entry<K, Versioned<V>>> iterator() {
+ // Resolve the asynchronous iterator first, then wrap it in the synchronous Iterator adapter.
+ AsyncIterator<Entry<K, Versioned<V>>> asyncEntries = complete(asyncMap.iterator());
+ return new DefaultIterator<>(asyncEntries);
+ }
+
+ @Override
public void addListener(MapEventListener<K, V> listener, Executor executor) {
complete(asyncMap.addListener(listener, executor));
}
@@ -208,6 +215,24 @@
return asyncMap.statusChangeListeners();
}
+ /**
+ * Adapts an {@link AsyncIterator} of map entries to the synchronous {@link Iterator} API.
+ * <p>
+ * Every call is routed through the enclosing map's {@code complete(...)} helper, which is
+ * declared outside this hunk; presumably it waits for the future's result (TODO confirm).
+ * The type parameters are named {@code K2}/{@code V2} so that they do not hide the
+ * enclosing class's {@code K} and {@code V}.
+ *
+ * @param <K2> key type of the iterated entries
+ * @param <V2> value type of the iterated entries
+ */
+ private class DefaultIterator<K2, V2> implements Iterator<Entry<K2, Versioned<V2>>> {
+ private final AsyncIterator<Entry<K2, Versioned<V2>>> iterator;
+
+ public DefaultIterator(AsyncIterator<Entry<K2, Versioned<V2>>> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return complete(iterator.hasNext());
+ }
+
+ @Override
+ public Entry<K2, Versioned<V2>> next() {
+ return complete(iterator.next());
+ }
+ }
+
@Override
public Map<K, V> asJavaMap() {
synchronized (this) {
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java
index c9dd5df..7ccf8c7 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java
@@ -18,6 +18,7 @@
import com.google.common.base.Throwables;
import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.ConsistentTreeMap;
import org.onosproject.store.service.MapEventListener;
@@ -25,6 +26,7 @@
import org.onosproject.store.service.Versioned;
import java.util.Collection;
+import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
@@ -277,6 +279,11 @@
}
@Override
+ public Iterator<Map.Entry<String, Versioned<V>>> iterator() {
+ // Resolve the asynchronous iterator first, then wrap it in the synchronous Iterator adapter.
+ AsyncIterator<Map.Entry<String, Versioned<V>>> asyncEntries = complete(treeMap.iterator());
+ return new DefaultIterator<>(asyncEntries);
+ }
+
+ @Override
public void addListener(MapEventListener<String, V> listener,
Executor executor) {
complete(treeMap.addListener(listener, executor));
@@ -287,6 +294,24 @@
complete(treeMap.removeListener(listener));
}
+ /**
+ * Adapts an {@link AsyncIterator} of map entries to the synchronous {@link Iterator} API.
+ * <p>
+ * Every call is routed through the enclosing map's {@code complete(...)} helper, which is
+ * declared outside this hunk; presumably it waits for the future's result (TODO confirm).
+ * The value type parameter is named {@code V2} so that it does not hide the enclosing
+ * class's {@code V}.
+ *
+ * @param <K> key type of the iterated entries
+ * @param <V2> value type of the iterated entries
+ */
+ private class DefaultIterator<K, V2> implements Iterator<Map.Entry<K, Versioned<V2>>> {
+ private final AsyncIterator<Map.Entry<K, Versioned<V2>>> iterator;
+
+ public DefaultIterator(AsyncIterator<Map.Entry<K, Versioned<V2>>> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return complete(iterator.hasNext());
+ }
+
+ @Override
+ public Map.Entry<K, Versioned<V2>> next() {
+ return complete(iterator.next());
+ }
+ }
+
@Override
public Map<String, V> asJavaMap() {
synchronized (this) {
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index 749ab5b..45688c0 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -17,6 +17,7 @@
package org.onosproject.store.service;
import java.util.Collection;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
@@ -54,7 +55,8 @@
* the returned future will be {@link CompletableFuture#complete completed} when the
* operation finishes.
*/
-public interface AsyncConsistentMap<K, V> extends DistributedPrimitive, Transactional<MapUpdate<K, V>> {
+public interface AsyncConsistentMap<K, V>
+ extends DistributedPrimitive, Transactional<MapUpdate<K, V>>, AsyncIterable<Map.Entry<K, Versioned<V>>> {
@Override
default DistributedPrimitive.Type primitiveType() {
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index 1f6579b1..304764e 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -24,6 +24,8 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import com.google.common.util.concurrent.MoreExecutors;
@@ -34,7 +36,7 @@
* @param <K> type of key
* @param <V> type of value
*/
-public interface ConsistentMap<K, V> extends DistributedPrimitive {
+public interface ConsistentMap<K, V> extends DistributedPrimitive, Iterable<Map.Entry<K, Versioned<V>>> {
/**
* Returns the number of entries in the map.
@@ -283,6 +285,17 @@
boolean replace(K key, long oldVersion, V newValue);
/**
+ * Returns a stream over the entries of this map.
+ * <p>
+ * This method is optimized for large maps. The returned stream is sequential and is
+ * built from this map's {@code spliterator()}.
+ *
+ * @return the map entry stream
+ */
+ default Stream<Entry<K, Versioned<V>>> stream() {
+ final boolean parallel = false;
+ return StreamSupport.stream(this.spliterator(), parallel);
+ }
+
+ /**
* Registers the specified listener to be notified whenever the map is updated.
*
* @param listener listener to notify about map events
diff --git a/core/api/src/test/java/org/onosproject/store/primitives/AsyncConsistentTreeMapAdapter.java b/core/api/src/test/java/org/onosproject/store/primitives/AsyncConsistentTreeMapAdapter.java
index 9f6da1a..7e014cc 100644
--- a/core/api/src/test/java/org/onosproject/store/primitives/AsyncConsistentTreeMapAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/primitives/AsyncConsistentTreeMapAdapter.java
@@ -19,6 +19,7 @@
import com.google.common.collect.Maps;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentTreeMap;
import org.onosproject.store.service.MapEventListener;
@@ -367,4 +368,9 @@
public CompletableFuture<Void> rollback(TransactionId transactionId) {
return null;
}
+
+ @Override
+ public CompletableFuture<AsyncIterator<Map.Entry<String, Versioned<V>>>> iterator() {
+ // Test adapter stub: no iteration support; returns null instead of a future,
+ // like the rollback() stub above.
+ return null;
+ }
}
diff --git a/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMapAdapter.java b/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMapAdapter.java
index 85e8e19..f39e7d0 100644
--- a/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMapAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMapAdapter.java
@@ -134,6 +134,11 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Map.Entry<K, Versioned<V>>>> iterator() {
+ // Test adapter stub: no iteration support; returns null instead of a future,
+ // like the addListener() stub below.
+ return null;
+ }
+
+ @Override
public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
return null;
}
diff --git a/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java b/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
index 18f9d63..a8db799 100644
--- a/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
@@ -16,6 +16,7 @@
package org.onosproject.store.service;
import java.util.Collection;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
@@ -155,6 +156,11 @@
}
@Override
+ public Iterator<Map.Entry<K, Versioned<V>>> iterator() {
+ // Test adapter stub: returns null rather than an iterator.
+ // NOTE(review): a for-each loop over this adapter would therefore throw
+ // NullPointerException unless iterator() is overridden.
+ return null;
+ }
+
+ @Override
public void addListener(MapEventListener<K, V> listener, Executor executor) {
}