Implement lazy iterators/streams for ConsistentMap

Change-Id: Id643726441c99186667c104cfdd1dab95b06cf87
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index 3e7a872..8753783 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -27,6 +27,7 @@
 import org.onosproject.store.primitives.impl.DistributedPrimitives;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
@@ -34,12 +35,16 @@
 import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.ConcurrentModificationException;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.is;
@@ -634,6 +639,25 @@
         assertEquals("1.0.1:Hello world again!", map1.get("bar").join().value());
     }
 
+    @Test // Checks iterating a populated map while another key is written between iteration steps.
+    public void testIterator() throws Exception {
+        AtomixConsistentMap map = newPrimitive("testIterator");
+        for (int i = 0; i < 100; i++) {
+            for (int j = 0; j < 100; j++) { // key depends only on i, so each j re-puts the same key
+                map.put(String.valueOf(i), String.valueOf(j).getBytes()).join();
+            }
+        }
+        // Drain the async iterator; each iterator future is awaited for at most 5 seconds.
+        List<Map.Entry<String, Versioned<byte[]>>> entries = new ArrayList<>();
+        AsyncIterator<Map.Entry<String, Versioned<byte[]>>> iterator = map.iterator().get(5, TimeUnit.SECONDS);
+        while (iterator.hasNext().get(5, TimeUnit.SECONDS)) {
+            map.put("foo", UUID.randomUUID().toString().getBytes()).join(); // write during iteration
+            entries.add(iterator.next().get(5, TimeUnit.SECONDS));
+        }
+        assertEquals(100, entries.size()); // the iterator is expected to yield 100 entries
+        assertEquals(101, map.asConsistentMap().stream().count()); // keys "0".."99" plus "foo"
+    }
+
     private static class TestMapEventListener implements MapEventListener<String, byte[]> {
 
         private final BlockingQueue<MapEvent<String, byte[]>> queue = new ArrayBlockingQueue<>(1);