Implement lazy iterators/streams for ConsistentMap

Change-Id: Id643726441c99186667c104cfdd1dab95b06cf87
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
index 1bc62b3..b692bf3 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.primitives;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -32,6 +33,7 @@
 
 import org.onlab.util.Tools;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.ConsistentMapException.ConcurrentModification;
@@ -184,6 +186,11 @@
     }
 
     @Override
+    public Iterator<Entry<K, Versioned<V>>> iterator() {
+        return new DefaultIterator<>(complete(asyncMap.iterator()));
+    }
+
+    @Override
     public void addListener(MapEventListener<K, V> listener, Executor executor) {
         complete(asyncMap.addListener(listener, executor));
     }
@@ -208,6 +215,24 @@
         return asyncMap.statusChangeListeners();
     }
 
+    private class DefaultIterator<K, V> implements Iterator<Entry<K, Versioned<V>>> {
+        private final AsyncIterator<Entry<K, Versioned<V>>> iterator;
+
+        public DefaultIterator(AsyncIterator<Map.Entry<K, Versioned<V>>> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return complete(iterator.hasNext());
+        }
+
+        @Override
+        public Map.Entry<K, Versioned<V>> next() {
+            return complete(iterator.next());
+        }
+    }
+
     @Override
     public Map<K, V> asJavaMap() {
         synchronized (this) {
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java
index c9dd5df..7ccf8c7 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java
@@ -18,6 +18,7 @@
 
 import com.google.common.base.Throwables;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.ConsistentTreeMap;
 import org.onosproject.store.service.MapEventListener;
@@ -25,6 +26,7 @@
 import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
@@ -277,6 +279,11 @@
     }
 
     @Override
+    public Iterator<Map.Entry<String, Versioned<V>>> iterator() {
+        return new DefaultIterator<>(complete(treeMap.iterator()));
+    }
+
+    @Override
     public void addListener(MapEventListener<String, V> listener,
                             Executor executor) {
         complete(treeMap.addListener(listener, executor));
@@ -287,6 +294,24 @@
         complete(treeMap.removeListener(listener));
     }
 
+    private class DefaultIterator<K, V> implements Iterator<Map.Entry<K, Versioned<V>>> {
+        private final AsyncIterator<Map.Entry<K, Versioned<V>>> iterator;
+
+        public DefaultIterator(AsyncIterator<Map.Entry<K, Versioned<V>>> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return complete(iterator.hasNext());
+        }
+
+        @Override
+        public Map.Entry<K, Versioned<V>> next() {
+            return complete(iterator.next());
+        }
+    }
+
     @Override
     public Map<String, V> asJavaMap() {
         synchronized (this) {
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index 749ab5b..45688c0 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -17,6 +17,7 @@
 package org.onosproject.store.service;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
@@ -54,7 +55,8 @@
  * the returned future will be {@link CompletableFuture#complete completed} when the
  * operation finishes.
  */
-public interface AsyncConsistentMap<K, V> extends DistributedPrimitive, Transactional<MapUpdate<K, V>> {
+public interface AsyncConsistentMap<K, V>
+    extends DistributedPrimitive, Transactional<MapUpdate<K, V>>, AsyncIterable<Map.Entry<K, Versioned<V>>> {
 
     @Override
     default DistributedPrimitive.Type primitiveType() {
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index 1f6579b1..304764e 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -24,6 +24,8 @@
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import com.google.common.util.concurrent.MoreExecutors;
 
@@ -34,7 +36,7 @@
  * @param <K> type of key
  * @param <V> type of value
  */
-public interface ConsistentMap<K, V> extends DistributedPrimitive {
+public interface ConsistentMap<K, V> extends DistributedPrimitive, Iterable<Map.Entry<K, Versioned<V>>> {
 
     /**
      * Returns the number of entries in the map.
@@ -283,6 +285,17 @@
     boolean replace(K key, long oldVersion, V newValue);
 
     /**
+     * Streams entries from the map.
+     * <p>
+     * This method is optimized for large maps.
+     *
+     * @return the map entry stream
+     */
+    default Stream<Entry<K, Versioned<V>>> stream() {
+        return StreamSupport.stream(spliterator(), false);
+    }
+
+    /**
      * Registers the specified listener to be notified whenever the map is updated.
      *
      * @param listener listener to notify about map events