Refactor multimap streams to avoid blocking iteration on initialization
Change-Id: I6a357b37e85808972267ef2daf5328fd5035aac4
diff --git a/core/store/primitives/pom.xml b/core/store/primitives/pom.xml
index 5ace6b3..7839290 100644
--- a/core/store/primitives/pom.xml
+++ b/core/store/primitives/pom.xml
@@ -70,7 +70,7 @@
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix</artifactId>
- <version>2.0.22</version>
+ <version>2.0.23</version>
</dependency>
<dependency>
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
index 39d49fe..b04c9dd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
@@ -17,21 +17,21 @@
package org.onosproject.store.primitives.resources.impl;
import java.util.Collection;
+import java.util.Collections;
import java.util.ConcurrentModificationException;
-import java.util.LinkedList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import java.util.function.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Multiset;
import io.atomix.protocols.raft.proxy.RaftProxy;
-import io.atomix.utils.concurrent.AtomixFuture;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.store.serializers.KryoNamespaces;
@@ -43,9 +43,9 @@
import org.onosproject.store.service.Versioned;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLOSE_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_VALUE;
@@ -56,10 +56,13 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ITERATE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorBatch;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorPosition;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.OPEN_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
@@ -89,13 +92,11 @@
.register(AtomixConsistentSetMultimapEvents.NAMESPACE)
.build());
- private volatile EntryIterator iterator;
private final Map<MultimapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
public AtomixConsistentSetMultimap(RaftProxy proxy) {
super(proxy);
proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleChange);
- proxy.addEventListener(ENTRY, SERIALIZER::decode, this::handleEntry);
proxy.addStateChangeListener(state -> {
if (state == RaftProxy.State.CONNECTED && isListening()) {
proxy.invoke(ADD_LISTENER);
@@ -108,13 +109,6 @@
mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
}
- private void handleEntry(Map.Entry<String, byte[]> entry) {
- EntryIterator iterator = this.iterator;
- if (iterator != null) {
- iterator.add(entry);
- }
- }
-
@Override
public CompletableFuture<Integer> size() {
return proxy.invoke(SIZE, SERIALIZER::decode);
@@ -234,51 +228,87 @@
@Override
public CompletableFuture<AsyncIterator<Map.Entry<String, byte[]>>> iterator() {
- return proxy.<Integer>invoke(ITERATE, SERIALIZER::decode).thenApply(count -> {
- iterator = new EntryIterator(count);
- return iterator;
- });
+ return proxy.<Long>invoke(OPEN_ITERATOR, SERIALIZER::decode).thenApply(ConsistentMultimapIterator::new);
}
- private class EntryIterator implements AsyncIterator<Map.Entry<String, byte[]>> {
- private final Queue<CompletableFuture<Map.Entry<String, byte[]>>> in = new LinkedList<>();
- private final Queue<CompletableFuture<Map.Entry<String, byte[]>>> out = new LinkedList<>();
- private final int total;
- private volatile int count;
+ /**
+ * Consistent multimap iterator.
+ */
+ private class ConsistentMultimapIterator implements AsyncIterator<Map.Entry<String, byte[]>> {
+ private final long id;
+ private volatile CompletableFuture<IteratorBatch> batch;
+ private volatile CompletableFuture<Void> closeFuture;
- EntryIterator(int total) {
- this.total = total;
+ ConsistentMultimapIterator(long id) {
+ this.id = id;
+ this.batch = CompletableFuture.completedFuture(
+ new IteratorBatch(0, Collections.emptyList()));
}
- synchronized void add(Map.Entry<String, byte[]> entry) {
- CompletableFuture<Map.Entry<String, byte[]>> future = out.poll();
- if (future != null) {
- future.complete(entry);
- } else {
- in.add(CompletableFuture.completedFuture(entry));
+ /**
+ * Returns the current batch iterator or lazily fetches the next batch from the cluster.
+ *
+ * @return the next batch iterator
+ */
+ private CompletableFuture<Iterator<Map.Entry<String, byte[]>>> batch() {
+ return batch.thenCompose(iterator -> {
+ if (iterator != null && !iterator.hasNext()) {
+ batch = fetch(iterator.position());
+ return batch.thenApply(Function.identity());
+ }
+ return CompletableFuture.completedFuture(iterator);
+ });
+ }
+
+ /**
+ * Fetches the next batch of entries from the cluster.
+ *
+ * @param position the position from which to fetch the next batch
+ * @return the next batch of entries from the cluster
+ */
+ private CompletableFuture<IteratorBatch> fetch(int position) {
+ return proxy.<IteratorPosition, IteratorBatch>invoke(
+ NEXT,
+ SERIALIZER::encode,
+ new IteratorPosition(id, position),
+ SERIALIZER::decode)
+ .thenCompose(batch -> {
+ if (batch == null) {
+ return close().thenApply(v -> null);
+ }
+ return CompletableFuture.completedFuture(batch);
+ });
+ }
+
+ /**
+ * Closes the iterator.
+ *
+ * @return future to be completed once the iterator has been closed
+ */
+ private CompletableFuture<Void> close() {
+ if (closeFuture == null) {
+ synchronized (this) {
+ if (closeFuture == null) {
+ closeFuture = proxy.invoke(CLOSE_ITERATOR, SERIALIZER::encode, id);
+ }
+ }
}
+ return closeFuture;
}
@Override
- public synchronized CompletableFuture<Boolean> hasNext() {
- return CompletableFuture.completedFuture(count < total);
+ public CompletableFuture<Boolean> hasNext() {
+ return batch().thenApply(iterator -> iterator != null && iterator.hasNext());
}
@Override
- public synchronized CompletableFuture<Map.Entry<String, byte[]>> next() {
- if (count == total) {
- return Tools.exceptionalFuture(new NoSuchElementException());
- }
- count++;
-
- CompletableFuture<Map.Entry<String, byte[]>> next = in.poll();
- if (next != null) {
- return next;
- }
-
- CompletableFuture<Map.Entry<String, byte[]>> future = new AtomixFuture<>();
- out.add(future);
- return future;
+ public CompletableFuture<Map.Entry<String, byte[]>> next() {
+ return batch().thenCompose(iterator -> {
+ if (iterator == null) {
+ return Tools.exceptionalFuture(new NoSuchElementException());
+ }
+ return CompletableFuture.completedFuture(iterator.next());
+ });
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
index 6ad93b6..4d38d71 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
@@ -18,6 +18,8 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Maps;
@@ -53,7 +55,9 @@
CLEAR(OperationType.COMMAND),
ADD_LISTENER(OperationType.COMMAND),
REMOVE_LISTENER(OperationType.COMMAND),
- ITERATE(OperationType.COMMAND);
+ OPEN_ITERATOR(OperationType.COMMAND),
+ NEXT(OperationType.QUERY),
+ CLOSE_ITERATOR(OperationType.COMMAND);
private final OperationType type;
@@ -86,6 +90,8 @@
.register(Versioned.class)
.register(ArrayList.class)
.register(Maps.immutableEntry("", "").getClass())
+ .register(IteratorBatch.class)
+ .register(IteratorPosition.class)
.build("AtomixConsistentSetMultimapOperations");
/**
@@ -382,4 +388,77 @@
super(key);
}
}
+
+ /**
+ * Iterator position.
+ */
+ public static class IteratorPosition {
+ private long iteratorId;
+ private int position;
+
+ private IteratorPosition() {
+ }
+
+ public IteratorPosition(long iteratorId, int position) {
+ this.iteratorId = iteratorId;
+ this.position = position;
+ }
+
+ public long iteratorId() {
+ return iteratorId;
+ }
+
+ public int position() {
+ return position;
+ }
+ }
+
+ /**
+ * Iterator batch.
+ */
+ public static class IteratorBatch implements Iterator<Map.Entry<String, byte[]>> {
+ private int position;
+ private Collection<Map.Entry<String, byte[]>> entries;
+ private transient volatile Iterator<Map.Entry<String, byte[]>> iterator;
+
+ private IteratorBatch() {
+ }
+
+ public IteratorBatch(int position, Collection<Map.Entry<String, byte[]>> entries) {
+ this.position = position;
+ this.entries = entries;
+ }
+
+ public int position() {
+ return position;
+ }
+
+ public Collection<Map.Entry<String, byte[]>> entries() {
+ return entries;
+ }
+
+ private Iterator<Map.Entry<String, byte[]>> iterator() {
+ Iterator<Map.Entry<String, byte[]>> iterator = this.iterator;
+ if (iterator == null) {
+ synchronized (entries) {
+ iterator = this.iterator;
+ if (iterator == null) {
+ iterator = entries.iterator();
+ this.iterator = iterator;
+ }
+ }
+ }
+ return iterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator().hasNext();
+ }
+
+ @Override
+ public Map.Entry<String, byte[]> next() {
+ return iterator().next();
+ }
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index 7bf85ce..6c864b0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -16,11 +16,13 @@
package org.onosproject.store.primitives.resources.impl;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -58,9 +60,9 @@
import org.onosproject.store.service.Versioned;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLOSE_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_VALUE;
@@ -71,11 +73,14 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ITERATE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorBatch;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IteratorPosition;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.OPEN_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
@@ -93,6 +98,7 @@
* State Machine for {@link AtomixConsistentSetMultimap} resource.
*/
public class AtomixConsistentSetMultimapService extends AbstractRaftService {
+ private static final int MAX_ITERATOR_BATCH_SIZE = 1024 * 32;
private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
@@ -119,13 +125,18 @@
private AtomicLong globalVersion = new AtomicLong(1);
private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
- private Map<String, MapEntryValue> backingMap = Maps.newHashMap();
+ private Map<String, MapEntryValue> backingMap = Maps.newConcurrentMap();
+ private Map<Long, IteratorContext> iterators = Maps.newHashMap();
@Override
public void snapshot(SnapshotWriter writer) {
writer.writeLong(globalVersion.get());
writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer::encode);
writer.writeObject(backingMap, serializer::encode);
+
+ Map<Long, Long> iterators = Maps.newHashMap();
+ this.iterators.forEach((id, context) -> iterators.put(id, context.sessionId));
+ writer.writeObject(iterators, serializer::encode);
}
@Override
@@ -138,6 +149,11 @@
}
backingMap = reader.readObject(serializer::decode);
+
+ Map<Long, Long> iterators = reader.readObject(serializer::decode);
+ this.iterators = Maps.newHashMap();
+ iterators.forEach((id, session) ->
+ this.iterators.put(id, new IteratorContext(session, backingMap.entrySet().iterator())));
}
@Override
@@ -161,17 +177,21 @@
executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
executor.register(ADD_LISTENER, this::listen);
executor.register(REMOVE_LISTENER, this::unlisten);
- executor.register(ITERATE, this::iterate, serializer::encode);
+ executor.register(OPEN_ITERATOR, this::openIterator, serializer::encode);
+ executor.register(NEXT, serializer::decode, this::next, serializer::encode);
+ executor.register(CLOSE_ITERATOR, serializer::decode, this::closeIterator);
}
@Override
public void onExpire(RaftSession session) {
listeners.remove(session.sessionId().id());
+ iterators.entrySet().removeIf(entry -> entry.getValue().sessionId == session.sessionId().id());
}
@Override
public void onClose(RaftSession session) {
listeners.remove(session.sessionId().id());
+ iterators.entrySet().removeIf(entry -> entry.getValue().sessionId == session.sessionId().id());
}
/**
@@ -492,20 +512,66 @@
}
/**
- * Handles an iterate commit.
+ * Handles an open iterator commit.
*
- * @param commit the iterate commit
- * @return the entry count
+ * @param commit the open iterator commit
+ * @return iterator identifier
*/
- protected int iterate(Commit<Void> commit) {
- int count = 0;
- for (Map.Entry<String, MapEntryValue> entry : backingMap.entrySet()) {
- for (byte[] value : entry.getValue().values()) {
- commit.session().publish(ENTRY, serializer::encode, Maps.immutableEntry(entry.getKey(), value));
- count++;
+ protected long openIterator(Commit<Void> commit) {
+ iterators.put(commit.index(), new IteratorContext(
+ commit.session().sessionId().id(),
+ backingMap.entrySet().iterator()));
+ return commit.index();
+ }
+
+ /**
+ * Handles an iterator next commit.
+ *
+ * @param commit the next commit
+ * @return a list of entries to iterate
+ */
+ protected IteratorBatch next(Commit<IteratorPosition> commit) {
+ final long iteratorId = commit.value().iteratorId();
+ final int position = commit.value().position();
+
+ IteratorContext context = iterators.get(iteratorId);
+ if (context == null) {
+ return null;
+ }
+
+ List<Map.Entry<String, byte[]>> entries = new ArrayList<>();
+ int size = 0;
+ while (context.iterator.hasNext()) {
+ context.position++;
+ if (context.position > position) {
+ Map.Entry<String, MapEntryValue> entry = context.iterator.next();
+ String key = entry.getKey();
+ int keySize = key.length();
+ for (byte[] value : entry.getValue().values()) {
+ entries.add(Maps.immutableEntry(key, value));
+ size += keySize;
+ size += value.length;
+ }
+
+ if (size >= MAX_ITERATOR_BATCH_SIZE) {
+ break;
+ }
}
}
- return count;
+
+ if (entries.isEmpty()) {
+ return null;
+ }
+ return new IteratorBatch(context.position, entries);
+ }
+
+ /**
+ * Handles a close iterator commit.
+ *
+ * @param commit the close iterator commit
+ */
+ protected void closeIterator(Commit<Long> commit) {
+ iterators.remove(commit.value());
}
/**
@@ -787,4 +853,15 @@
}
}
}
+
+ private static class IteratorContext {
+ private final long sessionId;
+ private final Iterator<Map.Entry<String, MapEntryValue>> iterator;
+ private int position = 0;
+
+ IteratorContext(long sessionId, Iterator<Map.Entry<String, MapEntryValue>> iterator) {
+ this.sessionId = sessionId;
+ this.iterator = iterator;
+ }
+ }
}
\ No newline at end of file
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
index f9418a4..5efdeb3 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
@@ -18,7 +18,16 @@
import java.util.Collection;
import java.util.Collections;
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.impl.RaftContext;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.service.ServiceId;
+import io.atomix.protocols.raft.service.ServiceType;
import io.atomix.protocols.raft.service.impl.DefaultCommit;
+import io.atomix.protocols.raft.service.impl.DefaultServiceContext;
+import io.atomix.protocols.raft.session.RaftSession;
+import io.atomix.protocols.raft.session.SessionId;
import io.atomix.protocols.raft.session.impl.RaftSessionContext;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.protocols.raft.storage.snapshot.Snapshot;
@@ -27,16 +36,23 @@
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import io.atomix.storage.StorageLevel;
import io.atomix.time.WallClockTimestamp;
+import io.atomix.utils.concurrent.AtomixThreadFactory;
+import io.atomix.utils.concurrent.SingleThreadContextFactory;
import org.junit.Test;
import org.onlab.util.Match;
import org.onosproject.store.service.Versioned;
+import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.OPEN_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
+import static org.onosproject.store.service.DistributedPrimitive.Type.LEADER_ELECTOR;
/**
* Consistent set multimap service test.
@@ -46,19 +62,48 @@
@SuppressWarnings("unchecked")
public void testSnapshot() throws Exception {
SnapshotStore store = new SnapshotStore(RaftStorage.newBuilder()
- .withPrefix("test")
- .withStorageLevel(StorageLevel.MEMORY)
- .build());
+ .withPrefix("test")
+ .withStorageLevel(StorageLevel.MEMORY)
+ .build());
Snapshot snapshot = store.newSnapshot(2, new WallClockTimestamp());
+ DefaultServiceContext context = mock(DefaultServiceContext.class);
+ expect(context.serviceType()).andReturn(ServiceType.from(LEADER_ELECTOR.name())).anyTimes();
+ expect(context.serviceName()).andReturn("test").anyTimes();
+ expect(context.serviceId()).andReturn(ServiceId.from(1)).anyTimes();
+
+ RaftContext server = mock(RaftContext.class);
+ expect(server.getProtocol()).andReturn(mock(RaftServerProtocol.class));
+
+ replay(context, server);
+
+ RaftSession session = new RaftSessionContext(
+ SessionId.from(1),
+ MemberId.from("1"),
+ "test",
+ ServiceType.from(LEADER_ELECTOR.name()),
+ ReadConsistency.LINEARIZABLE,
+ 100,
+ 5000,
+ System.currentTimeMillis(),
+ context,
+ server,
+ new SingleThreadContextFactory(new AtomixThreadFactory()));
+
AtomixConsistentSetMultimapService service = new AtomixConsistentSetMultimapService();
service.put(new DefaultCommit<>(
- 2,
- PUT,
- new AtomixConsistentSetMultimapOperations.Put(
- "foo", Collections.singletonList("Hello world!".getBytes()), Match.ANY),
- mock(RaftSessionContext.class),
- System.currentTimeMillis()));
+ 2,
+ PUT,
+ new AtomixConsistentSetMultimapOperations.Put(
+ "foo", Collections.singletonList("Hello world!".getBytes()), Match.ANY),
+ session,
+ System.currentTimeMillis()));
+ service.openIterator(new DefaultCommit<>(
+ 3,
+ OPEN_ITERATOR,
+ null,
+ session,
+ System.currentTimeMillis()));
try (SnapshotWriter writer = snapshot.openWriter()) {
service.snapshot(writer);
@@ -72,13 +117,20 @@
}
Versioned<Collection<? extends byte[]>> value = service.get(new DefaultCommit<>(
- 2,
- GET,
- new AtomixConsistentSetMultimapOperations.Get("foo"),
- mock(RaftSessionContext.class),
- System.currentTimeMillis()));
+ 3,
+ GET,
+ new AtomixConsistentSetMultimapOperations.Get("foo"),
+ session,
+ System.currentTimeMillis()));
assertNotNull(value);
assertEquals(1, value.value().size());
assertArrayEquals("Hello world!".getBytes(), value.value().iterator().next());
+
+ assertEquals(1, service.next(new DefaultCommit<>(
+ 4,
+ NEXT,
+ new AtomixConsistentSetMultimapOperations.IteratorPosition(3L, 0),
+ session,
+ System.currentTimeMillis())).entries().size());
}
}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
index a7ae60b..95b5b5a 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
@@ -336,18 +336,19 @@
@Test
public void testStreams() throws Exception {
AtomixConsistentSetMultimap map = createResource("testStreams");
- for (int i = 0; i < 10000; i++) {
- allKeys.forEach(key -> {
- map.put(key, UUID.randomUUID().toString().getBytes()).join();
- });
+ for (int i = 0; i < 100; i++) {
+ for (int j = 0; j < 100; j++) {
+ map.put(String.valueOf(i), String.valueOf(j).getBytes()).join();
+ }
}
List<Map.Entry<String, byte[]>> entries = new ArrayList<>();
AsyncIterator<Map.Entry<String, byte[]>> iterator = map.iterator().get(5, TimeUnit.SECONDS);
while (iterator.hasNext().get(5, TimeUnit.SECONDS)) {
+ map.put(keyOne, UUID.randomUUID().toString().getBytes()).join();
entries.add(iterator.next().get(5, TimeUnit.SECONDS));
}
- assertEquals(40000, entries.size());
+ assertEquals(10000, entries.size());
}
/**
diff --git a/features/features.xml b/features/features.xml
index bf262d4..a95713a 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -59,7 +59,7 @@
<bundle>mvn:com.typesafe/config/1.2.1</bundle>
<bundle>mvn:com.googlecode.concurrent-trees/concurrent-trees/2.6.0</bundle>
<bundle>mvn:commons-io/commons-io/2.4</bundle>
- <bundle>mvn:io.atomix/atomix/2.0.22</bundle>
+ <bundle>mvn:io.atomix/atomix/2.0.23</bundle>
<bundle>mvn:org.glassfish.jersey.core/jersey-client/2.25.1</bundle>
diff --git a/lib/BUCK b/lib/BUCK
index 1eb58ad..7b98d79 100644
--- a/lib/BUCK
+++ b/lib/BUCK
@@ -1,4 +1,4 @@
-# ***** This file was auto-generated at Tue, 5 Jun 2018 06:18:18 GMT. Do not edit this file manually. *****
+# ***** This file was auto-generated at Fri, 29 Jun 2018 23:42:06 GMT. Do not edit this file manually. *****
# ***** Use onos-lib-gen *****
pass_thru_pom(
@@ -207,10 +207,10 @@
remote_jar (
name = 'atomix',
- out = 'atomix-2.0.22.jar',
- url = 'mvn:io.atomix:atomix:jar:2.0.22',
- sha1 = '6c4f4d95ad933ac612ef9ab85b22f5c4f714a3e1',
- maven_coords = 'io.atomix:atomix:2.0.22',
+ out = 'atomix-2.0.23.jar',
+ url = 'mvn:io.atomix:atomix:jar:2.0.23',
+ sha1 = '6b41cebf257e0094c2276b6fa589b2c73aff5f99',
+ maven_coords = 'io.atomix:atomix:2.0.23',
visibility = [ 'PUBLIC' ],
)
diff --git a/lib/deps.json b/lib/deps.json
index 5323ac1..80d17d0 100644
--- a/lib/deps.json
+++ b/lib/deps.json
@@ -117,7 +117,7 @@
"aopalliance-repackaged": "mvn:org.glassfish.hk2.external:aopalliance-repackaged:2.5.0-b32",
"amqp-client": "mvn:com.rabbitmq:amqp-client:jar:3.6.1",
"asm": "mvn:org.ow2.asm:asm:5.0.4",
- "atomix": "mvn:io.atomix:atomix:2.0.22",
+ "atomix": "mvn:io.atomix:atomix:2.0.23",
"commons-codec": "mvn:commons-codec:commons-codec:1.10",
"commons-collections": "mvn:commons-collections:commons-collections:3.2.2",
"commons-configuration": "mvn:commons-configuration:commons-configuration:1.10",