Implement lazy iterators/streams for ConsistentMap

Change-Id: Id643726441c99186667c104cfdd1dab95b06cf87
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
index b686687..5d9e5d8 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
@@ -20,6 +20,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -380,6 +381,11 @@
         }
 
         @Override
+        public Iterator<Map.Entry<K, Versioned<V>>> iterator() {
+            return map.entrySet().iterator();
+        }
+
+        @Override
         public void addListener(MapEventListener<K, V> listener, Executor executor) {
             throw new UnsupportedOperationException();
         }
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
index c292959..a7cd94a 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
@@ -15,7 +15,16 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.impl.RaftContext;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.service.ServiceId;
+import io.atomix.protocols.raft.service.ServiceType;
 import io.atomix.protocols.raft.service.impl.DefaultCommit;
+import io.atomix.protocols.raft.service.impl.DefaultServiceContext;
+import io.atomix.protocols.raft.session.RaftSession;
+import io.atomix.protocols.raft.session.SessionId;
 import io.atomix.protocols.raft.session.impl.RaftSessionContext;
 import io.atomix.protocols.raft.storage.RaftStorage;
 import io.atomix.protocols.raft.storage.snapshot.Snapshot;
@@ -24,15 +33,23 @@
 import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
 import io.atomix.storage.StorageLevel;
 import io.atomix.time.WallClockTimestamp;
+import io.atomix.utils.concurrent.AtomixThreadFactory;
+import io.atomix.utils.concurrent.SingleThreadContextFactory;
 import org.junit.Test;
 import org.onosproject.store.service.Versioned;
 
+import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.OPEN_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
+import static org.onosproject.store.service.DistributedPrimitive.Type.LEADER_ELECTOR;
 
 /**
  * Consistent map service test.
@@ -47,13 +64,42 @@
                 .build());
         Snapshot snapshot = store.newSnapshot(2, new WallClockTimestamp());
 
+        DefaultServiceContext context = mock(DefaultServiceContext.class);
+        expect(context.serviceType()).andReturn(ServiceType.from(LEADER_ELECTOR.name())).anyTimes();
+        expect(context.serviceName()).andReturn("test").anyTimes();
+        expect(context.serviceId()).andReturn(ServiceId.from(1)).anyTimes();
+
+        RaftContext server = mock(RaftContext.class);
+        expect(server.getProtocol()).andReturn(mock(RaftServerProtocol.class));
+
+        replay(context, server);
+
+        RaftSession session = new RaftSessionContext(
+            SessionId.from(1),
+            MemberId.from("1"),
+            "test",
+            ServiceType.from(LEADER_ELECTOR.name()),
+            ReadConsistency.LINEARIZABLE,
+            100,
+            5000,
+            System.currentTimeMillis(),
+            context,
+            server,
+            new SingleThreadContextFactory(new AtomixThreadFactory()));
+
         AtomixConsistentMapService service = new AtomixConsistentMapService();
         service.put(new DefaultCommit<>(
                 2,
                 PUT,
                 new Put("foo", "Hello world!".getBytes()),
-                mock(RaftSessionContext.class),
+                session,
                 System.currentTimeMillis()));
+        service.openIterator(new DefaultCommit<>(
+            3,
+            OPEN_ITERATOR,
+            null,
+            session,
+            System.currentTimeMillis()));
 
         try (SnapshotWriter writer = snapshot.openWriter()) {
             service.snapshot(writer);
@@ -74,5 +120,12 @@
                 System.currentTimeMillis()));
         assertNotNull(value);
         assertArrayEquals("Hello world!".getBytes(), value.value());
+
+        assertEquals(1, service.next(new DefaultCommit<>(
+            4,
+            NEXT,
+            new AtomixConsistentMapOperations.IteratorPosition(3L, 0),
+            session,
+            System.currentTimeMillis())).entries().size());
     }
 }
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index 3e7a872..8753783 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -27,6 +27,7 @@
 import org.onosproject.store.primitives.impl.DistributedPrimitives;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
@@ -34,12 +35,16 @@
 import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.ConcurrentModificationException;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.is;
@@ -634,6 +639,25 @@
         assertEquals("1.0.1:Hello world again!", map1.get("bar").join().value());
     }
 
+    @Test
+    public void testIterator() throws Exception {
+        AtomixConsistentMap map = newPrimitive("testIterator");
+        for (int i = 0; i < 100; i++) {
+            for (int j = 0; j < 100; j++) {
+                map.put(String.valueOf(i), String.valueOf(j).getBytes()).join();
+            }
+        }
+
+        List<Map.Entry<String, Versioned<byte[]>>> entries = new ArrayList<>();
+        AsyncIterator<Map.Entry<String, Versioned<byte[]>>> iterator = map.iterator().get(5, TimeUnit.SECONDS);
+        while (iterator.hasNext().get(5, TimeUnit.SECONDS)) {
+            map.put("foo", UUID.randomUUID().toString().getBytes()).join();
+            entries.add(iterator.next().get(5, TimeUnit.SECONDS));
+        }
+        assertEquals(100, entries.size());
+        assertEquals(101, map.asConsistentMap().stream().count());
+    }
+
     private static class TestMapEventListener implements MapEventListener<String, byte[]> {
 
         private final BlockingQueue<MapEvent<String, byte[]>> queue = new ArrayBlockingQueue<>(1);
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
index 941009c..1548fa5 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
@@ -20,14 +20,20 @@
 import io.atomix.protocols.raft.service.RaftService;
 import org.junit.Test;
 import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Versioned;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -480,6 +486,25 @@
         //map.delete().join();
     }
 
+    @Test
+    public void testIterator() throws Exception {
+        AtomixConsistentTreeMap map = newPrimitive("testIterator");
+        for (int i = 0; i < 100; i++) {
+            for (int j = 0; j < 100; j++) {
+                map.put(String.valueOf(i), String.valueOf(j).getBytes()).join();
+            }
+        }
+
+        List<Map.Entry<String, Versioned<byte[]>>> entries = new ArrayList<>();
+        AsyncIterator<Map.Entry<String, Versioned<byte[]>>> iterator = map.iterator().get(5, TimeUnit.SECONDS);
+        while (iterator.hasNext().get(5, TimeUnit.SECONDS)) {
+            map.put("foo", UUID.randomUUID().toString().getBytes()).join();
+            entries.add(iterator.next().get(5, TimeUnit.SECONDS));
+        }
+        assertEquals(100, entries.size());
+        assertEquals(101, map.asConsistentMap().stream().count());
+    }
+
     private AtomixConsistentTreeMap createResource(String mapName) {
         try {
             AtomixConsistentTreeMap map = newPrimitive(mapName);