Refactor multimap streams to avoid blocking iteration on initialization

Change-Id: I6a357b37e85808972267ef2daf5328fd5035aac4
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
index f9418a4..5efdeb3 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
@@ -18,7 +18,16 @@
 import java.util.Collection;
 import java.util.Collections;
 
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.impl.RaftContext;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.service.ServiceId;
+import io.atomix.protocols.raft.service.ServiceType;
 import io.atomix.protocols.raft.service.impl.DefaultCommit;
+import io.atomix.protocols.raft.service.impl.DefaultServiceContext;
+import io.atomix.protocols.raft.session.RaftSession;
+import io.atomix.protocols.raft.session.SessionId;
 import io.atomix.protocols.raft.session.impl.RaftSessionContext;
 import io.atomix.protocols.raft.storage.RaftStorage;
 import io.atomix.protocols.raft.storage.snapshot.Snapshot;
@@ -27,16 +36,23 @@
 import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
 import io.atomix.storage.StorageLevel;
 import io.atomix.time.WallClockTimestamp;
+import io.atomix.utils.concurrent.AtomixThreadFactory;
+import io.atomix.utils.concurrent.SingleThreadContextFactory;
 import org.junit.Test;
 import org.onlab.util.Match;
 import org.onosproject.store.service.Versioned;
 
+import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.OPEN_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
+import static org.onosproject.store.service.DistributedPrimitive.Type.LEADER_ELECTOR;
 
 /**
  * Consistent set multimap service test.
@@ -46,19 +62,48 @@
     @SuppressWarnings("unchecked")
     public void testSnapshot() throws Exception {
         SnapshotStore store = new SnapshotStore(RaftStorage.newBuilder()
-                .withPrefix("test")
-                .withStorageLevel(StorageLevel.MEMORY)
-                .build());
+            .withPrefix("test")
+            .withStorageLevel(StorageLevel.MEMORY)
+            .build());
         Snapshot snapshot = store.newSnapshot(2, new WallClockTimestamp());
 
+        DefaultServiceContext context = mock(DefaultServiceContext.class);
+        expect(context.serviceType()).andReturn(ServiceType.from(LEADER_ELECTOR.name())).anyTimes();
+        expect(context.serviceName()).andReturn("test").anyTimes();
+        expect(context.serviceId()).andReturn(ServiceId.from(1)).anyTimes();
+
+        RaftContext server = mock(RaftContext.class);
+        expect(server.getProtocol()).andReturn(mock(RaftServerProtocol.class));
+
+        replay(context, server);
+
+        RaftSession session = new RaftSessionContext(
+            SessionId.from(1),
+            MemberId.from("1"),
+            "test",
+            ServiceType.from(LEADER_ELECTOR.name()),
+            ReadConsistency.LINEARIZABLE,
+            100,
+            5000,
+            System.currentTimeMillis(),
+            context,
+            server,
+            new SingleThreadContextFactory(new AtomixThreadFactory()));
+
         AtomixConsistentSetMultimapService service = new AtomixConsistentSetMultimapService();
         service.put(new DefaultCommit<>(
-                2,
-                PUT,
-                new AtomixConsistentSetMultimapOperations.Put(
-                        "foo", Collections.singletonList("Hello world!".getBytes()), Match.ANY),
-                mock(RaftSessionContext.class),
-                System.currentTimeMillis()));
+            2,
+            PUT,
+            new AtomixConsistentSetMultimapOperations.Put(
+                "foo", Collections.singletonList("Hello world!".getBytes()), Match.ANY),
+            session,
+            System.currentTimeMillis()));
+        service.openIterator(new DefaultCommit<>(
+            3,
+            OPEN_ITERATOR,
+            null,
+            session,
+            System.currentTimeMillis()));
 
         try (SnapshotWriter writer = snapshot.openWriter()) {
             service.snapshot(writer);
@@ -72,13 +117,20 @@
         }
 
         Versioned<Collection<? extends byte[]>> value = service.get(new DefaultCommit<>(
-                2,
-                GET,
-                new AtomixConsistentSetMultimapOperations.Get("foo"),
-                mock(RaftSessionContext.class),
-                System.currentTimeMillis()));
+            3,
+            GET,
+            new AtomixConsistentSetMultimapOperations.Get("foo"),
+            session,
+            System.currentTimeMillis()));
         assertNotNull(value);
         assertEquals(1, value.value().size());
         assertArrayEquals("Hello world!".getBytes(), value.value().iterator().next());
+
+        assertEquals(1, service.next(new DefaultCommit<>(
+            4,
+            NEXT,
+            new AtomixConsistentSetMultimapOperations.IteratorPosition(3L, 0),
+            session,
+            System.currentTimeMillis())).entries().size());
     }
 }