Implement lazy iterators/streams for ConsistentMap
Change-Id: Id643726441c99186667c104cfdd1dab95b06cf87
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
index 1bc62b3..b692bf3 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
@@ -16,6 +16,7 @@
package org.onosproject.store.primitives;
import java.util.Collection;
+import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@@ -32,6 +33,7 @@
import org.onlab.util.Tools;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.ConsistentMapException.ConcurrentModification;
@@ -184,6 +186,11 @@
}
@Override
+ public Iterator<Entry<K, Versioned<V>>> iterator() {
+ return new DefaultIterator<>(complete(asyncMap.iterator()));
+ }
+
+ @Override
public void addListener(MapEventListener<K, V> listener, Executor executor) {
complete(asyncMap.addListener(listener, executor));
}
@@ -208,6 +215,24 @@
return asyncMap.statusChangeListeners();
}
+ private class DefaultIterator<K, V> implements Iterator<Entry<K, Versioned<V>>> {
+ private final AsyncIterator<Entry<K, Versioned<V>>> iterator;
+
+ public DefaultIterator(AsyncIterator<Map.Entry<K, Versioned<V>>> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return complete(iterator.hasNext());
+ }
+
+ @Override
+ public Map.Entry<K, Versioned<V>> next() {
+ return complete(iterator.next());
+ }
+ }
+
@Override
public Map<K, V> asJavaMap() {
synchronized (this) {
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java
index c9dd5df..7ccf8c7 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java
@@ -18,6 +18,7 @@
import com.google.common.base.Throwables;
import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.ConsistentTreeMap;
import org.onosproject.store.service.MapEventListener;
@@ -25,6 +26,7 @@
import org.onosproject.store.service.Versioned;
import java.util.Collection;
+import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
@@ -277,6 +279,11 @@
}
@Override
+ public Iterator<Map.Entry<String, Versioned<V>>> iterator() {
+ return new DefaultIterator<>(complete(treeMap.iterator()));
+ }
+
+ @Override
public void addListener(MapEventListener<String, V> listener,
Executor executor) {
complete(treeMap.addListener(listener, executor));
@@ -287,6 +294,24 @@
complete(treeMap.removeListener(listener));
}
+ private class DefaultIterator<K, V> implements Iterator<Map.Entry<K, Versioned<V>>> {
+ private final AsyncIterator<Map.Entry<K, Versioned<V>>> iterator;
+
+ public DefaultIterator(AsyncIterator<Map.Entry<K, Versioned<V>>> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return complete(iterator.hasNext());
+ }
+
+ @Override
+ public Map.Entry<K, Versioned<V>> next() {
+ return complete(iterator.next());
+ }
+ }
+
@Override
public Map<String, V> asJavaMap() {
synchronized (this) {
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index 749ab5b..45688c0 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -17,6 +17,7 @@
package org.onosproject.store.service;
import java.util.Collection;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
@@ -54,7 +55,8 @@
* the returned future will be {@link CompletableFuture#complete completed} when the
* operation finishes.
*/
-public interface AsyncConsistentMap<K, V> extends DistributedPrimitive, Transactional<MapUpdate<K, V>> {
+public interface AsyncConsistentMap<K, V>
+ extends DistributedPrimitive, Transactional<MapUpdate<K, V>>, AsyncIterable<Map.Entry<K, Versioned<V>>> {
@Override
default DistributedPrimitive.Type primitiveType() {
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index 1f6579b1..304764e 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -24,6 +24,8 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import com.google.common.util.concurrent.MoreExecutors;
@@ -34,7 +36,7 @@
* @param <K> type of key
* @param <V> type of value
*/
-public interface ConsistentMap<K, V> extends DistributedPrimitive {
+public interface ConsistentMap<K, V> extends DistributedPrimitive, Iterable<Map.Entry<K, Versioned<V>>> {
/**
* Returns the number of entries in the map.
@@ -283,6 +285,17 @@
boolean replace(K key, long oldVersion, V newValue);
/**
+ * Streams entries from the map.
+ * <p>
+ * This method is optimized for large maps.
+ *
+ * @return the map entry stream
+ */
+ default Stream<Entry<K, Versioned<V>>> stream() {
+ return StreamSupport.stream(spliterator(), false);
+ }
+
+ /**
* Registers the specified listener to be notified whenever the map is updated.
*
* @param listener listener to notify about map events
diff --git a/core/api/src/test/java/org/onosproject/store/primitives/AsyncConsistentTreeMapAdapter.java b/core/api/src/test/java/org/onosproject/store/primitives/AsyncConsistentTreeMapAdapter.java
index 9f6da1a..7e014cc 100644
--- a/core/api/src/test/java/org/onosproject/store/primitives/AsyncConsistentTreeMapAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/primitives/AsyncConsistentTreeMapAdapter.java
@@ -19,6 +19,7 @@
import com.google.common.collect.Maps;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentTreeMap;
import org.onosproject.store.service.MapEventListener;
@@ -367,4 +368,9 @@
public CompletableFuture<Void> rollback(TransactionId transactionId) {
return null;
}
+
+ @Override
+ public CompletableFuture<AsyncIterator<Map.Entry<String, Versioned<V>>>> iterator() {
+ return null;
+ }
}
diff --git a/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMapAdapter.java b/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMapAdapter.java
index 85e8e19..f39e7d0 100644
--- a/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMapAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMapAdapter.java
@@ -134,6 +134,11 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Map.Entry<K, Versioned<V>>>> iterator() {
+ return null;
+ }
+
+ @Override
public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
return null;
}
diff --git a/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java b/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
index 18f9d63..a8db799 100644
--- a/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
@@ -16,6 +16,7 @@
package org.onosproject.store.service;
import java.util.Collection;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
@@ -155,6 +156,11 @@
}
@Override
+ public Iterator<Map.Entry<K, Versioned<V>>> iterator() {
+ return null;
+ }
+
+ @Override
public void addListener(MapEventListener<K, V> listener, Executor executor) {
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index 917f44b..193a250 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -30,6 +30,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Version;
@@ -150,6 +151,11 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Entry<K, Versioned<V>>>> iterator() {
+ return delegateMap.iterator();
+ }
+
+ @Override
public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
return delegateMap.addListener(listener, executor);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
index 2bee5eb..b3a1f95 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
@@ -30,6 +30,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Version;
@@ -239,6 +240,11 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Map.Entry<String, Versioned<V>>>> iterator() {
+ return delegateMap.iterator();
+ }
+
+ @Override
public CompletableFuture<Void> addListener(
MapEventListener<String, V> listener, Executor executor) {
return delegateMap.addListener(listener, executor);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index d5d4c73..38ba0f3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -18,8 +18,11 @@
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
@@ -35,6 +38,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Version;
@@ -185,6 +189,12 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Entry<K, Versioned<V>>>> iterator() {
+ return Tools.allOf(getMaps().stream().map(m -> m.iterator()).collect(Collectors.toList()))
+ .thenApply(PartitionedMultimapIterator::new);
+ }
+
+ @Override
public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
return CompletableFuture.allOf(getMaps().stream()
.map(map -> map.addListener(listener, executor))
@@ -254,4 +264,42 @@
private Collection<AsyncConsistentMap<K, V>> getMaps() {
return partitions.values();
}
+
+ private class PartitionedMultimapIterator<K, V> implements AsyncIterator<Map.Entry<K, Versioned<V>>> {
+ private final Iterator<AsyncIterator<Entry<K, Versioned<V>>>> iterators;
+ private volatile AsyncIterator<Entry<K, Versioned<V>>> iterator;
+
+ public PartitionedMultimapIterator(List<AsyncIterator<Entry<K, Versioned<V>>>> iterators) {
+ this.iterators = iterators.iterator();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> hasNext() {
+ if (iterator == null && iterators.hasNext()) {
+ iterator = iterators.next();
+ }
+ if (iterator == null) {
+ return CompletableFuture.completedFuture(false);
+ }
+ return iterator.hasNext()
+ .thenCompose(hasNext -> {
+ if (!hasNext) {
+ iterator = null;
+ return hasNext();
+ }
+ return CompletableFuture.completedFuture(true);
+ });
+ }
+
+ @Override
+ public CompletableFuture<Entry<K, Versioned<V>>> next() {
+ if (iterator == null && iterators.hasNext()) {
+ iterator = iterators.next();
+ }
+ if (iterator == null) {
+ return Tools.exceptionalFuture(new NoSuchElementException());
+ }
+ return iterator.next();
+ }
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index 4e662b0..d68ada5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -21,6 +21,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
@@ -248,6 +249,11 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Entry<K1, Versioned<V1>>>> iterator() {
+ return backingMap.iterator().thenApply(TranscodingIterator::new);
+ }
+
+ @Override
public CompletableFuture<Void> addListener(MapEventListener<K1, V1> listener, Executor executor) {
synchronized (listeners) {
InternalBackingMapEventListener backingMapListener =
@@ -328,6 +334,25 @@
return backingMap.statusChangeListeners();
}
+ private class TranscodingIterator implements AsyncIterator<Entry<K1, Versioned<V1>>> {
+ private final AsyncIterator<Map.Entry<K2, Versioned<V2>>> iterator;
+
+ public TranscodingIterator(AsyncIterator<Map.Entry<K2, Versioned<V2>>> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public CompletableFuture<Map.Entry<K1, Versioned<V1>>> next() {
+ return iterator.next().thenApply(entry ->
+ Maps.immutableEntry(keyDecoder.apply(entry.getKey()), entry.getValue().map(valueDecoder)));
+ }
+ }
+
private class InternalBackingMapEventListener implements MapEventListener<K2, V2> {
private final MapEventListener<K1, V1> listener;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
index fcaca0e..d8e686e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
@@ -21,6 +21,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
@@ -340,6 +341,11 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Map.Entry<String, Versioned<V1>>>> iterator() {
+ return backingMap.iterator().thenApply(TranscodingIterator::new);
+ }
+
+ @Override
public CompletableFuture<Void> addListener(
MapEventListener<String, V1> listener,
Executor executor) {
@@ -387,6 +393,25 @@
throw new UnsupportedOperationException("This operation is not yet supported.");
}
+ private class TranscodingIterator implements AsyncIterator<Map.Entry<String, Versioned<V1>>> {
+ private final AsyncIterator<Map.Entry<String, Versioned<V2>>> iterator;
+
+ public TranscodingIterator(AsyncIterator<Map.Entry<String, Versioned<V2>>> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public CompletableFuture<Map.Entry<String, Versioned<V1>>> next() {
+ return iterator.next().thenApply(entry ->
+ Maps.immutableEntry(entry.getKey(), entry.getValue().map(valueDecoder)));
+ }
+ }
+
private class InternalBackingMapEventListener
implements MapEventListener<String, V2> {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 588159c..0514333 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -16,15 +16,19 @@
package org.onosproject.store.primitives.resources.impl;
import java.util.Collection;
+import java.util.Collections;
import java.util.ConcurrentModificationException;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.function.Predicate;
import io.atomix.protocols.raft.proxy.RaftProxy;
@@ -50,6 +54,7 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
@@ -62,6 +67,7 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ADD_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.BEGIN;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLOSE_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.COMMIT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_VALUE;
@@ -69,7 +75,11 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET_OR_DEFAULT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IS_EMPTY;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorBatch;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorPosition;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.OPEN_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
@@ -91,11 +101,11 @@
*/
public class AtomixConsistentMap extends AbstractRaftPrimitive implements AsyncConsistentMap<String, byte[]> {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
- .register(KryoNamespaces.BASIC)
- .register(AtomixConsistentMapOperations.NAMESPACE)
- .register(AtomixConsistentMapEvents.NAMESPACE)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
- .build());
+ .register(KryoNamespaces.BASIC)
+ .register(AtomixConsistentMapOperations.NAMESPACE)
+ .register(AtomixConsistentMapEvents.NAMESPACE)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
+ .build());
private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
@@ -115,7 +125,7 @@
private void handleEvent(List<MapEvent<String, byte[]>> events) {
events.forEach(event ->
- mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
+ mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
}
@Override
@@ -146,10 +156,10 @@
@Override
public CompletableFuture<Versioned<byte[]>> getOrDefault(String key, byte[] defaultValue) {
return proxy.invoke(
- GET_OR_DEFAULT,
- serializer()::encode,
- new GetOrDefault(key, defaultValue),
- serializer()::decode);
+ GET_OR_DEFAULT,
+ serializer()::encode,
+ new GetOrDefault(key, defaultValue),
+ serializer()::decode);
}
@Override
@@ -171,122 +181,122 @@
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
- PUT,
- serializer()::encode,
- new Put(key, value),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.result());
+ PUT,
+ serializer()::encode,
+ new Put(key, value),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.result());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
- PUT_AND_GET,
- serializer()::encode,
- new Put(key, value),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.result());
+ PUT_AND_GET,
+ serializer()::encode,
+ new Put(key, value),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.result());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
- PUT_IF_ABSENT,
- serializer()::encode,
- new Put(key, value),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.result());
+ PUT_IF_ABSENT,
+ serializer()::encode,
+ new Put(key, value),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.result());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> remove(String key) {
return proxy.<Remove, MapEntryUpdateResult<String, byte[]>>invoke(
- REMOVE,
- serializer()::encode,
- new Remove(key),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.result());
+ REMOVE,
+ serializer()::encode,
+ new Remove(key),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.result());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> remove(String key, byte[] value) {
return proxy.<RemoveValue, MapEntryUpdateResult<String, byte[]>>invoke(
- REMOVE_VALUE,
- serializer()::encode,
- new RemoveValue(key, value),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.updated());
+ REMOVE_VALUE,
+ serializer()::encode,
+ new RemoveValue(key, value),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.updated());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> remove(String key, long version) {
return proxy.<RemoveVersion, MapEntryUpdateResult<String, byte[]>>invoke(
- REMOVE_VERSION,
- serializer()::encode,
- new RemoveVersion(key, version),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.updated());
+ REMOVE_VERSION,
+ serializer()::encode,
+ new RemoveVersion(key, version),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.updated());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
return proxy.<Replace, MapEntryUpdateResult<String, byte[]>>invoke(
- REPLACE,
- serializer()::encode,
- new Replace(key, value),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.result());
+ REPLACE,
+ serializer()::encode,
+ new Replace(key, value),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.result());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
return proxy.<ReplaceValue, MapEntryUpdateResult<String, byte[]>>invoke(
- REPLACE_VALUE,
- serializer()::encode,
- new ReplaceValue(key, oldValue, newValue),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.updated());
+ REPLACE_VALUE,
+ serializer()::encode,
+ new ReplaceValue(key, oldValue, newValue),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.updated());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
return proxy.<ReplaceVersion, MapEntryUpdateResult<String, byte[]>>invoke(
- REPLACE_VERSION,
- serializer()::encode,
- new ReplaceVersion(key, oldVersion, newValue),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.updated());
+ REPLACE_VERSION,
+ serializer()::encode,
+ new ReplaceVersion(key, oldVersion, newValue),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.updated());
}
@Override
public CompletableFuture<Void> clear() {
return proxy.<MapEntryUpdateResult.Status>invoke(CLEAR, serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> null);
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> null);
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> computeIf(String key,
- Predicate<? super byte[]> condition,
- BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
+ Predicate<? super byte[]> condition,
+ BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
return get(key).thenCompose(r1 -> {
byte[] existingValue = r1 == null ? null : r1.value();
// if the condition evaluates to false, return existing value.
@@ -307,40 +317,40 @@
if (r1 == null) {
return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
- PUT_IF_ABSENT,
- serializer()::encode,
- new Put(key, computedValue),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenCompose(r -> checkLocked(r))
- .thenApply(result -> new Versioned<>(computedValue, result.version()));
+ PUT_IF_ABSENT,
+ serializer()::encode,
+ new Put(key, computedValue),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenCompose(r -> checkLocked(r))
+ .thenApply(result -> new Versioned<>(computedValue, result.version()));
} else if (computedValue == null) {
return proxy.<RemoveVersion, MapEntryUpdateResult<String, byte[]>>invoke(
- REMOVE_VERSION,
- serializer()::encode,
- new RemoveVersion(key, r1.version()),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenCompose(r -> checkLocked(r))
- .thenApply(v -> null);
+ REMOVE_VERSION,
+ serializer()::encode,
+ new RemoveVersion(key, r1.version()),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenCompose(r -> checkLocked(r))
+ .thenApply(v -> null);
} else {
return proxy.<ReplaceVersion, MapEntryUpdateResult<String, byte[]>>invoke(
- REPLACE_VERSION,
- serializer()::encode,
- new ReplaceVersion(key, r1.version(), computedValue),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenCompose(r -> checkLocked(r))
- .thenApply(result -> result.status() == MapEntryUpdateResult.Status.OK
- ? new Versioned(computedValue, result.version()) : result.result());
+ REPLACE_VERSION,
+ serializer()::encode,
+ new ReplaceVersion(key, r1.version(), computedValue),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenCompose(r -> checkLocked(r))
+ .thenApply(result -> result.status() == MapEntryUpdateResult.Status.OK
+ ? new Versioned(computedValue, result.version()) : result.result());
}
});
}
private CompletableFuture<MapEntryUpdateResult<String, byte[]>> checkLocked(
- MapEntryUpdateResult<String, byte[]> result) {
+ MapEntryUpdateResult<String, byte[]> result) {
if (result.status() == MapEntryUpdateResult.Status.PRECONDITION_FAILED ||
- result.status() == MapEntryUpdateResult.Status.WRITE_LOCK) {
+ result.status() == MapEntryUpdateResult.Status.WRITE_LOCK) {
return Tools.exceptionalFuture(new ConsistentMapException.ConcurrentModification());
}
return CompletableFuture.completedFuture(result);
@@ -348,7 +358,7 @@
@Override
public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
- Executor executor) {
+ Executor executor) {
if (mapEventListeners.isEmpty()) {
return proxy.invoke(ADD_LISTENER).thenRun(() -> mapEventListeners.put(listener, executor));
} else {
@@ -380,54 +390,141 @@
@Override
public CompletableFuture<Version> begin(TransactionId transactionId) {
return proxy.<TransactionBegin, Long>invoke(
- BEGIN,
- serializer()::encode,
- new TransactionBegin(transactionId),
- serializer()::decode)
- .thenApply(Version::new);
+ BEGIN,
+ serializer()::encode,
+ new TransactionBegin(transactionId),
+ serializer()::decode)
+ .thenApply(Version::new);
}
@Override
public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
return proxy.<TransactionPrepare, PrepareResult>invoke(
- PREPARE,
- serializer()::encode,
- new TransactionPrepare(transactionLog),
- serializer()::decode)
- .thenApply(v -> v == PrepareResult.OK);
+ PREPARE,
+ serializer()::encode,
+ new TransactionPrepare(transactionLog),
+ serializer()::decode)
+ .thenApply(v -> v == PrepareResult.OK);
}
@Override
public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
return proxy.<TransactionPrepareAndCommit, PrepareResult>invoke(
- PREPARE_AND_COMMIT,
- serializer()::encode,
- new TransactionPrepareAndCommit(transactionLog),
- serializer()::decode)
- .thenApply(v -> v == PrepareResult.OK);
+ PREPARE_AND_COMMIT,
+ serializer()::encode,
+ new TransactionPrepareAndCommit(transactionLog),
+ serializer()::decode)
+ .thenApply(v -> v == PrepareResult.OK);
}
@Override
public CompletableFuture<Void> commit(TransactionId transactionId) {
return proxy.<TransactionCommit, CommitResult>invoke(
- COMMIT,
- serializer()::encode,
- new TransactionCommit(transactionId),
- serializer()::decode)
- .thenApply(v -> null);
+ COMMIT,
+ serializer()::encode,
+ new TransactionCommit(transactionId),
+ serializer()::decode)
+ .thenApply(v -> null);
}
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
return proxy.invoke(
- ROLLBACK,
- serializer()::encode,
- new TransactionRollback(transactionId),
- serializer()::decode)
- .thenApply(v -> null);
+ ROLLBACK,
+ serializer()::encode,
+ new TransactionRollback(transactionId),
+ serializer()::decode)
+ .thenApply(v -> null);
}
private boolean isListening() {
return !mapEventListeners.isEmpty();
}
+
+ @Override
+ public CompletableFuture<AsyncIterator<Entry<String, Versioned<byte[]>>>> iterator() {
+ return proxy.<Long>invoke(OPEN_ITERATOR, SERIALIZER::decode)
+ .thenApply(ConsistentMultimapIterator::new);
+ }
+
+ /**
+ * Consistent multimap iterator.
+ */
+ private class ConsistentMultimapIterator implements AsyncIterator<Map.Entry<String, Versioned<byte[]>>> {
+ private final long id;
+ private volatile CompletableFuture<IteratorBatch> batch;
+ private volatile CompletableFuture<Void> closeFuture;
+
+ ConsistentMultimapIterator(long id) {
+ this.id = id;
+ this.batch = CompletableFuture.completedFuture(
+ new IteratorBatch(0, Collections.emptyList()));
+ }
+
+ /**
+ * Returns the current batch iterator or lazily fetches the next batch from the cluster.
+ *
+ * @return the next batch iterator
+ */
+ private CompletableFuture<Iterator<Entry<String, Versioned<byte[]>>>> batch() {
+ return batch.thenCompose(iterator -> {
+ if (iterator != null && !iterator.hasNext()) {
+ batch = fetch(iterator.position());
+ return batch.thenApply(Function.identity());
+ }
+ return CompletableFuture.completedFuture(iterator);
+ });
+ }
+
+ /**
+ * Fetches the next batch of entries from the cluster.
+ *
+ * @param position the position from which to fetch the next batch
+ * @return the next batch of entries from the cluster
+ */
+ private CompletableFuture<IteratorBatch> fetch(int position) {
+ return proxy.<IteratorPosition, IteratorBatch>invoke(
+ NEXT,
+ SERIALIZER::encode,
+ new IteratorPosition(id, position),
+ SERIALIZER::decode)
+ .thenCompose(batch -> {
+ if (batch == null) {
+ return close().thenApply(v -> null);
+ }
+ return CompletableFuture.completedFuture(batch);
+ });
+ }
+
+ /**
+ * Closes the iterator.
+ *
+ * @return future to be completed once the iterator has been closed
+ */
+ private CompletableFuture<Void> close() {
+ if (closeFuture == null) {
+ synchronized (this) {
+ if (closeFuture == null) {
+ closeFuture = proxy.invoke(CLOSE_ITERATOR, SERIALIZER::encode, id);
+ }
+ }
+ }
+ return closeFuture;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> hasNext() {
+ return batch().thenApply(iterator -> iterator != null && iterator.hasNext());
+ }
+
+ @Override
+ public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> next() {
+ return batch().thenCompose(iterator -> {
+ if (iterator == null) {
+ return Tools.exceptionalFuture(new NoSuchElementException());
+ }
+ return CompletableFuture.completedFuture(iterator.next());
+ });
+ }
+ }
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
index 248c4eb..4f131ef 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
@@ -15,6 +15,11 @@
*/
package org.onosproject.store.primitives.resources.impl;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
import io.atomix.protocols.raft.operation.OperationId;
import io.atomix.protocols.raft.operation.OperationType;
import io.atomix.utils.ArraySizeHashPrinter;
@@ -58,7 +63,10 @@
PREPARE(OperationType.COMMAND),
PREPARE_AND_COMMIT(OperationType.COMMAND),
COMMIT(OperationType.COMMAND),
- ROLLBACK(OperationType.COMMAND);
+ ROLLBACK(OperationType.COMMAND),
+ OPEN_ITERATOR(OperationType.COMMAND),
+ NEXT(OperationType.QUERY),
+ CLOSE_ITERATOR(OperationType.COMMAND);
private final OperationType type;
@@ -107,6 +115,9 @@
.register(MapEntryUpdateResult.Status.class)
.register(Versioned.class)
.register(byte[].class)
+ .register(Maps.immutableEntry("", "").getClass())
+ .register(IteratorBatch.class)
+ .register(IteratorPosition.class)
.build("AtomixConsistentMapOperations");
/**
@@ -561,4 +572,77 @@
.toString();
}
}
+
+ /**
+ * Iterator position.
+ */
+ public static class IteratorPosition {
+ private long iteratorId;
+ private int position;
+
+ private IteratorPosition() {
+ }
+
+ public IteratorPosition(long iteratorId, int position) {
+ this.iteratorId = iteratorId;
+ this.position = position;
+ }
+
+ public long iteratorId() {
+ return iteratorId;
+ }
+
+ public int position() {
+ return position;
+ }
+ }
+
+ /**
+ * Iterator batch.
+ */
+ public static class IteratorBatch implements Iterator<Map.Entry<String, Versioned<byte[]>>> {
+ private int position;
+ private Collection<Map.Entry<String, Versioned<byte[]>>> entries;
+ private transient volatile Iterator<Map.Entry<String, Versioned<byte[]>>> iterator;
+
+ private IteratorBatch() {
+ }
+
+ public IteratorBatch(int position, Collection<Map.Entry<String, Versioned<byte[]>>> entries) {
+ this.position = position;
+ this.entries = entries;
+ }
+
+ public int position() {
+ return position;
+ }
+
+ public Collection<Map.Entry<String, Versioned<byte[]>>> entries() {
+ return entries;
+ }
+
+ private Iterator<Map.Entry<String, Versioned<byte[]>>> iterator() {
+ Iterator<Map.Entry<String, Versioned<byte[]>>> iterator = this.iterator;
+ if (iterator == null) {
+ synchronized (entries) {
+ iterator = this.iterator;
+ if (iterator == null) {
+ iterator = entries.iterator();
+ this.iterator = iterator;
+ }
+ }
+ }
+ return iterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator().hasNext();
+ }
+
+ @Override
+ public Map.Entry<String, Versioned<byte[]>> next() {
+ return iterator().next();
+ }
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
index d25d475..0fcf3b5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.primitives.resources.impl;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -66,6 +67,7 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ADD_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.BEGIN;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLOSE_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.COMMIT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_VALUE;
@@ -73,7 +75,11 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET_OR_DEFAULT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IS_EMPTY;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorBatch;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorPosition;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.OPEN_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
@@ -95,22 +101,25 @@
*/
public class AtomixConsistentMapService extends AbstractRaftService {
+ private static final int MAX_ITERATOR_BATCH_SIZE = 1024 * 32;
+
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
- .register(KryoNamespaces.BASIC)
- .register(AtomixConsistentMapOperations.NAMESPACE)
- .register(AtomixConsistentMapEvents.NAMESPACE)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
- .register(TransactionScope.class)
- .register(TransactionLog.class)
- .register(TransactionId.class)
- .register(MapEntryValue.class)
- .register(MapEntryValue.Type.class)
- .register(new HashMap().keySet().getClass())
- .build());
+ .register(KryoNamespaces.BASIC)
+ .register(AtomixConsistentMapOperations.NAMESPACE)
+ .register(AtomixConsistentMapEvents.NAMESPACE)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
+ .register(TransactionScope.class)
+ .register(TransactionLog.class)
+ .register(TransactionId.class)
+ .register(MapEntryValue.class)
+ .register(MapEntryValue.Type.class)
+ .register(new HashMap().keySet().getClass())
+ .build());
protected Map<Long, RaftSession> listeners = new LinkedHashMap<>();
private Map<String, MapEntryValue> map;
protected Set<String> preparedKeys = Sets.newHashSet();
+ private Map<Long, IteratorContext> iterators = Maps.newHashMap();
protected Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
protected long currentVersion;
@@ -119,7 +128,7 @@
}
protected Map<String, MapEntryValue> createMap() {
- return Maps.newHashMap();
+ return Maps.newConcurrentMap();
}
protected Map<String, MapEntryValue> entries() {
@@ -137,6 +146,10 @@
writer.writeObject(entries(), serializer()::encode);
writer.writeObject(activeTransactions, serializer()::encode);
writer.writeLong(currentVersion);
+
+ Map<Long, Long> iterators = Maps.newHashMap();
+ this.iterators.forEach((id, context) -> iterators.put(id, context.sessionId));
+ writer.writeObject(iterators, serializer()::encode);
}
@Override
@@ -149,6 +162,11 @@
map = reader.readObject(serializer()::decode);
activeTransactions = reader.readObject(serializer()::decode);
currentVersion = reader.readLong();
+
+ Map<Long, Long> iterators = reader.readObject(serializer()::decode);
+ this.iterators = Maps.newHashMap();
+ iterators.forEach((id, session) ->
+ this.iterators.put(id, new IteratorContext(session, entries().entrySet().iterator())));
}
@Override
@@ -182,6 +200,9 @@
executor.register(PREPARE_AND_COMMIT, serializer()::decode, this::prepareAndCommit, serializer()::encode);
executor.register(COMMIT, serializer()::decode, this::commit, serializer()::encode);
executor.register(ROLLBACK, serializer()::decode, this::rollback, serializer()::encode);
+ executor.register(OPEN_ITERATOR, this::openIterator, serializer()::encode);
+ executor.register(NEXT, serializer()::decode, this::next, serializer()::encode);
+ executor.register(CLOSE_ITERATOR, serializer()::decode, this::closeIterator);
}
/**
@@ -204,8 +225,8 @@
protected boolean containsValue(Commit<? extends ContainsValue> commit) {
Match<byte[]> valueMatch = Match.ifValue(commit.value().value());
return entries().values().stream()
- .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
- .anyMatch(value -> valueMatch.matches(value.value()));
+ .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
+ .anyMatch(value -> valueMatch.matches(value.value()));
}
/**
@@ -242,8 +263,8 @@
*/
protected int size() {
return (int) entries().values().stream()
- .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
- .count();
+ .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
+ .count();
}
/**
@@ -253,7 +274,7 @@
*/
protected boolean isEmpty() {
return entries().values().stream()
- .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
+ .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
}
/**
@@ -263,9 +284,9 @@
*/
protected Set<String> keySet() {
return entries().entrySet().stream()
- .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
+ .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
}
/**
@@ -275,9 +296,9 @@
*/
protected Collection<Versioned<byte[]>> values() {
return entries().entrySet().stream()
- .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
- .map(entry -> toVersioned(entry.getValue()))
- .collect(Collectors.toList());
+ .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+ .map(entry -> toVersioned(entry.getValue()))
+ .collect(Collectors.toList());
}
/**
@@ -287,9 +308,9 @@
*/
protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet() {
return entries().entrySet().stream()
- .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
- .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
- .collect(Collectors.toSet());
+ .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+ .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
+ .collect(Collectors.toSet());
}
/**
@@ -301,7 +322,7 @@
*/
protected boolean valuesEqual(MapEntryValue oldValue, MapEntryValue newValue) {
return (oldValue == null && newValue == null)
- || (oldValue != null && newValue != null && valuesEqual(oldValue.value(), newValue.value()));
+ || (oldValue != null && newValue != null && valuesEqual(oldValue.value(), newValue.value()));
}
/**
@@ -313,7 +334,7 @@
*/
protected boolean valuesEqual(byte[] oldValue, byte[] newValue) {
return (oldValue == null && newValue == null)
- || (oldValue != null && newValue != null && Arrays.equals(oldValue, newValue));
+ || (oldValue != null && newValue != null && Arrays.equals(oldValue, newValue));
}
/**
@@ -343,13 +364,13 @@
// If the key has been locked by a transaction, return a WRITE_LOCK error.
if (preparedKeys.contains(key)) {
return new MapEntryUpdateResult<>(
- MapEntryUpdateResult.Status.WRITE_LOCK,
- commit.index(),
- key,
- toVersioned(oldValue));
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
}
entries().put(commit.value().key(),
- new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+ new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
Versioned<byte[]> result = toVersioned(oldValue);
publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(newValue), result));
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
@@ -357,13 +378,13 @@
// If the key has been locked by a transaction, return a WRITE_LOCK error.
if (preparedKeys.contains(key)) {
return new MapEntryUpdateResult<>(
- MapEntryUpdateResult.Status.WRITE_LOCK,
- commit.index(),
- key,
- toVersioned(oldValue));
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
}
entries().put(commit.value().key(),
- new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+ new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
Versioned<byte[]> result = toVersioned(oldValue);
publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
@@ -387,25 +408,25 @@
// If the key has been locked by a transaction, return a WRITE_LOCK error.
if (preparedKeys.contains(key)) {
return new MapEntryUpdateResult<>(
- MapEntryUpdateResult.Status.WRITE_LOCK,
- commit.index(),
- key,
- toVersioned(oldValue));
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
}
MapEntryValue newValue = new MapEntryValue(
- MapEntryValue.Type.VALUE,
- commit.index(),
- commit.value().value());
+ MapEntryValue.Type.VALUE,
+ commit.index(),
+ commit.value().value());
entries().put(commit.value().key(), newValue);
Versioned<byte[]> result = toVersioned(newValue);
publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, null);
}
return new MapEntryUpdateResult<>(
- MapEntryUpdateResult.Status.PRECONDITION_FAILED,
- commit.index(),
- key,
- toVersioned(oldValue));
+ MapEntryUpdateResult.Status.PRECONDITION_FAILED,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
}
/**
@@ -425,10 +446,10 @@
// If the key has been locked by a transaction, return a WRITE_LOCK error.
if (preparedKeys.contains(key)) {
return new MapEntryUpdateResult<>(
- MapEntryUpdateResult.Status.WRITE_LOCK,
- commit.index(),
- key,
- toVersioned(oldValue));
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
}
entries().put(commit.value().key(), newValue);
Versioned<byte[]> result = toVersioned(newValue);
@@ -438,10 +459,10 @@
// If the key has been locked by a transaction, return a WRITE_LOCK error.
if (preparedKeys.contains(key)) {
return new MapEntryUpdateResult<>(
- MapEntryUpdateResult.Status.WRITE_LOCK,
- commit.index(),
- key,
- toVersioned(oldValue));
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
}
entries().put(commit.value().key(), newValue);
Versioned<byte[]> result = toVersioned(newValue);
@@ -454,8 +475,8 @@
/**
* Handles a remove commit.
*
- * @param index the commit index
- * @param key the key to remove
+ * @param index the commit index
+ * @param key the key to remove
* @param predicate predicate to determine whether to remove the entry
* @return map entry update result
*/
@@ -502,7 +523,7 @@
*/
protected MapEntryUpdateResult<String, byte[]> removeValue(Commit<? extends RemoveValue> commit) {
return removeIf(commit.index(), commit.value().key(), v ->
- valuesEqual(v, new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value())));
+ valuesEqual(v, new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value())));
}
/**
@@ -518,23 +539,23 @@
/**
* Handles a replace commit.
*
- * @param index the commit index
- * @param key the key to replace
- * @param newValue the value with which to replace the key
+ * @param index the commit index
+ * @param key the key to replace
+ * @param newValue the value with which to replace the key
* @param predicate a predicate to determine whether to replace the key
* @return map entry update result
*/
private MapEntryUpdateResult<String, byte[]> replaceIf(
- long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
+ long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
MapEntryValue oldValue = entries().get(key);
// If the key is not set or the current value doesn't match the predicate, return a PRECONDITION_FAILED error.
if (valueIsNull(oldValue) || !predicate.test(oldValue)) {
return new MapEntryUpdateResult<>(
- MapEntryUpdateResult.Status.PRECONDITION_FAILED,
- index,
- key,
- toVersioned(oldValue));
+ MapEntryUpdateResult.Status.PRECONDITION_FAILED,
+ index,
+ key,
+ toVersioned(oldValue));
}
// If the key has been locked by a transaction, return a WRITE_LOCK error.
@@ -568,7 +589,7 @@
protected MapEntryUpdateResult<String, byte[]> replaceValue(Commit<? extends ReplaceValue> commit) {
MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
return replaceIf(commit.index(), commit.value().key(), value,
- v -> valuesEqual(v.value(), commit.value().oldValue()));
+ v -> valuesEqual(v.value(), commit.value().oldValue()));
}
/**
@@ -580,7 +601,7 @@
protected MapEntryUpdateResult<String, byte[]> replaceVersion(Commit<? extends ReplaceVersion> commit) {
MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
return replaceIf(commit.index(), commit.value().key(), value,
- v -> v.version() == commit.value().oldVersion());
+ v -> v.version() == commit.value().oldVersion());
}
/**
@@ -610,6 +631,67 @@
}
/**
+ * Handles an open iterator commit.
+ *
+ * @param commit the open iterator commit
+ * @return iterator identifier
+ */
+ protected long openIterator(Commit<Void> commit) {
+ iterators.put(commit.index(), new IteratorContext(
+ commit.session().sessionId().id(),
+ entries().entrySet().iterator()));
+ return commit.index();
+ }
+
+ /**
+ * Handles an iterator next commit.
+ *
+ * @param commit the next commit
+ * @return a list of entries to iterate
+ */
+ protected IteratorBatch next(Commit<IteratorPosition> commit) {
+ final long iteratorId = commit.value().iteratorId();
+ final int position = commit.value().position();
+
+ IteratorContext context = iterators.get(iteratorId);
+ if (context == null) {
+ return null;
+ }
+
+ List<Map.Entry<String, Versioned<byte[]>>> entries = new ArrayList<>();
+ int size = 0;
+ while (context.iterator.hasNext()) {
+ context.position++;
+ if (context.position > position) {
+ Map.Entry<String, MapEntryValue> entry = context.iterator.next();
+ String key = entry.getKey();
+ Versioned<byte[]> value = toVersioned(entry.getValue());
+ size += key.length();
+ size += value.value() != null ? value.value().length : 0;
+ entries.add(Maps.immutableEntry(key, value));
+
+ if (size >= MAX_ITERATOR_BATCH_SIZE) {
+ break;
+ }
+ }
+ }
+
+ if (entries.isEmpty()) {
+ return null;
+ }
+ return new IteratorBatch(context.position, entries);
+ }
+
+ /**
+ * Handles a close iterator commit.
+ *
+ * @param commit the close iterator commit
+ */
+ protected void closeIterator(Commit<Long> commit) {
+ iterators.remove(commit.value());
+ }
+
+ /**
* Handles a listen commit.
*
* @param session listen session
@@ -719,13 +801,13 @@
TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
if (transactionScope == null) {
activeTransactions.put(
- transactionLog.transactionId(),
- new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
+ transactionLog.transactionId(),
+ new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
return PrepareResult.PARTIAL_FAILURE;
} else {
activeTransactions.put(
- transactionLog.transactionId(),
- transactionScope.prepared(commit));
+ transactionLog.transactionId(),
+ transactionScope.prepared(commit));
return PrepareResult.OK;
}
} catch (Exception e) {
@@ -795,34 +877,34 @@
if (!valueIsNull(newValue)) {
if (!valueIsNull(previousValue)) {
event = new MapEvent<>(
- MapEvent.Type.UPDATE,
- "",
- key,
- toVersioned(newValue),
- toVersioned(previousValue));
+ MapEvent.Type.UPDATE,
+ "",
+ key,
+ toVersioned(newValue),
+ toVersioned(previousValue));
} else {
event = new MapEvent<>(
- MapEvent.Type.INSERT,
- "",
- key,
- toVersioned(newValue),
- null);
+ MapEvent.Type.INSERT,
+ "",
+ key,
+ toVersioned(newValue),
+ null);
}
} else {
event = new MapEvent<>(
- MapEvent.Type.REMOVE,
- "",
- key,
- null,
- toVersioned(previousValue));
- }
- } else {
- event = new MapEvent<>(
MapEvent.Type.REMOVE,
"",
key,
null,
toVersioned(previousValue));
+ }
+ } else {
+ event = new MapEvent<>(
+ MapEvent.Type.REMOVE,
+ "",
+ key,
+ null,
+ toVersioned(previousValue));
}
eventsToPublish.add(event);
}
@@ -847,11 +929,11 @@
} else {
try {
transactionScope.transactionLog().records()
- .forEach(record -> {
- if (record.type() != MapUpdate.Type.VERSION_MATCH) {
- preparedKeys.remove(record.key());
- }
- });
+ .forEach(record -> {
+ if (record.type() != MapUpdate.Type.VERSION_MATCH) {
+ preparedKeys.remove(record.key());
+ }
+ });
return RollbackResult.OK;
} finally {
discardTombstones();
@@ -874,8 +956,8 @@
}
} else {
long lowWaterMark = activeTransactions.values().stream()
- .mapToLong(TransactionScope::version)
- .min().getAsLong();
+ .mapToLong(TransactionScope::version)
+ .min().getAsLong();
Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
while (iterator.hasNext()) {
MapEntryValue value = iterator.next().getValue();
@@ -888,12 +970,13 @@
/**
* Utility for turning a {@code MapEntryValue} to {@code Versioned}.
+ *
* @param value map entry value
* @return versioned instance
*/
protected Versioned<byte[]> toVersioned(MapEntryValue value) {
return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
- ? new Versioned<>(value.value(), value.version()) : null;
+ ? new Versioned<>(value.value(), value.version()) : null;
}
/**
@@ -1034,4 +1117,15 @@
return new TransactionScope(version, commit.value().transactionLog());
}
}
+
+ private static class IteratorContext {
+ private final long sessionId;
+ private final Iterator<Map.Entry<String, MapEntryValue>> iterator;
+ private int position = 0;
+
+ IteratorContext(long sessionId, Iterator<Map.Entry<String, MapEntryValue>> iterator) {
+ this.sessionId = sessionId;
+ this.iterator = iterator;
+ }
+ }
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java
index 1b5cb12..89c8da6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java
@@ -20,6 +20,7 @@
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import com.google.common.collect.Maps;
import io.atomix.protocols.raft.service.Commit;
@@ -75,17 +76,17 @@
.register(MapEntryValue.class)
.register(MapEntryValue.Type.class)
.register(new HashMap().keySet().getClass())
- .register(TreeMap.class)
+ .register(ConcurrentSkipListMap.class)
.build());
@Override
- protected TreeMap<String, MapEntryValue> createMap() {
- return Maps.newTreeMap();
+ protected NavigableMap<String, MapEntryValue> createMap() {
+ return new ConcurrentSkipListMap<>();
}
@Override
- protected TreeMap<String, MapEntryValue> entries() {
- return (TreeMap<String, MapEntryValue>) super.entries();
+ protected NavigableMap<String, MapEntryValue> entries() {
+ return (NavigableMap<String, MapEntryValue>) super.entries();
}
@Override
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
index b686687..5d9e5d8 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
@@ -20,6 +20,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -380,6 +381,11 @@
}
@Override
+ public Iterator<Map.Entry<K, Versioned<V>>> iterator() {
+ return map.entrySet().iterator();
+ }
+
+ @Override
public void addListener(MapEventListener<K, V> listener, Executor executor) {
throw new UnsupportedOperationException();
}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
index c292959..a7cd94a 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
@@ -15,7 +15,16 @@
*/
package org.onosproject.store.primitives.resources.impl;
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.impl.RaftContext;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.service.ServiceId;
+import io.atomix.protocols.raft.service.ServiceType;
import io.atomix.protocols.raft.service.impl.DefaultCommit;
+import io.atomix.protocols.raft.service.impl.DefaultServiceContext;
+import io.atomix.protocols.raft.session.RaftSession;
+import io.atomix.protocols.raft.session.SessionId;
import io.atomix.protocols.raft.session.impl.RaftSessionContext;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.protocols.raft.storage.snapshot.Snapshot;
@@ -24,15 +33,23 @@
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import io.atomix.storage.StorageLevel;
import io.atomix.time.WallClockTimestamp;
+import io.atomix.utils.concurrent.AtomixThreadFactory;
+import io.atomix.utils.concurrent.SingleThreadContextFactory;
import org.junit.Test;
import org.onosproject.store.service.Versioned;
+import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.OPEN_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
+import static org.onosproject.store.service.DistributedPrimitive.Type.LEADER_ELECTOR;
/**
* Consistent map service test.
@@ -47,13 +64,42 @@
.build());
Snapshot snapshot = store.newSnapshot(2, new WallClockTimestamp());
+ DefaultServiceContext context = mock(DefaultServiceContext.class);
+ expect(context.serviceType()).andReturn(ServiceType.from(LEADER_ELECTOR.name())).anyTimes();
+ expect(context.serviceName()).andReturn("test").anyTimes();
+ expect(context.serviceId()).andReturn(ServiceId.from(1)).anyTimes();
+
+ RaftContext server = mock(RaftContext.class);
+ expect(server.getProtocol()).andReturn(mock(RaftServerProtocol.class));
+
+ replay(context, server);
+
+ RaftSession session = new RaftSessionContext(
+ SessionId.from(1),
+ MemberId.from("1"),
+ "test",
+ ServiceType.from(LEADER_ELECTOR.name()),
+ ReadConsistency.LINEARIZABLE,
+ 100,
+ 5000,
+ System.currentTimeMillis(),
+ context,
+ server,
+ new SingleThreadContextFactory(new AtomixThreadFactory()));
+
AtomixConsistentMapService service = new AtomixConsistentMapService();
service.put(new DefaultCommit<>(
2,
PUT,
new Put("foo", "Hello world!".getBytes()),
- mock(RaftSessionContext.class),
+ session,
System.currentTimeMillis()));
+ service.openIterator(new DefaultCommit<>(
+ 3,
+ OPEN_ITERATOR,
+ null,
+ session,
+ System.currentTimeMillis()));
try (SnapshotWriter writer = snapshot.openWriter()) {
service.snapshot(writer);
@@ -74,5 +120,12 @@
System.currentTimeMillis()));
assertNotNull(value);
assertArrayEquals("Hello world!".getBytes(), value.value());
+
+ assertEquals(1, service.next(new DefaultCommit<>(
+ 4,
+ NEXT,
+ new AtomixConsistentMapOperations.IteratorPosition(3L, 0),
+ session,
+ System.currentTimeMillis())).entries().size());
}
}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index 3e7a872..8753783 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -27,6 +27,7 @@
import org.onosproject.store.primitives.impl.DistributedPrimitives;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
@@ -34,12 +35,16 @@
import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.is;
@@ -634,6 +639,25 @@
assertEquals("1.0.1:Hello world again!", map1.get("bar").join().value());
}
+ @Test
+ public void testIterator() throws Exception {
+ AtomixConsistentMap map = newPrimitive("testIterator");
+ for (int i = 0; i < 100; i++) {
+ for (int j = 0; j < 100; j++) {
+ map.put(String.valueOf(i), String.valueOf(j).getBytes()).join();
+ }
+ }
+
+ List<Map.Entry<String, Versioned<byte[]>>> entries = new ArrayList<>();
+ AsyncIterator<Map.Entry<String, Versioned<byte[]>>> iterator = map.iterator().get(5, TimeUnit.SECONDS);
+ while (iterator.hasNext().get(5, TimeUnit.SECONDS)) {
+ map.put("foo", UUID.randomUUID().toString().getBytes()).join();
+ entries.add(iterator.next().get(5, TimeUnit.SECONDS));
+ }
+ assertEquals(100, entries.size());
+ assertEquals(101, map.asConsistentMap().stream().count());
+ }
+
private static class TestMapEventListener implements MapEventListener<String, byte[]> {
private final BlockingQueue<MapEvent<String, byte[]>> queue = new ArrayBlockingQueue<>(1);
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
index 941009c..1548fa5 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
@@ -20,14 +20,20 @@
import io.atomix.protocols.raft.service.RaftService;
import org.junit.Test;
import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Versioned;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.junit.Assert.assertArrayEquals;
@@ -480,6 +486,25 @@
//map.delete().join();
}
+ @Test
+ public void testIterator() throws Exception {
+ AtomixConsistentTreeMap map = newPrimitive("testIterator");
+ for (int i = 0; i < 100; i++) {
+ for (int j = 0; j < 100; j++) {
+ map.put(String.valueOf(i), String.valueOf(j).getBytes()).join();
+ }
+ }
+
+ List<Map.Entry<String, Versioned<byte[]>>> entries = new ArrayList<>();
+ AsyncIterator<Map.Entry<String, Versioned<byte[]>>> iterator = map.iterator().get(5, TimeUnit.SECONDS);
+ while (iterator.hasNext().get(5, TimeUnit.SECONDS)) {
+ map.put("foo", UUID.randomUUID().toString().getBytes()).join();
+ entries.add(iterator.next().get(5, TimeUnit.SECONDS));
+ }
+ assertEquals(100, entries.size());
+ assertEquals(101, map.asConsistentMap().stream().count());
+ }
+
private AtomixConsistentTreeMap createResource(String mapName) {
try {
AtomixConsistentTreeMap map = newPrimitive(mapName);