Refactor multimap streams to avoid blocking iteration on initialization

Change-Id: I6a357b37e85808972267ef2daf5328fd5035aac4
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
index 39d49fe..b04c9dd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
@@ -17,21 +17,21 @@
 package org.onosproject.store.primitives.resources.impl;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.ConcurrentModificationException;
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.function.Function;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multiset;
 import io.atomix.protocols.raft.proxy.RaftProxy;
-import io.atomix.utils.concurrent.AtomixFuture;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.store.serializers.KryoNamespaces;
@@ -43,9 +43,9 @@
 import org.onosproject.store.service.Versioned;
 
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLOSE_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_KEY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_VALUE;
@@ -56,10 +56,13 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ITERATE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorBatch;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorPosition;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.OPEN_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
@@ -89,13 +92,11 @@
         .register(AtomixConsistentSetMultimapEvents.NAMESPACE)
         .build());
 
-    private volatile EntryIterator iterator;
     private final Map<MultimapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
 
     public AtomixConsistentSetMultimap(RaftProxy proxy) {
         super(proxy);
         proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleChange);
-        proxy.addEventListener(ENTRY, SERIALIZER::decode, this::handleEntry);
         proxy.addStateChangeListener(state -> {
             if (state == RaftProxy.State.CONNECTED && isListening()) {
                 proxy.invoke(ADD_LISTENER);
@@ -108,13 +109,6 @@
             mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
     }
 
-    private void handleEntry(Map.Entry<String, byte[]> entry) {
-        EntryIterator iterator = this.iterator;
-        if (iterator != null) {
-            iterator.add(entry);
-        }
-    }
-
     @Override
     public CompletableFuture<Integer> size() {
         return proxy.invoke(SIZE, SERIALIZER::decode);
@@ -234,51 +228,87 @@
 
     @Override
     public CompletableFuture<AsyncIterator<Map.Entry<String, byte[]>>> iterator() {
-        return proxy.<Integer>invoke(ITERATE, SERIALIZER::decode).thenApply(count -> {
-            iterator = new EntryIterator(count);
-            return iterator;
-        });
+        return proxy.<Long>invoke(OPEN_ITERATOR, SERIALIZER::decode).thenApply(ConsistentMultimapIterator::new);
     }
 
-    private class EntryIterator implements AsyncIterator<Map.Entry<String, byte[]>> {
-        private final Queue<CompletableFuture<Map.Entry<String, byte[]>>> in = new LinkedList<>();
-        private final Queue<CompletableFuture<Map.Entry<String, byte[]>>> out = new LinkedList<>();
-        private final int total;
-        private volatile int count;
+    /**
+     * Asynchronous multimap iterator that pulls entries from the cluster in batches on demand.
+     */
+    private class ConsistentMultimapIterator implements AsyncIterator<Map.Entry<String, byte[]>> {
+        private final long id;
+        private volatile CompletableFuture<IteratorBatch> batch;
+        private volatile CompletableFuture<Void> closeFuture;
 
-        EntryIterator(int total) {
-            this.total = total;
+        ConsistentMultimapIterator(long id) {
+            this.id = id;
+            this.batch = CompletableFuture.completedFuture(
+                new IteratorBatch(0, Collections.emptyList()));
         }
 
-        synchronized void add(Map.Entry<String, byte[]> entry) {
-            CompletableFuture<Map.Entry<String, byte[]>> future = out.poll();
-            if (future != null) {
-                future.complete(entry);
-            } else {
-                in.add(CompletableFuture.completedFuture(entry));
+        /**
+         * Returns the current batch while it still has entries, otherwise fetches the next batch from the cluster.
+         *
+         * @return future completed with the batch to read from, or {@code null} if the cluster returned no batch
+         */
+        private CompletableFuture<Iterator<Map.Entry<String, byte[]>>> batch() {
+            return batch.thenCompose(iterator -> {
+                if (iterator != null && !iterator.hasNext()) {
+                    batch = fetch(iterator.position());
+                    return batch.thenApply(Function.identity());
+                }
+                return CompletableFuture.completedFuture(iterator);
+            });
+        }
+
+        /**
+         * Fetches the next batch of entries from the cluster, closing the iterator when no batch is returned.
+         *
+         * @param position the position from which to fetch the next batch
+         * @return the next batch of entries from the cluster, or {@code null} if the cluster returned none
+         */
+        private CompletableFuture<IteratorBatch> fetch(int position) {
+            return proxy.<IteratorPosition, IteratorBatch>invoke(
+                NEXT,
+                SERIALIZER::encode,
+                new IteratorPosition(id, position),
+                SERIALIZER::decode)
+                .thenCompose(batch -> {
+                    if (batch == null) {
+                        return close().thenApply(v -> null);
+                    }
+                    return CompletableFuture.completedFuture(batch);
+                });
+        }
+
+        /**
+         * Closes the iterator on the cluster; the close is invoked once and later calls return the same future.
+         *
+         * @return future to be completed once the iterator has been closed
+         */
+        private CompletableFuture<Void> close() {
+            if (closeFuture == null) {
+                synchronized (this) {
+                    if (closeFuture == null) {
+                        closeFuture = proxy.invoke(CLOSE_ITERATOR, SERIALIZER::encode, id);
+                    }
+                }
             }
+            return closeFuture;
         }
 
         @Override
-        public synchronized CompletableFuture<Boolean> hasNext() {
-            return CompletableFuture.completedFuture(count < total);
+        public CompletableFuture<Boolean> hasNext() {
+            return batch().thenApply(iterator -> iterator != null && iterator.hasNext());
         }
 
         @Override
-        public synchronized CompletableFuture<Map.Entry<String, byte[]>> next() {
-            if (count == total) {
-                return Tools.exceptionalFuture(new NoSuchElementException());
-            }
-            count++;
-
-            CompletableFuture<Map.Entry<String, byte[]>> next = in.poll();
-            if (next != null) {
-                return next;
-            }
-
-            CompletableFuture<Map.Entry<String, byte[]>> future = new AtomixFuture<>();
-            out.add(future);
-            return future;
+        public CompletableFuture<Map.Entry<String, byte[]>> next() {
+            return batch().thenCompose(iterator -> {
+                if (iterator == null) {
+                    return Tools.exceptionalFuture(new NoSuchElementException());
+                }
+                return CompletableFuture.completedFuture(iterator.next());
+            });
         }
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
index 6ad93b6..4d38d71 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
@@ -18,6 +18,8 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Maps;
@@ -53,7 +55,9 @@
     CLEAR(OperationType.COMMAND),
     ADD_LISTENER(OperationType.COMMAND),
     REMOVE_LISTENER(OperationType.COMMAND),
-    ITERATE(OperationType.COMMAND);
+    OPEN_ITERATOR(OperationType.COMMAND),
+    NEXT(OperationType.QUERY),
+    CLOSE_ITERATOR(OperationType.COMMAND);
 
     private final OperationType type;
 
@@ -86,6 +90,8 @@
             .register(Versioned.class)
             .register(ArrayList.class)
             .register(Maps.immutableEntry("", "").getClass())
+            .register(IteratorBatch.class)
+            .register(IteratorPosition.class)
             .build("AtomixConsistentSetMultimapOperations");
 
     /**
@@ -382,4 +388,77 @@
             super(key);
         }
     }
+
+    /**
+     * Identifies an open iterator and a position within it; sent by the client as the NEXT request.
+     */
+    public static class IteratorPosition {
+        private long iteratorId;
+        private int position;
+
+        private IteratorPosition() {
+        }
+
+        public IteratorPosition(long iteratorId, int position) {
+            this.iteratorId = iteratorId;
+            this.position = position;
+        }
+
+        public long iteratorId() {
+            return iteratorId;
+        }
+
+        public int position() {
+            return position;
+        }
+    }
+
+    /**
+     * Batch of multimap entries returned by a NEXT query and consumed through one cached iterator.
+     */
+    public static class IteratorBatch implements Iterator<Map.Entry<String, byte[]>> {
+        private int position;
+        private Collection<Map.Entry<String, byte[]>> entries;
+        private transient volatile Iterator<Map.Entry<String, byte[]>> iterator;
+
+        private IteratorBatch() {
+        }
+
+        public IteratorBatch(int position, Collection<Map.Entry<String, byte[]>> entries) {
+            this.position = position;
+            this.entries = entries;
+        }
+
+        public int position() {
+            return position;
+        }
+
+        public Collection<Map.Entry<String, byte[]>> entries() {
+            return entries;
+        }
+
+        private Iterator<Map.Entry<String, byte[]>> iterator() {
+            Iterator<Map.Entry<String, byte[]>> iterator = this.iterator;
+            if (iterator == null) {
+                synchronized (this) { // never lock on the caller-supplied (possibly shared) collection
+                    iterator = this.iterator;
+                    if (iterator == null) {
+                        iterator = entries.iterator();
+                        this.iterator = iterator;
+                    }
+                }
+            }
+            return iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iterator().hasNext();
+        }
+
+        @Override
+        public Map.Entry<String, byte[]> next() {
+            return iterator().next();
+        }
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index 5a93bef..6c864b0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -16,11 +16,13 @@
 
 package org.onosproject.store.primitives.resources.impl;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,9 +60,9 @@
 import org.onosproject.store.service.Versioned;
 
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLOSE_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_KEY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_VALUE;
@@ -71,11 +73,14 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ITERATE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorBatch;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorPosition;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.OPEN_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
@@ -93,6 +98,7 @@
  * State Machine for {@link AtomixConsistentSetMultimap} resource.
  */
 public class AtomixConsistentSetMultimapService extends AbstractRaftService {
+    private static final int MAX_ITERATOR_BATCH_SIZE = 1024 * 32;
 
     private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
             .register(KryoNamespaces.BASIC)
@@ -119,13 +125,18 @@
 
     private AtomicLong globalVersion = new AtomicLong(1);
     private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
-    private Map<String, MapEntryValue> backingMap = Maps.newHashMap();
+    private Map<String, MapEntryValue> backingMap = Maps.newConcurrentMap();
+    private Map<Long, IteratorContext> iterators = Maps.newHashMap();
 
     @Override
     public void snapshot(SnapshotWriter writer) {
         writer.writeLong(globalVersion.get());
         writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer::encode);
         writer.writeObject(backingMap, serializer::encode);
+        // Persist only iterator id -> owning session id; the cursors themselves are not serialized.
+        Map<Long, Long> iteratorSessions = Maps.newHashMap();
+        iterators.forEach((id, context) -> iteratorSessions.put(id, context.sessionId));
+        writer.writeObject(iteratorSessions, serializer::encode);
     }
 
     @Override
@@ -138,6 +149,11 @@
         }
 
         backingMap = reader.readObject(serializer::decode);
+
+        Map<Long, Long> iterators = reader.readObject(serializer::decode);
+        this.iterators = Maps.newHashMap();
+        iterators.forEach((id, session) ->
+            this.iterators.put(id, new IteratorContext(session, backingMap.entrySet().iterator())));
     }
 
     @Override
@@ -161,17 +177,21 @@
         executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
         executor.register(ADD_LISTENER, this::listen);
         executor.register(REMOVE_LISTENER, this::unlisten);
-        executor.register(ITERATE, this::iterate, serializer::encode);
+        executor.register(OPEN_ITERATOR, this::openIterator, serializer::encode);
+        executor.register(NEXT, serializer::decode, this::next, serializer::encode);
+        executor.register(CLOSE_ITERATOR, serializer::decode, this::closeIterator);
     }
 
     @Override
     public void onExpire(RaftSession session) {
         listeners.remove(session.sessionId().id());
+        iterators.values().removeIf(context -> context.sessionId == session.sessionId().id());
     }
 
     @Override
     public void onClose(RaftSession session) {
         listeners.remove(session.sessionId().id());
+        iterators.values().removeIf(context -> context.sessionId == session.sessionId().id());
     }
 
     /**
@@ -492,20 +512,66 @@
     }
 
     /**
-     * Handles an iterate commit.
+     * Handles an open iterator commit by registering a new iterator over the backing map.
      *
-     * @param commit the iterate commit
-     * @return count of commit entries
+     * @param commit the open iterator commit
+     * @return the iterator identifier, which is the index of the open commit
      */
-    protected int iterate(Commit<Void> commit) {
-        int count = 0;
-        for (Map.Entry<String, MapEntryValue> entry : backingMap.entrySet()) {
-            for (byte[] value : entry.getValue().values()) {
-                commit.session().publish(ENTRY, serializer::encode, Maps.immutableEntry(entry.getKey(), value));
-                count++;
+    protected long openIterator(Commit<Void> commit) {
+        iterators.put(commit.index(), new IteratorContext(
+            commit.session().sessionId().id(),
+            backingMap.entrySet().iterator()));
+        return commit.index();
+    }
+
+    /**
+     * Handles an iterator next commit, skipping entries up to the position the client already holds.
+     *
+     * @param commit the next commit carrying the iterator identifier and the client's position
+     * @return the next batch of entries, or {@code null} if the iterator is unknown or has no more entries
+     */
+    protected IteratorBatch next(Commit<IteratorPosition> commit) {
+        final long iteratorId = commit.value().iteratorId();
+        final int position = commit.value().position();
+
+        IteratorContext context = iterators.get(iteratorId);
+        if (context == null) {
+            return null;
+        }
+
+        List<Map.Entry<String, byte[]>> entries = new ArrayList<>();
+        int size = 0;
+        while (context.iterator.hasNext()) {
+            // Always consume the entry so a context lagging behind the client skips delivered entries.
+            Map.Entry<String, MapEntryValue> entry = context.iterator.next();
+            context.position++;
+            if (context.position > position) {
+                String key = entry.getKey();
+                int keySize = key.length();
+                for (byte[] value : entry.getValue().values()) {
+                    entries.add(Maps.immutableEntry(key, value));
+                    size += keySize + value.length;
+                }
+
+                if (size >= MAX_ITERATOR_BATCH_SIZE) {
+                    break;
+                }
             }
         }
-        return count;
+
+        if (entries.isEmpty()) {
+            return null;
+        }
+        return new IteratorBatch(context.position, entries);
+    }
+
+    /**
+     * Handles a close iterator commit by discarding the state registered for the iterator.
+     *
+     * @param commit the close iterator commit whose value is the identifier of the iterator to close
+     */
+    protected void closeIterator(Commit<Long> commit) {
+        iterators.remove(commit.value());
     }
 
     /**
@@ -787,4 +853,15 @@
             }
         }
     }
+
+    private static class IteratorContext {
+        private final long sessionId;
+        private final Iterator<Map.Entry<String, MapEntryValue>> iterator;
+        private int position = 0;
+
+        IteratorContext(long sessionId, Iterator<Map.Entry<String, MapEntryValue>> iterator) {
+            this.sessionId = sessionId;
+            this.iterator = iterator;
+        }
+    }
 }
\ No newline at end of file