Implement lazy iterators/streams for ConsistentMap

Change-Id: Id643726441c99186667c104cfdd1dab95b06cf87
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index 917f44b..193a250 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -30,6 +30,7 @@
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Version;
@@ -150,6 +151,11 @@
     }
 
     @Override
+    public CompletableFuture<AsyncIterator<Entry<K, Versioned<V>>>> iterator() {
+        return delegateMap.iterator(); // plain pass-through: the delegate's future is returned untouched
+    }
+
+    @Override
     public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
         return delegateMap.addListener(listener, executor);
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
index 2bee5eb..b3a1f95 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
@@ -30,6 +30,7 @@
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Version;
@@ -239,6 +240,11 @@
     }
 
     @Override
+    public CompletableFuture<AsyncIterator<Map.Entry<String, Versioned<V>>>> iterator() {
+        return delegateMap.iterator(); // plain pass-through: the delegate's future is returned untouched
+    }
+
+    @Override
     public CompletableFuture<Void> addListener(
             MapEventListener<String, V> listener, Executor executor) {
         return delegateMap.addListener(listener, executor);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index d5d4c73..38ba0f3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -18,8 +18,11 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
@@ -35,6 +38,7 @@
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Version;
@@ -185,6 +189,15 @@
     }
 
     @Override
+    public CompletableFuture<AsyncIterator<Entry<K, Versioned<V>>>> iterator() {
+        // Request an iterator from every partition map, then chain the results into a single iterator.
+        List<CompletableFuture<AsyncIterator<Entry<K, Versioned<V>>>>> partitionIterators = getMaps().stream()
+            .map(partition -> partition.iterator())
+            .collect(Collectors.toList());
+        return Tools.allOf(partitionIterators).thenApply(PartitionedMultimapIterator::new);
+    }
+
+    @Override
     public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
         return CompletableFuture.allOf(getMaps().stream()
                                                 .map(map -> map.addListener(listener, executor))
@@ -254,4 +264,57 @@
     private Collection<AsyncConsistentMap<K, V>> getMaps() {
         return partitions.values();
     }
+
+    /**
+     * Chains the entry iterators of several partitions into one iterator, draining them one after
+     * another in list order.
+     * <p>
+     * Static on purpose: it needs no state from the enclosing map, and as an inner class its own
+     * {@code K}/{@code V} would shadow the enclosing class's type parameters.
+     * <p>
+     * Only {@link #hasNext()} moves past a drained partition iterator; {@link #next()} does not, so
+     * callers should check {@code hasNext()} before each {@code next()}.
+     */
+    private static class PartitionedMultimapIterator<K, V> implements AsyncIterator<Map.Entry<K, Versioned<V>>> {
+        // Partition iterators that have not been started yet.
+        private final Iterator<AsyncIterator<Entry<K, Versioned<V>>>> iterators;
+        // Partition iterator currently being drained; null before first use and between partitions.
+        private volatile AsyncIterator<Entry<K, Versioned<V>>> iterator;
+
+        public PartitionedMultimapIterator(List<AsyncIterator<Entry<K, Versioned<V>>>> iterators) {
+            this.iterators = iterators.iterator();
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasNext() {
+            if (iterator == null && iterators.hasNext()) {
+                iterator = iterators.next();
+            }
+            if (iterator == null) {
+                // Every partition iterator has been drained.
+                return CompletableFuture.completedFuture(false);
+            }
+            return iterator.hasNext()
+                .thenCompose(hasNext -> {
+                    if (!hasNext) {
+                        // Current partition is drained: drop it and repeat the check on the next one.
+                        iterator = null;
+                        return hasNext();
+                    }
+                    return CompletableFuture.completedFuture(true);
+                });
+        }
+
+        @Override
+        public CompletableFuture<Entry<K, Versioned<V>>> next() {
+            if (iterator == null && iterators.hasNext()) {
+                iterator = iterators.next();
+            }
+            if (iterator == null) {
+                // No partition iterator is left to read from.
+                return Tools.exceptionalFuture(new NoSuchElementException());
+            }
+            return iterator.next();
+        }
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index 4e662b0..d68ada5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -21,6 +21,7 @@
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.TransactionLog;
@@ -248,6 +249,11 @@
     }
 
     @Override
+    public CompletableFuture<AsyncIterator<Entry<K1, Versioned<V1>>>> iterator() {
+        return backingMap.iterator().thenApply(TranscodingIterator::new); // wrap so entries come out decoded
+    }
+
+    @Override
     public CompletableFuture<Void> addListener(MapEventListener<K1, V1> listener, Executor executor) {
         synchronized (listeners) {
             InternalBackingMapEventListener backingMapListener =
@@ -328,6 +334,35 @@
         return backingMap.statusChangeListeners();
     }
 
+    /**
+     * Iterator over the backing map's entries that decodes each entry as it is fetched.
+     */
+    private class TranscodingIterator implements AsyncIterator<Entry<K1, Versioned<V1>>> {
+        private final AsyncIterator<Map.Entry<K2, Versioned<V2>>> backingIterator;
+
+        public TranscodingIterator(AsyncIterator<Map.Entry<K2, Versioned<V2>>> backingIterator) {
+            this.backingIterator = backingIterator;
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasNext() {
+            // Answered by the backing iterator as-is; nothing to decode.
+            return backingIterator.hasNext();
+        }
+
+        @Override
+        public CompletableFuture<Map.Entry<K1, Versioned<V1>>> next() {
+            return backingIterator.next().thenApply(this::decode);
+        }
+
+        // Converts one backing entry: key through keyDecoder, versioned value through valueDecoder.
+        private Map.Entry<K1, Versioned<V1>> decode(Map.Entry<K2, Versioned<V2>> encoded) {
+            K1 decodedKey = keyDecoder.apply(encoded.getKey());
+            Versioned<V1> decodedValue = encoded.getValue().map(valueDecoder);
+            return Maps.immutableEntry(decodedKey, decodedValue);
+        }
+    }
+
     private class InternalBackingMapEventListener implements MapEventListener<K2, V2> {
 
         private final MapEventListener<K1, V1> listener;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
index fcaca0e..d8e686e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
@@ -21,6 +21,7 @@
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.TransactionLog;
@@ -340,6 +341,11 @@
     }
 
     @Override
+    public CompletableFuture<AsyncIterator<Map.Entry<String, Versioned<V1>>>> iterator() {
+        return backingMap.iterator().thenApply(TranscodingIterator::new); // wrap so values come out decoded
+    }
+
+    @Override
     public CompletableFuture<Void> addListener(
             MapEventListener<String, V1> listener,
             Executor executor) {
@@ -387,6 +393,35 @@
         throw new UnsupportedOperationException("This operation is not yet supported.");
     }
 
+    /**
+     * Iterator over the backing map's entries that decodes each entry's value as it is fetched.
+     */
+    private class TranscodingIterator implements AsyncIterator<Map.Entry<String, Versioned<V1>>> {
+        private final AsyncIterator<Map.Entry<String, Versioned<V2>>> backingIterator;
+
+        public TranscodingIterator(AsyncIterator<Map.Entry<String, Versioned<V2>>> backingIterator) {
+            this.backingIterator = backingIterator;
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasNext() {
+            // Answered by the backing iterator as-is; nothing to decode.
+            return backingIterator.hasNext();
+        }
+
+        @Override
+        public CompletableFuture<Map.Entry<String, Versioned<V1>>> next() {
+            return backingIterator.next().thenApply(this::decode);
+        }
+
+        // Converts one backing entry: the key is kept, the versioned value goes through valueDecoder.
+        private Map.Entry<String, Versioned<V1>> decode(Map.Entry<String, Versioned<V2>> encoded) {
+            String key = encoded.getKey();
+            Versioned<V1> decodedValue = encoded.getValue().map(valueDecoder);
+            return Maps.immutableEntry(key, decodedValue);
+        }
+    }
+
     private class InternalBackingMapEventListener
             implements MapEventListener<String, V2> {