Refactor multimap streams to avoid blocking iteration on initialization

Change-Id: I6a357b37e85808972267ef2daf5328fd5035aac4
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index 5a93bef..6c864b0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -16,11 +16,13 @@
 
 package org.onosproject.store.primitives.resources.impl;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,9 +60,9 @@
 import org.onosproject.store.service.Versioned;
 
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLOSE_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_KEY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_VALUE;
@@ -71,11 +73,14 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ITERATE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorBatch;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorPosition;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.OPEN_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
@@ -93,6 +98,7 @@
  * State Machine for {@link AtomixConsistentSetMultimap} resource.
  */
 public class AtomixConsistentSetMultimapService extends AbstractRaftService {
+    private static final int MAX_ITERATOR_BATCH_SIZE = 1024 * 32;
 
     private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
             .register(KryoNamespaces.BASIC)
@@ -119,13 +125,18 @@
 
     private AtomicLong globalVersion = new AtomicLong(1);
     private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
-    private Map<String, MapEntryValue> backingMap = Maps.newHashMap();
+    private Map<String, MapEntryValue> backingMap = Maps.newConcurrentMap();
+    private Map<Long, IteratorContext> iterators = Maps.newHashMap();
 
     @Override
     public void snapshot(SnapshotWriter writer) {
         writer.writeLong(globalVersion.get());
         writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer::encode);
         writer.writeObject(backingMap, serializer::encode);
+
+        Map<Long, Long> iterators = Maps.newHashMap();
+        this.iterators.forEach((id, context) -> iterators.put(id, context.sessionId));
+        writer.writeObject(iterators, serializer::encode);
     }
 
     @Override
@@ -138,6 +149,11 @@
         }
 
         backingMap = reader.readObject(serializer::decode);
+
+        Map<Long, Long> iterators = reader.readObject(serializer::decode);
+        this.iterators = Maps.newHashMap();
+        iterators.forEach((id, session) ->
+            this.iterators.put(id, new IteratorContext(session, backingMap.entrySet().iterator())));
     }
 
     @Override
@@ -161,17 +177,21 @@
         executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
         executor.register(ADD_LISTENER, this::listen);
         executor.register(REMOVE_LISTENER, this::unlisten);
-        executor.register(ITERATE, this::iterate, serializer::encode);
+        executor.register(OPEN_ITERATOR, this::openIterator, serializer::encode);
+        executor.register(NEXT, serializer::decode, this::next, serializer::encode);
+        executor.register(CLOSE_ITERATOR, serializer::decode, this::closeIterator);
     }
 
     @Override
     public void onExpire(RaftSession session) {
         listeners.remove(session.sessionId().id());
+        iterators.entrySet().removeIf(entry -> entry.getValue().sessionId == session.sessionId().id());
     }
 
     @Override
     public void onClose(RaftSession session) {
         listeners.remove(session.sessionId().id());
+        iterators.entrySet().removeIf(entry -> entry.getValue().sessionId == session.sessionId().id());
     }
 
     /**
@@ -492,20 +512,66 @@
     }
 
     /**
-     * Handles an iterate commit.
+     * Handles an open iterator commit.
      *
-     * @param commit the iterate commit
-     * @return count of commit entries
+     * @param commit the open iterator commit
+     * @return iterator identifier
      */
-    protected int iterate(Commit<Void> commit) {
-        int count = 0;
-        for (Map.Entry<String, MapEntryValue> entry : backingMap.entrySet()) {
-            for (byte[] value : entry.getValue().values()) {
-                commit.session().publish(ENTRY, serializer::encode, Maps.immutableEntry(entry.getKey(), value));
-                count++;
+    protected long openIterator(Commit<Void> commit) {
+        iterators.put(commit.index(), new IteratorContext(
+            commit.session().sessionId().id(),
+            backingMap.entrySet().iterator()));
+        return commit.index();
+    }
+
+    /**
+     * Handles an iterator next commit.
+     *
+     * @param commit the next commit
+     * @return a list of entries to iterate
+     */
+    protected IteratorBatch next(Commit<IteratorPosition> commit) {
+        final long iteratorId = commit.value().iteratorId();
+        final int position = commit.value().position();
+
+        IteratorContext context = iterators.get(iteratorId);
+        if (context == null) {
+            return null;
+        }
+
+        List<Map.Entry<String, byte[]>> entries = new ArrayList<>();
+        int size = 0;
+        while (context.iterator.hasNext()) {
+            context.position++;
+            if (context.position > position) {
+                Map.Entry<String, MapEntryValue> entry = context.iterator.next();
+                String key = entry.getKey();
+                int keySize = key.length();
+                for (byte[] value : entry.getValue().values()) {
+                    entries.add(Maps.immutableEntry(key, value));
+                    size += keySize;
+                    size += value.length;
+                }
+
+                if (size >= MAX_ITERATOR_BATCH_SIZE) {
+                    break;
+                }
             }
         }
-        return count;
+
+        if (entries.isEmpty()) {
+            return null;
+        }
+        return new IteratorBatch(context.position, entries);
+    }
+
+    /**
+     * Handles a close iterator commit.
+     *
+     * @param commit the close iterator commit
+     */
+    protected void closeIterator(Commit<Long> commit) {
+        iterators.remove(commit.value());
     }
 
     /**
@@ -787,4 +853,15 @@
             }
         }
     }
+
+    private static class IteratorContext {
+        private final long sessionId;
+        private final Iterator<Map.Entry<String, MapEntryValue>> iterator;
+        private int position = 0;
+
+        IteratorContext(long sessionId, Iterator<Map.Entry<String, MapEntryValue>> iterator) {
+            this.sessionId = sessionId;
+            this.iterator = iterator;
+        }
+    }
 }
\ No newline at end of file