Refactor multimap streams to avoid blocking iteration on initialization

Change-Id: I6a357b37e85808972267ef2daf5328fd5035aac4
diff --git a/core/store/primitives/pom.xml b/core/store/primitives/pom.xml
index cd9e6ef..0d15ddc 100644
--- a/core/store/primitives/pom.xml
+++ b/core/store/primitives/pom.xml
@@ -69,7 +69,7 @@
         <dependency>
             <groupId>io.atomix</groupId>
             <artifactId>atomix</artifactId>
-            <version>2.0.22</version>
+            <version>2.0.23</version>
         </dependency>
 
         <dependency>
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
index 39d49fe..b04c9dd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
@@ -17,21 +17,21 @@
 package org.onosproject.store.primitives.resources.impl;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.ConcurrentModificationException;
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.function.Function;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multiset;
 import io.atomix.protocols.raft.proxy.RaftProxy;
-import io.atomix.utils.concurrent.AtomixFuture;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.store.serializers.KryoNamespaces;
@@ -43,9 +43,9 @@
 import org.onosproject.store.service.Versioned;
 
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLOSE_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_KEY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_VALUE;
@@ -56,10 +56,13 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ITERATE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorBatch;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorPosition;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.OPEN_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
@@ -89,13 +92,11 @@
         .register(AtomixConsistentSetMultimapEvents.NAMESPACE)
         .build());
 
-    private volatile EntryIterator iterator;
     private final Map<MultimapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
 
     public AtomixConsistentSetMultimap(RaftProxy proxy) {
         super(proxy);
         proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleChange);
-        proxy.addEventListener(ENTRY, SERIALIZER::decode, this::handleEntry);
         proxy.addStateChangeListener(state -> {
             if (state == RaftProxy.State.CONNECTED && isListening()) {
                 proxy.invoke(ADD_LISTENER);
@@ -108,13 +109,6 @@
             mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
     }
 
-    private void handleEntry(Map.Entry<String, byte[]> entry) {
-        EntryIterator iterator = this.iterator;
-        if (iterator != null) {
-            iterator.add(entry);
-        }
-    }
-
     @Override
     public CompletableFuture<Integer> size() {
         return proxy.invoke(SIZE, SERIALIZER::decode);
@@ -234,51 +228,87 @@
 
     @Override
     public CompletableFuture<AsyncIterator<Map.Entry<String, byte[]>>> iterator() {
-        return proxy.<Integer>invoke(ITERATE, SERIALIZER::decode).thenApply(count -> {
-            iterator = new EntryIterator(count);
-            return iterator;
-        });
+        return proxy.<Long>invoke(OPEN_ITERATOR, SERIALIZER::decode).thenApply(ConsistentMultimapIterator::new);
     }
 
-    private class EntryIterator implements AsyncIterator<Map.Entry<String, byte[]>> {
-        private final Queue<CompletableFuture<Map.Entry<String, byte[]>>> in = new LinkedList<>();
-        private final Queue<CompletableFuture<Map.Entry<String, byte[]>>> out = new LinkedList<>();
-        private final int total;
-        private volatile int count;
+    /**
+     * Consistent multimap iterator.
+     */
+    private class ConsistentMultimapIterator implements AsyncIterator<Map.Entry<String, byte[]>> {
+        private final long id;
+        private volatile CompletableFuture<IteratorBatch> batch;
+        private volatile CompletableFuture<Void> closeFuture;
 
-        EntryIterator(int total) {
-            this.total = total;
+        ConsistentMultimapIterator(long id) {
+            this.id = id;
+            this.batch = CompletableFuture.completedFuture(
+                new IteratorBatch(0, Collections.emptyList()));
         }
 
-        synchronized void add(Map.Entry<String, byte[]> entry) {
-            CompletableFuture<Map.Entry<String, byte[]>> future = out.poll();
-            if (future != null) {
-                future.complete(entry);
-            } else {
-                in.add(CompletableFuture.completedFuture(entry));
+        /**
+         * Returns the current batch iterator or lazily fetches the next batch from the cluster.
+         *
+         * @return the next batch iterator
+         */
+        private CompletableFuture<Iterator<Map.Entry<String, byte[]>>> batch() {
+            return batch.thenCompose(iterator -> {
+                if (iterator != null && !iterator.hasNext()) {
+                    batch = fetch(iterator.position());
+                    return batch.thenApply(Function.identity());
+                }
+                return CompletableFuture.completedFuture(iterator);
+            });
+        }
+
+        /**
+         * Fetches the next batch of entries from the cluster.
+         *
+         * @param position the position from which to fetch the next batch
+         * @return the next batch of entries from the cluster
+         */
+        private CompletableFuture<IteratorBatch> fetch(int position) {
+            return proxy.<IteratorPosition, IteratorBatch>invoke(
+                NEXT,
+                SERIALIZER::encode,
+                new IteratorPosition(id, position),
+                SERIALIZER::decode)
+                .thenCompose(batch -> {
+                    if (batch == null) {
+                        return close().thenApply(v -> null);
+                    }
+                    return CompletableFuture.completedFuture(batch);
+                });
+        }
+
+        /**
+         * Closes the iterator.
+         *
+         * @return future to be completed once the iterator has been closed
+         */
+        private CompletableFuture<Void> close() {
+            if (closeFuture == null) {
+                synchronized (this) {
+                    if (closeFuture == null) {
+                        closeFuture = proxy.invoke(CLOSE_ITERATOR, SERIALIZER::encode, id);
+                    }
+                }
             }
+            return closeFuture;
         }
 
         @Override
-        public synchronized CompletableFuture<Boolean> hasNext() {
-            return CompletableFuture.completedFuture(count < total);
+        public CompletableFuture<Boolean> hasNext() {
+            return batch().thenApply(iterator -> iterator != null && iterator.hasNext());
         }
 
         @Override
-        public synchronized CompletableFuture<Map.Entry<String, byte[]>> next() {
-            if (count == total) {
-                return Tools.exceptionalFuture(new NoSuchElementException());
-            }
-            count++;
-
-            CompletableFuture<Map.Entry<String, byte[]>> next = in.poll();
-            if (next != null) {
-                return next;
-            }
-
-            CompletableFuture<Map.Entry<String, byte[]>> future = new AtomixFuture<>();
-            out.add(future);
-            return future;
+        public CompletableFuture<Map.Entry<String, byte[]>> next() {
+            return batch().thenCompose(iterator -> {
+                if (iterator == null) {
+                    return Tools.exceptionalFuture(new NoSuchElementException());
+                }
+                return CompletableFuture.completedFuture(iterator.next());
+            });
         }
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
index 6ad93b6..4d38d71 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
@@ -18,6 +18,8 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Maps;
@@ -53,7 +55,9 @@
     CLEAR(OperationType.COMMAND),
     ADD_LISTENER(OperationType.COMMAND),
     REMOVE_LISTENER(OperationType.COMMAND),
-    ITERATE(OperationType.COMMAND);
+    OPEN_ITERATOR(OperationType.COMMAND),
+    NEXT(OperationType.QUERY),
+    CLOSE_ITERATOR(OperationType.COMMAND);
 
     private final OperationType type;
 
@@ -86,6 +90,8 @@
             .register(Versioned.class)
             .register(ArrayList.class)
             .register(Maps.immutableEntry("", "").getClass())
+            .register(IteratorBatch.class)
+            .register(IteratorPosition.class)
             .build("AtomixConsistentSetMultimapOperations");
 
     /**
@@ -382,4 +388,77 @@
             super(key);
         }
     }
+
+    /**
+     * Iterator position.
+     */
+    public static class IteratorPosition {
+        private long iteratorId;
+        private int position;
+
+        private IteratorPosition() {
+        }
+
+        public IteratorPosition(long iteratorId, int position) {
+            this.iteratorId = iteratorId;
+            this.position = position;
+        }
+
+        public long iteratorId() {
+            return iteratorId;
+        }
+
+        public int position() {
+            return position;
+        }
+    }
+
+    /**
+     * Iterator batch.
+     */
+    public static class IteratorBatch implements Iterator<Map.Entry<String, byte[]>> {
+        private int position;
+        private Collection<Map.Entry<String, byte[]>> entries;
+        private transient volatile Iterator<Map.Entry<String, byte[]>> iterator;
+
+        private IteratorBatch() {
+        }
+
+        public IteratorBatch(int position, Collection<Map.Entry<String, byte[]>> entries) {
+            this.position = position;
+            this.entries = entries;
+        }
+
+        public int position() {
+            return position;
+        }
+
+        public Collection<Map.Entry<String, byte[]>> entries() {
+            return entries;
+        }
+
+        private Iterator<Map.Entry<String, byte[]>> iterator() {
+            Iterator<Map.Entry<String, byte[]>> iterator = this.iterator;
+            if (iterator == null) {
+                synchronized (entries) {
+                    iterator = this.iterator;
+                    if (iterator == null) {
+                        iterator = entries.iterator();
+                        this.iterator = iterator;
+                    }
+                }
+            }
+            return iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iterator().hasNext();
+        }
+
+        @Override
+        public Map.Entry<String, byte[]> next() {
+            return iterator().next();
+        }
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index 5a93bef..6c864b0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -16,11 +16,13 @@
 
 package org.onosproject.store.primitives.resources.impl;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,9 +60,9 @@
 import org.onosproject.store.service.Versioned;
 
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLOSE_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_KEY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_VALUE;
@@ -71,11 +73,14 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ITERATE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorBatch;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorPosition;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.OPEN_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
@@ -93,6 +98,7 @@
  * State Machine for {@link AtomixConsistentSetMultimap} resource.
  */
 public class AtomixConsistentSetMultimapService extends AbstractRaftService {
+    private static final int MAX_ITERATOR_BATCH_SIZE = 1024 * 32;
 
     private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
             .register(KryoNamespaces.BASIC)
@@ -119,13 +125,18 @@
 
     private AtomicLong globalVersion = new AtomicLong(1);
     private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
-    private Map<String, MapEntryValue> backingMap = Maps.newHashMap();
+    private Map<String, MapEntryValue> backingMap = Maps.newConcurrentMap();
+    private Map<Long, IteratorContext> iterators = Maps.newHashMap();
 
     @Override
     public void snapshot(SnapshotWriter writer) {
         writer.writeLong(globalVersion.get());
         writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer::encode);
         writer.writeObject(backingMap, serializer::encode);
+
+        Map<Long, Long> iterators = Maps.newHashMap();
+        this.iterators.forEach((id, context) -> iterators.put(id, context.sessionId));
+        writer.writeObject(iterators, serializer::encode);
     }
 
     @Override
@@ -138,6 +149,11 @@
         }
 
         backingMap = reader.readObject(serializer::decode);
+
+        Map<Long, Long> iterators = reader.readObject(serializer::decode);
+        this.iterators = Maps.newHashMap();
+        iterators.forEach((id, session) ->
+            this.iterators.put(id, new IteratorContext(session, backingMap.entrySet().iterator())));
     }
 
     @Override
@@ -161,17 +177,21 @@
         executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
         executor.register(ADD_LISTENER, this::listen);
         executor.register(REMOVE_LISTENER, this::unlisten);
-        executor.register(ITERATE, this::iterate, serializer::encode);
+        executor.register(OPEN_ITERATOR, this::openIterator, serializer::encode);
+        executor.register(NEXT, serializer::decode, this::next, serializer::encode);
+        executor.register(CLOSE_ITERATOR, serializer::decode, this::closeIterator);
     }
 
     @Override
     public void onExpire(RaftSession session) {
         listeners.remove(session.sessionId().id());
+        iterators.entrySet().removeIf(entry -> entry.getValue().sessionId == session.sessionId().id());
     }
 
     @Override
     public void onClose(RaftSession session) {
         listeners.remove(session.sessionId().id());
+        iterators.entrySet().removeIf(entry -> entry.getValue().sessionId == session.sessionId().id());
     }
 
     /**
@@ -492,20 +512,66 @@
     }
 
     /**
-     * Handles an iterate commit.
+     * Handles an open iterator commit.
      *
-     * @param commit the iterate commit
-     * @return count of commit entries
+     * @param commit the open iterator commit
+     * @return iterator identifier
      */
-    protected int iterate(Commit<Void> commit) {
-        int count = 0;
-        for (Map.Entry<String, MapEntryValue> entry : backingMap.entrySet()) {
-            for (byte[] value : entry.getValue().values()) {
-                commit.session().publish(ENTRY, serializer::encode, Maps.immutableEntry(entry.getKey(), value));
-                count++;
+    protected long openIterator(Commit<Void> commit) {
+        iterators.put(commit.index(), new IteratorContext(
+            commit.session().sessionId().id(),
+            backingMap.entrySet().iterator()));
+        return commit.index();
+    }
+
+    /**
+     * Handles an iterator next commit.
+     *
+     * @param commit the next commit
+     * @return a list of entries to iterate
+     */
+    protected IteratorBatch next(Commit<IteratorPosition> commit) {
+        final long iteratorId = commit.value().iteratorId();
+        final int position = commit.value().position();
+
+        IteratorContext context = iterators.get(iteratorId);
+        if (context == null) {
+            return null;
+        }
+
+        List<Map.Entry<String, byte[]>> entries = new ArrayList<>();
+        int size = 0;
+        while (context.iterator.hasNext()) {
+            context.position++;
+            if (context.position > position) {
+                Map.Entry<String, MapEntryValue> entry = context.iterator.next();
+                String key = entry.getKey();
+                int keySize = key.length();
+                for (byte[] value : entry.getValue().values()) {
+                    entries.add(Maps.immutableEntry(key, value));
+                    size += keySize;
+                    size += value.length;
+                }
+
+                if (size >= MAX_ITERATOR_BATCH_SIZE) {
+                    break;
+                }
             }
         }
-        return count;
+
+        if (entries.isEmpty()) {
+            return null;
+        }
+        return new IteratorBatch(context.position, entries);
+    }
+
+    /**
+     * Handles a close iterator commit.
+     *
+     * @param commit the close iterator commit
+     */
+    protected void closeIterator(Commit<Long> commit) {
+        iterators.remove(commit.value());
     }
 
     /**
@@ -787,4 +853,15 @@
             }
         }
     }
+
+    private static class IteratorContext {
+        private final long sessionId;
+        private final Iterator<Map.Entry<String, MapEntryValue>> iterator;
+        private int position = 0;
+
+        IteratorContext(long sessionId, Iterator<Map.Entry<String, MapEntryValue>> iterator) {
+            this.sessionId = sessionId;
+            this.iterator = iterator;
+        }
+    }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
index f9418a4..5efdeb3 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
@@ -18,7 +18,16 @@
 import java.util.Collection;
 import java.util.Collections;
 
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.impl.RaftContext;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.service.ServiceId;
+import io.atomix.protocols.raft.service.ServiceType;
 import io.atomix.protocols.raft.service.impl.DefaultCommit;
+import io.atomix.protocols.raft.service.impl.DefaultServiceContext;
+import io.atomix.protocols.raft.session.RaftSession;
+import io.atomix.protocols.raft.session.SessionId;
 import io.atomix.protocols.raft.session.impl.RaftSessionContext;
 import io.atomix.protocols.raft.storage.RaftStorage;
 import io.atomix.protocols.raft.storage.snapshot.Snapshot;
@@ -27,16 +36,23 @@
 import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
 import io.atomix.storage.StorageLevel;
 import io.atomix.time.WallClockTimestamp;
+import io.atomix.utils.concurrent.AtomixThreadFactory;
+import io.atomix.utils.concurrent.SingleThreadContextFactory;
 import org.junit.Test;
 import org.onlab.util.Match;
 import org.onosproject.store.service.Versioned;
 
+import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.OPEN_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
+import static org.onosproject.store.service.DistributedPrimitive.Type.LEADER_ELECTOR;
 
 /**
  * Consistent set multimap service test.
@@ -46,19 +62,48 @@
     @SuppressWarnings("unchecked")
     public void testSnapshot() throws Exception {
         SnapshotStore store = new SnapshotStore(RaftStorage.newBuilder()
-                .withPrefix("test")
-                .withStorageLevel(StorageLevel.MEMORY)
-                .build());
+            .withPrefix("test")
+            .withStorageLevel(StorageLevel.MEMORY)
+            .build());
         Snapshot snapshot = store.newSnapshot(2, new WallClockTimestamp());
 
+        DefaultServiceContext context = mock(DefaultServiceContext.class);
+        expect(context.serviceType()).andReturn(ServiceType.from(LEADER_ELECTOR.name())).anyTimes();
+        expect(context.serviceName()).andReturn("test").anyTimes();
+        expect(context.serviceId()).andReturn(ServiceId.from(1)).anyTimes();
+
+        RaftContext server = mock(RaftContext.class);
+        expect(server.getProtocol()).andReturn(mock(RaftServerProtocol.class));
+
+        replay(context, server);
+
+        RaftSession session = new RaftSessionContext(
+            SessionId.from(1),
+            MemberId.from("1"),
+            "test",
+            ServiceType.from(LEADER_ELECTOR.name()),
+            ReadConsistency.LINEARIZABLE,
+            100,
+            5000,
+            System.currentTimeMillis(),
+            context,
+            server,
+            new SingleThreadContextFactory(new AtomixThreadFactory()));
+
         AtomixConsistentSetMultimapService service = new AtomixConsistentSetMultimapService();
         service.put(new DefaultCommit<>(
-                2,
-                PUT,
-                new AtomixConsistentSetMultimapOperations.Put(
-                        "foo", Collections.singletonList("Hello world!".getBytes()), Match.ANY),
-                mock(RaftSessionContext.class),
-                System.currentTimeMillis()));
+            2,
+            PUT,
+            new AtomixConsistentSetMultimapOperations.Put(
+                "foo", Collections.singletonList("Hello world!".getBytes()), Match.ANY),
+            session,
+            System.currentTimeMillis()));
+        service.openIterator(new DefaultCommit<>(
+            3,
+            OPEN_ITERATOR,
+            null,
+            session,
+            System.currentTimeMillis()));
 
         try (SnapshotWriter writer = snapshot.openWriter()) {
             service.snapshot(writer);
@@ -72,13 +117,20 @@
         }
 
         Versioned<Collection<? extends byte[]>> value = service.get(new DefaultCommit<>(
-                2,
-                GET,
-                new AtomixConsistentSetMultimapOperations.Get("foo"),
-                mock(RaftSessionContext.class),
-                System.currentTimeMillis()));
+            3,
+            GET,
+            new AtomixConsistentSetMultimapOperations.Get("foo"),
+            session,
+            System.currentTimeMillis()));
         assertNotNull(value);
         assertEquals(1, value.value().size());
         assertArrayEquals("Hello world!".getBytes(), value.value().iterator().next());
+
+        assertEquals(1, service.next(new DefaultCommit<>(
+            4,
+            NEXT,
+            new AtomixConsistentSetMultimapOperations.IteratorPosition(3L, 0),
+            session,
+            System.currentTimeMillis())).entries().size());
     }
 }
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
index 6c2912e..b14afb3 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
@@ -336,18 +336,19 @@
     @Test
     public void testStreams() throws Exception {
         AtomixConsistentSetMultimap map = createResource("testStreams");
-        for (int i = 0; i < 10000; i++) {
-            allKeys.forEach(key -> {
-                map.put(key, UUID.randomUUID().toString().getBytes()).join();
-            });
+        for (int i = 0; i < 100; i++) {
+            for (int j = 0; j < 100; j++) {
+                map.put(String.valueOf(i), String.valueOf(j).getBytes()).join();
+            }
         }
 
         List<Map.Entry<String, byte[]>> entries = new ArrayList<>();
         AsyncIterator<Map.Entry<String, byte[]>> iterator = map.iterator().get(5, TimeUnit.SECONDS);
         while (iterator.hasNext().get(5, TimeUnit.SECONDS)) {
+            map.put(keyOne, UUID.randomUUID().toString().getBytes()).join();
             entries.add(iterator.next().get(5, TimeUnit.SECONDS));
         }
-        assertEquals(40000, entries.size());
+        assertEquals(10000, entries.size());
     }
 
     /**
diff --git a/lib/BUCK b/lib/BUCK
index 2347630..e44da2a 100644
--- a/lib/BUCK
+++ b/lib/BUCK
@@ -1,4 +1,4 @@
-# ***** This file was auto-generated at Tue, 19 Jun 2018 22:30:22 GMT. Do not edit this file manually. *****
+# ***** This file was auto-generated at Sat, 30 Jun 2018 08:02:26 GMT. Do not edit this file manually. *****
 # ***** Use onos-lib-gen *****
 
 pass_thru_pom(
@@ -207,10 +207,10 @@
 
 remote_jar (
   name = 'atomix',
-  out = 'atomix-2.0.22.jar',
-  url = 'mvn:io.atomix:atomix:jar:2.0.22',
-  sha1 = '6c4f4d95ad933ac612ef9ab85b22f5c4f714a3e1',
-  maven_coords = 'io.atomix:atomix:2.0.22',
+  out = 'atomix-2.0.23.jar',
+  url = 'mvn:io.atomix:atomix:jar:2.0.23',
+  sha1 = '6b41cebf257e0094c2276b6fa589b2c73aff5f99',
+  maven_coords = 'io.atomix:atomix:2.0.23',
   visibility = [ 'PUBLIC' ],
 )
 
diff --git a/lib/deps.json b/lib/deps.json
index c8c8512..a0be46d 100644
--- a/lib/deps.json
+++ b/lib/deps.json
@@ -116,7 +116,7 @@
     "aopalliance-repackaged": "mvn:org.glassfish.hk2.external:aopalliance-repackaged:2.5.0-b42",
     "amqp-client": "mvn:com.rabbitmq:amqp-client:jar:3.6.1",
     "asm": "mvn:org.ow2.asm:asm:5.0.4",
-    "atomix": "mvn:io.atomix:atomix:2.0.22",
+    "atomix": "mvn:io.atomix:atomix:2.0.23",
     "commons-codec": "mvn:commons-codec:commons-codec:1.10",
     "commons-cli": "mvn:commons-cli:commons-cli:1.3",
     "commons-collections": "mvn:commons-collections:commons-collections:3.2.2",
diff --git a/tools/build/bazel/generate_workspace.bzl b/tools/build/bazel/generate_workspace.bzl
index a789d10..1239015 100644
--- a/tools/build/bazel/generate_workspace.bzl
+++ b/tools/build/bazel/generate_workspace.bzl
@@ -1,4 +1,4 @@
-# ***** This file was auto-generated at Fri, 29 Jun 2018 16:58:53 GMT. Do not edit this file manually. *****
+# ***** This file was auto-generated at Sat, 30 Jun 2018 08:02:29 GMT. Do not edit this file manually. *****
 # ***** Use onos-lib-gen *****
 
 load("//tools/build/bazel:variables.bzl", "ONOS_GROUP_ID", "ONOS_VERSION")
@@ -132,8 +132,8 @@
 
     native.maven_jar(
         name = "atomix",
-        artifact = "io.atomix:atomix:2.0.22",
-        sha1 = "6c4f4d95ad933ac612ef9ab85b22f5c4f714a3e1",
+        artifact = "io.atomix:atomix:2.0.23",
+        sha1 = "6b41cebf257e0094c2276b6fa589b2c73aff5f99",
     )
 
     native.maven_jar(
@@ -2110,7 +2110,7 @@
 artifact_map[str(Label("@aopalliance_repackaged//jar"))] = "mvn:org.glassfish.hk2.external:aopalliance-repackaged:jar:2.5.0-b42"
 artifact_map[str(Label("@amqp_client//jar"))] = "mvn:com.rabbitmq:amqp-client:jar:3.6.1"
 artifact_map[str(Label("@asm//jar"))] = "mvn:org.ow2.asm:asm:jar:5.0.4"
-artifact_map[str(Label("@atomix//jar"))] = "mvn:io.atomix:atomix:jar:2.0.22"
+artifact_map[str(Label("@atomix//jar"))] = "mvn:io.atomix:atomix:jar:2.0.23"
 artifact_map[str(Label("@commons_codec//jar"))] = "mvn:commons-codec:commons-codec:jar:1.10"
 artifact_map[str(Label("@commons_cli//jar"))] = "mvn:commons-cli:commons-cli:jar:1.3"
 artifact_map[str(Label("@commons_collections//jar"))] = "mvn:commons-collections:commons-collections:jar:3.2.2"
diff --git a/tools/package/features/features.xml b/tools/package/features/features.xml
index 979f8b8..a89555d 100644
--- a/tools/package/features/features.xml
+++ b/tools/package/features/features.xml
@@ -59,7 +59,7 @@
         <bundle>mvn:com.typesafe/config/1.2.1</bundle>
         <bundle>mvn:com.googlecode.concurrent-trees/concurrent-trees/2.6.0</bundle>
         <bundle>mvn:commons-io/commons-io/2.4</bundle>
-        <bundle>mvn:io.atomix/atomix/2.0.22</bundle>
+        <bundle>mvn:io.atomix/atomix/2.0.23</bundle>
 
         <bundle>mvn:org.glassfish.jersey.core/jersey-client/2.26</bundle>