Refactor multimap streams to avoid blocking iteration on initialization
Change-Id: I6a357b37e85808972267ef2daf5328fd5035aac4
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index 5a93bef..6c864b0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -16,11 +16,13 @@
package org.onosproject.store.primitives.resources.impl;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -58,9 +60,9 @@
import org.onosproject.store.service.Versioned;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLOSE_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_VALUE;
@@ -71,11 +73,14 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ITERATE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorBatch;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorPosition;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.OPEN_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
@@ -93,6 +98,7 @@
* State Machine for {@link AtomixConsistentSetMultimap} resource.
*/
public class AtomixConsistentSetMultimapService extends AbstractRaftService {
+ private static final int MAX_ITERATOR_BATCH_SIZE = 1024 * 32; // an iterator batch stops growing once the summed key lengths and value lengths reach this total
private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
@@ -119,13 +125,18 @@
private AtomicLong globalVersion = new AtomicLong(1);
private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
- private Map<String, MapEntryValue> backingMap = Maps.newHashMap();
+ private Map<String, MapEntryValue> backingMap = Maps.newConcurrentMap(); // NOTE(review): presumably concurrent so that entry iterators kept open across commits tolerate writes to the map -- confirm
+ private Map<Long, IteratorContext> iterators = Maps.newHashMap(); // open iterators keyed by iterator id, which is the index of the commit that opened them
@Override
public void snapshot(SnapshotWriter writer) {
writer.writeLong(globalVersion.get());
writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer::encode);
writer.writeObject(backingMap, serializer::encode);
+ // Open iterators are written as iterator id -> owning session id only; the cursor and its position are not persisted.
+ Map<Long, Long> iterators = Maps.newHashMap();
+ this.iterators.forEach((id, context) -> iterators.put(id, context.sessionId));
+ writer.writeObject(iterators, serializer::encode);
}
@Override
@@ -138,6 +149,11 @@
}
backingMap = reader.readObject(serializer::decode);
+
+ Map<Long, Long> iterators = reader.readObject(serializer::decode);
+ this.iterators = Maps.newHashMap();
+ iterators.forEach((id, session) ->
+ this.iterators.put(id, new IteratorContext(session, backingMap.entrySet().iterator())));
}
@Override
@@ -161,17 +177,21 @@
executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
executor.register(ADD_LISTENER, this::listen);
executor.register(REMOVE_LISTENER, this::unlisten);
- executor.register(ITERATE, this::iterate, serializer::encode);
+ executor.register(OPEN_ITERATOR, this::openIterator, serializer::encode);
+ executor.register(NEXT, serializer::decode, this::next, serializer::encode);
+ executor.register(CLOSE_ITERATOR, serializer::decode, this::closeIterator);
}
@Override
public void onExpire(RaftSession session) {
listeners.remove(session.sessionId().id());
+ iterators.entrySet().removeIf(entry -> entry.getValue().sessionId == session.sessionId().id()); // discard every iterator that was opened by the expired session
}
@Override
public void onClose(RaftSession session) {
listeners.remove(session.sessionId().id());
+ iterators.entrySet().removeIf(entry -> entry.getValue().sessionId == session.sessionId().id()); // discard every iterator that was opened by the closed session
}
/**
@@ -492,20 +512,66 @@
}
/**
- * Handles an iterate commit.
+ * Handles an open iterator commit.
*
- * @param commit the iterate commit
- * @return count of commit entries
+ * @param commit the open iterator commit
+ * @return iterator identifier
*/
- protected int iterate(Commit<Void> commit) {
- int count = 0;
- for (Map.Entry<String, MapEntryValue> entry : backingMap.entrySet()) {
- for (byte[] value : entry.getValue().values()) {
- commit.session().publish(ENTRY, serializer::encode, Maps.immutableEntry(entry.getKey(), value));
- count++;
+ protected long openIterator(Commit<Void> commit) {
+ iterators.put(commit.index(), new IteratorContext(
+ commit.session().sessionId().id(),
+ backingMap.entrySet().iterator()));
+ return commit.index();
+ }
+
+    /**
+     * Handles an iterator next commit.
+     * <p>
+     * Collects the next batch of {@code (key, value)} pairs for the iterator named in the commit.
+     * Each backing-map entry is expanded into one pair per value. The size check runs only after
+     * all values of an entry have been added, so a batch may exceed
+     * {@code MAX_ITERATOR_BATCH_SIZE} by the size of its last entry.
+     * <p>
+     * The client sends the position it has already received. Entries whose position is not
+     * greater than that value are stepped over rather than returned, so a context whose counter
+     * is behind the client (a freshly created context starts at zero) catches up without
+     * re-delivering entries.
+     *
+     * @param commit the next commit, carrying the iterator identifier and the client's position
+     * @return the next batch together with the position reached, or {@code null} if the iterator
+     *         is unknown (never opened, closed, or removed with its session) or has no further
+     *         entries to deliver
+     */
+    protected IteratorBatch next(Commit<IteratorPosition> commit) {
+        final long iteratorId = commit.value().iteratorId();
+        final int position = commit.value().position();
+
+        IteratorContext context = iterators.get(iteratorId);
+        if (context == null) {
+            return null;
+        }
+
+        List<Map.Entry<String, byte[]>> entries = new ArrayList<>();
+        int size = 0;
+        while (context.iterator.hasNext()) {
+            // Advance the cursor for every counted position. Counting without calling next()
+            // would leave the cursor at its start and hand back entries the client already has.
+            Map.Entry<String, MapEntryValue> entry = context.iterator.next();
+            context.position++;
+            if (context.position > position) {
+                String key = entry.getKey();
+                int keySize = key.length();
+                for (byte[] value : entry.getValue().values()) {
+                    entries.add(Maps.immutableEntry(key, value));
+                    size += keySize;
+                    size += value.length;
+                }
+
+                // Stop once the accumulated key and value lengths reach the batch limit.
+                if (size >= MAX_ITERATOR_BATCH_SIZE) {
+                    break;
+                }
}
}
- return count;
+
+        if (entries.isEmpty()) {
+            return null;
+        }
+        return new IteratorBatch(context.position, entries);
+    }
+
+ /**
+ * Handles a close iterator commit.
+ *
+ * @param commit the close iterator commit
+ */
+ protected void closeIterator(Commit<Long> commit) {
+ iterators.remove(commit.value());
}
/**
@@ -787,4 +853,15 @@
}
}
}
+
+    /**
+     * State kept for one open iterator: the identifier of the session that opened it, the cursor
+     * over the backing map's entries, and a position counter that starts at zero.
+     */
+    private static class IteratorContext {
+        private final long sessionId;
+        private final Iterator<Map.Entry<String, MapEntryValue>> iterator;
+        private int position;
+
+        IteratorContext(long ownerSessionId, Iterator<Map.Entry<String, MapEntryValue>> cursor) {
+            sessionId = ownerSessionId;
+            iterator = cursor;
+        }
+    }
}
\ No newline at end of file