Implement lazy iterators/streams for ConsistentMap
Change-Id: Id643726441c99186667c104cfdd1dab95b06cf87
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index 917f44b..193a250 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -30,6 +30,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Version;
@@ -150,6 +151,11 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Entry<K, Versioned<V>>>> iterator() {
+ return delegateMap.iterator(); // pass-through: the wrapped map's iterator future is returned as-is
+ }
+
+ @Override
public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
return delegateMap.addListener(listener, executor);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
index 2bee5eb..b3a1f95 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
@@ -30,6 +30,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Version;
@@ -239,6 +240,11 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Map.Entry<String, Versioned<V>>>> iterator() {
+ return delegateMap.iterator(); // pass-through: the wrapped tree map's iterator future is returned as-is
+ }
+
+ @Override
public CompletableFuture<Void> addListener(
MapEventListener<String, V> listener, Executor executor) {
return delegateMap.addListener(listener, executor);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index d5d4c73..38ba0f3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -18,8 +18,11 @@
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
@@ -35,6 +38,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Version;
@@ -185,6 +189,12 @@
}
@Override
+    public CompletableFuture<AsyncIterator<Entry<K, Versioned<V>>>> iterator() {
+        return Tools.allOf(getMaps().stream().map(AsyncConsistentMap::iterator).collect(Collectors.toList()))
+                .thenApply(PartitionedMultimapIterator::new); // one facade over all partition iterators
+    }
+
+ @Override
public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
return CompletableFuture.allOf(getMaps().stream()
.map(map -> map.addListener(listener, executor))
@@ -254,4 +264,64 @@
     private Collection<AsyncConsistentMap<K, V>> getMaps() {
         return partitions.values();
     }
+
+    /**
+     * Asynchronous iterator that presents the per-partition iterators as a single sequence.
+     * <p>
+     * Partitions are drained one at a time, in the order of the supplied list. The class uses the key and
+     * value types of the enclosing map instead of declaring type parameters of its own that would shadow them.
+     */
+    private class PartitionedMultimapIterator implements AsyncIterator<Map.Entry<K, Versioned<V>>> {
+        // Partition iterators that have not been started yet.
+        private final Iterator<AsyncIterator<Entry<K, Versioned<V>>>> iterators;
+        // Partition iterator currently being drained; null before the first call and between partitions.
+        private volatile AsyncIterator<Entry<K, Versioned<V>>> iterator;
+
+        public PartitionedMultimapIterator(List<AsyncIterator<Entry<K, Versioned<V>>>> iterators) {
+            this.iterators = iterators.iterator();
+        }
+
+        /**
+         * Checks whether any remaining partition still has an entry.
+         *
+         * @return future completed with true as soon as a partition with an entry is found,
+         *         or with false when every partition is exhausted
+         */
+        @Override
+        public CompletableFuture<Boolean> hasNext() {
+            if (iterator == null && iterators.hasNext()) {
+                iterator = iterators.next();
+            }
+            if (iterator == null) {
+                return CompletableFuture.completedFuture(false);
+            }
+            return iterator.hasNext()
+                    .thenCompose(hasNext -> {
+                        if (!hasNext) {
+                            // Current partition is drained: drop it and try the next one.
+                            iterator = null;
+                            return hasNext();
+                        }
+                        return CompletableFuture.completedFuture(true);
+                    });
+        }
+
+        /**
+         * Returns the next entry, moving on to the next partition when the current one is drained.
+         *
+         * @return future completed with the next entry, or completed exceptionally with
+         *         {@link NoSuchElementException} when every partition is exhausted
+         */
+        @Override
+        public CompletableFuture<Entry<K, Versioned<V>>> next() {
+            // Going through hasNext() first skips drained partitions, so next() is correct even when the caller
+            // did not call hasNext() beforehand.
+            return hasNext().thenCompose(available -> {
+                if (!available) {
+                    return Tools.exceptionalFuture(new NoSuchElementException());
+                }
+                return iterator.next();
+            });
+        }
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index 4e662b0..d68ada5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -21,6 +21,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
@@ -248,6 +249,11 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Entry<K1, Versioned<V1>>>> iterator() {
+ return backingMap.iterator().thenApply(TranscodingIterator::new); // wrap the backing iterator in a decoding view
+ }
+
+ @Override
public CompletableFuture<Void> addListener(MapEventListener<K1, V1> listener, Executor executor) {
synchronized (listeners) {
InternalBackingMapEventListener backingMapListener =
@@ -328,6 +334,25 @@
return backingMap.statusChangeListeners();
}
+    private class TranscodingIterator implements AsyncIterator<Entry<K1, Versioned<V1>>> {
+        private final AsyncIterator<Map.Entry<K2, Versioned<V2>>> encoded; // entries in backing-map encoding
+
+        public TranscodingIterator(AsyncIterator<Map.Entry<K2, Versioned<V2>>> iterator) {
+            encoded = iterator;
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasNext() {
+            return encoded.hasNext(); // answered entirely by the backing iterator
+        }
+
+        @Override
+        public CompletableFuture<Map.Entry<K1, Versioned<V1>>> next() {
+            return encoded.next().thenApply(raw -> // key and value are decoded only when an entry is handed out
+                    Maps.immutableEntry(keyDecoder.apply(raw.getKey()), raw.getValue().map(valueDecoder)));
+        }
+    }
+
private class InternalBackingMapEventListener implements MapEventListener<K2, V2> {
private final MapEventListener<K1, V1> listener;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
index fcaca0e..d8e686e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
@@ -21,6 +21,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
@@ -340,6 +341,11 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Map.Entry<String, Versioned<V1>>>> iterator() {
+ return backingMap.iterator().thenApply(TranscodingIterator::new); // wrap the backing iterator in a decoding view
+ }
+
+ @Override
public CompletableFuture<Void> addListener(
MapEventListener<String, V1> listener,
Executor executor) {
@@ -387,6 +393,25 @@
throw new UnsupportedOperationException("This operation is not yet supported.");
}
+    private class TranscodingIterator implements AsyncIterator<Map.Entry<String, Versioned<V1>>> {
+        private final AsyncIterator<Map.Entry<String, Versioned<V2>>> encoded; // values in backing-map encoding
+
+        public TranscodingIterator(AsyncIterator<Map.Entry<String, Versioned<V2>>> iterator) {
+            encoded = iterator;
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasNext() {
+            return encoded.hasNext(); // answered entirely by the backing iterator
+        }
+
+        @Override
+        public CompletableFuture<Map.Entry<String, Versioned<V1>>> next() {
+            return encoded.next().thenApply(raw -> // keys pass through untouched; only the value is decoded
+                    Maps.immutableEntry(raw.getKey(), raw.getValue().map(valueDecoder)));
+        }
+    }
+
private class InternalBackingMapEventListener
implements MapEventListener<String, V2> {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 588159c..0514333 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -16,15 +16,19 @@
package org.onosproject.store.primitives.resources.impl;
import java.util.Collection;
+import java.util.Collections;
import java.util.ConcurrentModificationException;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.function.Predicate;
import io.atomix.protocols.raft.proxy.RaftProxy;
@@ -50,6 +54,7 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
@@ -62,6 +67,7 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ADD_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.BEGIN;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLOSE_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.COMMIT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_VALUE;
@@ -69,7 +75,11 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET_OR_DEFAULT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IS_EMPTY;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorBatch;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorPosition;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.OPEN_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
@@ -91,11 +101,11 @@
*/
public class AtomixConsistentMap extends AbstractRaftPrimitive implements AsyncConsistentMap<String, byte[]> {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
- .register(KryoNamespaces.BASIC)
- .register(AtomixConsistentMapOperations.NAMESPACE)
- .register(AtomixConsistentMapEvents.NAMESPACE)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
- .build());
+ .register(KryoNamespaces.BASIC)
+ .register(AtomixConsistentMapOperations.NAMESPACE)
+ .register(AtomixConsistentMapEvents.NAMESPACE)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
+ .build());
private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
@@ -115,7 +125,7 @@
private void handleEvent(List<MapEvent<String, byte[]>> events) {
events.forEach(event ->
- mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
+ mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
}
@Override
@@ -146,10 +156,10 @@
@Override
public CompletableFuture<Versioned<byte[]>> getOrDefault(String key, byte[] defaultValue) {
return proxy.invoke(
- GET_OR_DEFAULT,
- serializer()::encode,
- new GetOrDefault(key, defaultValue),
- serializer()::decode);
+ GET_OR_DEFAULT,
+ serializer()::encode,
+ new GetOrDefault(key, defaultValue),
+ serializer()::decode);
}
@Override
@@ -171,122 +181,122 @@
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
- PUT,
- serializer()::encode,
- new Put(key, value),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.result());
+ PUT,
+ serializer()::encode,
+ new Put(key, value),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.result());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
- PUT_AND_GET,
- serializer()::encode,
- new Put(key, value),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.result());
+ PUT_AND_GET,
+ serializer()::encode,
+ new Put(key, value),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.result());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
- PUT_IF_ABSENT,
- serializer()::encode,
- new Put(key, value),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.result());
+ PUT_IF_ABSENT,
+ serializer()::encode,
+ new Put(key, value),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.result());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> remove(String key) {
return proxy.<Remove, MapEntryUpdateResult<String, byte[]>>invoke(
- REMOVE,
- serializer()::encode,
- new Remove(key),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.result());
+ REMOVE,
+ serializer()::encode,
+ new Remove(key),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.result());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> remove(String key, byte[] value) {
return proxy.<RemoveValue, MapEntryUpdateResult<String, byte[]>>invoke(
- REMOVE_VALUE,
- serializer()::encode,
- new RemoveValue(key, value),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.updated());
+ REMOVE_VALUE,
+ serializer()::encode,
+ new RemoveValue(key, value),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.updated());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> remove(String key, long version) {
return proxy.<RemoveVersion, MapEntryUpdateResult<String, byte[]>>invoke(
- REMOVE_VERSION,
- serializer()::encode,
- new RemoveVersion(key, version),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.updated());
+ REMOVE_VERSION,
+ serializer()::encode,
+ new RemoveVersion(key, version),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.updated());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
return proxy.<Replace, MapEntryUpdateResult<String, byte[]>>invoke(
- REPLACE,
- serializer()::encode,
- new Replace(key, value),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.result());
+ REPLACE,
+ serializer()::encode,
+ new Replace(key, value),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.result());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
return proxy.<ReplaceValue, MapEntryUpdateResult<String, byte[]>>invoke(
- REPLACE_VALUE,
- serializer()::encode,
- new ReplaceValue(key, oldValue, newValue),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.updated());
+ REPLACE_VALUE,
+ serializer()::encode,
+ new ReplaceValue(key, oldValue, newValue),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.updated());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
return proxy.<ReplaceVersion, MapEntryUpdateResult<String, byte[]>>invoke(
- REPLACE_VERSION,
- serializer()::encode,
- new ReplaceVersion(key, oldVersion, newValue),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> v.updated());
+ REPLACE_VERSION,
+ serializer()::encode,
+ new ReplaceVersion(key, oldVersion, newValue),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.updated());
}
@Override
public CompletableFuture<Void> clear() {
return proxy.<MapEntryUpdateResult.Status>invoke(CLEAR, serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> null);
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> null);
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> computeIf(String key,
- Predicate<? super byte[]> condition,
- BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
+ Predicate<? super byte[]> condition,
+ BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
return get(key).thenCompose(r1 -> {
byte[] existingValue = r1 == null ? null : r1.value();
// if the condition evaluates to false, return existing value.
@@ -307,40 +317,40 @@
if (r1 == null) {
return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
- PUT_IF_ABSENT,
- serializer()::encode,
- new Put(key, computedValue),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenCompose(r -> checkLocked(r))
- .thenApply(result -> new Versioned<>(computedValue, result.version()));
+ PUT_IF_ABSENT,
+ serializer()::encode,
+ new Put(key, computedValue),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenCompose(r -> checkLocked(r))
+ .thenApply(result -> new Versioned<>(computedValue, result.version()));
} else if (computedValue == null) {
return proxy.<RemoveVersion, MapEntryUpdateResult<String, byte[]>>invoke(
- REMOVE_VERSION,
- serializer()::encode,
- new RemoveVersion(key, r1.version()),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenCompose(r -> checkLocked(r))
- .thenApply(v -> null);
+ REMOVE_VERSION,
+ serializer()::encode,
+ new RemoveVersion(key, r1.version()),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenCompose(r -> checkLocked(r))
+ .thenApply(v -> null);
} else {
return proxy.<ReplaceVersion, MapEntryUpdateResult<String, byte[]>>invoke(
- REPLACE_VERSION,
- serializer()::encode,
- new ReplaceVersion(key, r1.version(), computedValue),
- serializer()::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenCompose(r -> checkLocked(r))
- .thenApply(result -> result.status() == MapEntryUpdateResult.Status.OK
- ? new Versioned(computedValue, result.version()) : result.result());
+ REPLACE_VERSION,
+ serializer()::encode,
+ new ReplaceVersion(key, r1.version(), computedValue),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenCompose(r -> checkLocked(r))
+ .thenApply(result -> result.status() == MapEntryUpdateResult.Status.OK
+ ? new Versioned(computedValue, result.version()) : result.result());
}
});
}
private CompletableFuture<MapEntryUpdateResult<String, byte[]>> checkLocked(
- MapEntryUpdateResult<String, byte[]> result) {
+ MapEntryUpdateResult<String, byte[]> result) {
if (result.status() == MapEntryUpdateResult.Status.PRECONDITION_FAILED ||
- result.status() == MapEntryUpdateResult.Status.WRITE_LOCK) {
+ result.status() == MapEntryUpdateResult.Status.WRITE_LOCK) {
return Tools.exceptionalFuture(new ConsistentMapException.ConcurrentModification());
}
return CompletableFuture.completedFuture(result);
@@ -348,7 +358,7 @@
@Override
public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
- Executor executor) {
+ Executor executor) {
if (mapEventListeners.isEmpty()) {
return proxy.invoke(ADD_LISTENER).thenRun(() -> mapEventListeners.put(listener, executor));
} else {
@@ -380,54 +390,141 @@
@Override
public CompletableFuture<Version> begin(TransactionId transactionId) {
return proxy.<TransactionBegin, Long>invoke(
- BEGIN,
- serializer()::encode,
- new TransactionBegin(transactionId),
- serializer()::decode)
- .thenApply(Version::new);
+ BEGIN,
+ serializer()::encode,
+ new TransactionBegin(transactionId),
+ serializer()::decode)
+ .thenApply(Version::new);
}
@Override
public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
return proxy.<TransactionPrepare, PrepareResult>invoke(
- PREPARE,
- serializer()::encode,
- new TransactionPrepare(transactionLog),
- serializer()::decode)
- .thenApply(v -> v == PrepareResult.OK);
+ PREPARE,
+ serializer()::encode,
+ new TransactionPrepare(transactionLog),
+ serializer()::decode)
+ .thenApply(v -> v == PrepareResult.OK);
}
@Override
public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
return proxy.<TransactionPrepareAndCommit, PrepareResult>invoke(
- PREPARE_AND_COMMIT,
- serializer()::encode,
- new TransactionPrepareAndCommit(transactionLog),
- serializer()::decode)
- .thenApply(v -> v == PrepareResult.OK);
+ PREPARE_AND_COMMIT,
+ serializer()::encode,
+ new TransactionPrepareAndCommit(transactionLog),
+ serializer()::decode)
+ .thenApply(v -> v == PrepareResult.OK);
}
@Override
public CompletableFuture<Void> commit(TransactionId transactionId) {
return proxy.<TransactionCommit, CommitResult>invoke(
- COMMIT,
- serializer()::encode,
- new TransactionCommit(transactionId),
- serializer()::decode)
- .thenApply(v -> null);
+ COMMIT,
+ serializer()::encode,
+ new TransactionCommit(transactionId),
+ serializer()::decode)
+ .thenApply(v -> null);
}
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
return proxy.invoke(
- ROLLBACK,
- serializer()::encode,
- new TransactionRollback(transactionId),
- serializer()::decode)
- .thenApply(v -> null);
+ ROLLBACK,
+ serializer()::encode,
+ new TransactionRollback(transactionId),
+ serializer()::decode)
+ .thenApply(v -> null);
}
private boolean isListening() {
return !mapEventListeners.isEmpty();
}
+
+ @Override
+ public CompletableFuture<AsyncIterator<Entry<String, Versioned<byte[]>>>> iterator() {
+ return proxy.<Long>invoke(OPEN_ITERATOR, SERIALIZER::decode) // the decoded Long becomes the iterator id
+ .thenApply(ConsistentMultimapIterator::new);
+ }
+
+ /**
+ * Lazy iterator over this map's entries that pulls them from the cluster one batch at a time.
+ */
+ private class ConsistentMultimapIterator implements AsyncIterator<Map.Entry<String, Versioned<byte[]>>> {
+ private final long id; // iterator id sent with every NEXT and CLOSE_ITERATOR request
+ private volatile CompletableFuture<IteratorBatch> batch; // batch currently being read; may complete with null
+ private volatile CompletableFuture<Void> closeFuture; // set by the first close() and returned by later calls
+
+ ConsistentMultimapIterator(long id) {
+ this.id = id;
+ this.batch = CompletableFuture.completedFuture( // empty batch at position 0: first access triggers a fetch
+ new IteratorBatch(0, Collections.emptyList()));
+ }
+
+ /**
+ * Returns the current batch iterator or lazily fetches the next batch from the cluster.
+ *
+ * @return future completed with the batch to read from, or with null once a fetch has returned no batch
+ */
+ private CompletableFuture<Iterator<Entry<String, Versioned<byte[]>>>> batch() {
+ return batch.thenCompose(iterator -> {
+ if (iterator != null && !iterator.hasNext()) { // current batch is used up
+ batch = fetch(iterator.position()); // continue from the position recorded in that batch
+ return batch.thenApply(Function.identity()); // re-types the IteratorBatch future as an Iterator future
+ }
+ return CompletableFuture.completedFuture(iterator); // entries remain, or iteration already ended (null)
+ });
+ }
+
+ /**
+ * Fetches the next batch of entries from the cluster, closing the iterator when no batch is returned.
+ *
+ * @param position the position from which to fetch the next batch
+ * @return future completed with the fetched batch, or with null once the iterator has been closed
+ */
+ private CompletableFuture<IteratorBatch> fetch(int position) {
+ return proxy.<IteratorPosition, IteratorBatch>invoke(
+ NEXT,
+ SERIALIZER::encode,
+ new IteratorPosition(id, position),
+ SERIALIZER::decode)
+ .thenCompose(batch -> {
+ if (batch == null) { // NOTE(review): presumably how the service signals the end of the data - confirm
+ return close().thenApply(v -> null);
+ }
+ return CompletableFuture.completedFuture(batch);
+ });
+ }
+
+ /**
+ * Closes the iterator; CLOSE_ITERATOR is invoked by the first call only and its future is reused afterwards.
+ *
+ * @return future to be completed once the iterator has been closed
+ */
+ private CompletableFuture<Void> close() {
+ if (closeFuture == null) {
+ synchronized (this) {
+ if (closeFuture == null) { // re-check under the lock so that only one caller issues the request
+ closeFuture = proxy.invoke(CLOSE_ITERATOR, SERIALIZER::encode, id);
+ }
+ }
+ }
+ return closeFuture;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> hasNext() {
+ return batch().thenApply(iterator -> iterator != null && iterator.hasNext()); // null batch: nothing is left
+ }
+
+ @Override
+ public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> next() {
+ return batch().thenCompose(iterator -> {
+ if (iterator == null) { // iteration has ended
+ return Tools.exceptionalFuture(new NoSuchElementException());
+ }
+ return CompletableFuture.completedFuture(iterator.next());
+ });
+ }
+ }
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
index 248c4eb..4f131ef 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
@@ -15,6 +15,11 @@
*/
package org.onosproject.store.primitives.resources.impl;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
import io.atomix.protocols.raft.operation.OperationId;
import io.atomix.protocols.raft.operation.OperationType;
import io.atomix.utils.ArraySizeHashPrinter;
@@ -58,7 +63,10 @@
PREPARE(OperationType.COMMAND),
PREPARE_AND_COMMIT(OperationType.COMMAND),
COMMIT(OperationType.COMMAND),
- ROLLBACK(OperationType.COMMAND);
+ ROLLBACK(OperationType.COMMAND),
+ OPEN_ITERATOR(OperationType.COMMAND),
+ NEXT(OperationType.QUERY),
+ CLOSE_ITERATOR(OperationType.COMMAND);
private final OperationType type;
@@ -107,6 +115,9 @@
.register(MapEntryUpdateResult.Status.class)
.register(Versioned.class)
.register(byte[].class)
+ .register(Maps.immutableEntry("", "").getClass())
+ .register(IteratorBatch.class)
+ .register(IteratorPosition.class)
.build("AtomixConsistentMapOperations");
/**
@@ -561,4 +572,121 @@
                     .toString();
         }
     }
+
+    /**
+     * Identifies an open iterator together with the position from which its next batch is requested.
+     */
+    public static class IteratorPosition {
+        // Not final: the private no-argument constructor leaves both fields to be populated afterwards.
+        private long iteratorId;
+        private int position;
+
+        // NOTE(review): presumably present for the serializer this class is registered with - confirm.
+        private IteratorPosition() {
+        }
+
+        /**
+         * Creates a position within the given iterator.
+         *
+         * @param iteratorId identifier of the iterator
+         * @param position   position within that iterator
+         */
+        public IteratorPosition(long iteratorId, int position) {
+            this.iteratorId = iteratorId;
+            this.position = position;
+        }
+
+        /**
+         * Returns the iterator identifier.
+         *
+         * @return the iterator identifier
+         */
+        public long iteratorId() {
+            return iteratorId;
+        }
+
+        /**
+         * Returns the position within the iterator.
+         *
+         * @return the position
+         */
+        public int position() {
+            return position;
+        }
+    }
+
+    /**
+     * A batch of map entries tagged with an iterator position; it can be consumed directly as an iterator.
+     */
+    public static class IteratorBatch implements Iterator<Map.Entry<String, Versioned<byte[]>>> {
+        private int position;
+        private Collection<Map.Entry<String, Versioned<byte[]>>> entries;
+        // Cursor over entries, created on first use.
+        private transient volatile Iterator<Map.Entry<String, Versioned<byte[]>>> iterator;
+
+        // NOTE(review): presumably present for the serializer this class is registered with - confirm.
+        private IteratorBatch() {
+        }
+
+        /**
+         * Creates a batch.
+         *
+         * @param position position associated with this batch
+         * @param entries  entries contained in this batch
+         */
+        public IteratorBatch(int position, Collection<Map.Entry<String, Versioned<byte[]>>> entries) {
+            this.position = position;
+            this.entries = entries;
+        }
+
+        /**
+         * Returns the position associated with this batch.
+         *
+         * @return the batch position
+         */
+        public int position() {
+            return position;
+        }
+
+        /**
+         * Returns the entries contained in this batch.
+         *
+         * @return the backing collection itself, not a copy
+         */
+        public Collection<Map.Entry<String, Versioned<byte[]>>> entries() {
+            return entries;
+        }
+
+        /**
+         * Returns the cursor over this batch's entries, creating it on first use.
+         *
+         * @return the cursor shared by hasNext() and next()
+         */
+        private Iterator<Map.Entry<String, Versioned<byte[]>>> iterator() {
+            Iterator<Map.Entry<String, Versioned<byte[]>>> iterator = this.iterator;
+            if (iterator == null) {
+                // Lock on this batch rather than on the entries collection: that collection is handed out by
+                // entries() and may be an instance shared with unrelated code (for example the
+                // Collections.emptyList() singleton), so using it as a monitor would couple unrelated lockers.
+                synchronized (this) {
+                    iterator = this.iterator;
+                    if (iterator == null) {
+                        iterator = entries.iterator();
+                        this.iterator = iterator;
+                    }
+                }
+            }
+            return iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iterator().hasNext();
+        }
+
+        @Override
+        public Map.Entry<String, Versioned<byte[]>> next() {
+            return iterator().next();
+        }
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
index d25d475..0fcf3b5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.primitives.resources.impl;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -66,6 +67,7 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ADD_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.BEGIN;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLOSE_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.COMMIT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_VALUE;
@@ -73,7 +75,11 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET_OR_DEFAULT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IS_EMPTY;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorBatch;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorPosition;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.OPEN_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
@@ -95,22 +101,25 @@
*/
public class AtomixConsistentMapService extends AbstractRaftService {
+ private static final int MAX_ITERATOR_BATCH_SIZE = 1024 * 32;
+
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
- .register(KryoNamespaces.BASIC)
- .register(AtomixConsistentMapOperations.NAMESPACE)
- .register(AtomixConsistentMapEvents.NAMESPACE)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
- .register(TransactionScope.class)
- .register(TransactionLog.class)
- .register(TransactionId.class)
- .register(MapEntryValue.class)
- .register(MapEntryValue.Type.class)
- .register(new HashMap().keySet().getClass())
- .build());
+ .register(KryoNamespaces.BASIC)
+ .register(AtomixConsistentMapOperations.NAMESPACE)
+ .register(AtomixConsistentMapEvents.NAMESPACE)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
+ .register(TransactionScope.class)
+ .register(TransactionLog.class)
+ .register(TransactionId.class)
+ .register(MapEntryValue.class)
+ .register(MapEntryValue.Type.class)
+ .register(new HashMap().keySet().getClass())
+ .build());
protected Map<Long, RaftSession> listeners = new LinkedHashMap<>();
private Map<String, MapEntryValue> map;
protected Set<String> preparedKeys = Sets.newHashSet();
+ private Map<Long, IteratorContext> iterators = Maps.newHashMap();
protected Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
protected long currentVersion;
@@ -119,7 +128,7 @@
}
protected Map<String, MapEntryValue> createMap() {
- return Maps.newHashMap();
+ return Maps.newConcurrentMap();
}
protected Map<String, MapEntryValue> entries() {
@@ -137,6 +146,10 @@
writer.writeObject(entries(), serializer()::encode);
writer.writeObject(activeTransactions, serializer()::encode);
writer.writeLong(currentVersion);
+
+ Map<Long, Long> iterators = Maps.newHashMap();
+ this.iterators.forEach((id, context) -> iterators.put(id, context.sessionId));
+ writer.writeObject(iterators, serializer()::encode);
}
@Override
@@ -149,6 +162,11 @@
map = reader.readObject(serializer()::decode);
activeTransactions = reader.readObject(serializer()::decode);
currentVersion = reader.readLong();
+
+ Map<Long, Long> iterators = reader.readObject(serializer()::decode);
+ this.iterators = Maps.newHashMap();
+ iterators.forEach((id, session) ->
+ this.iterators.put(id, new IteratorContext(session, entries().entrySet().iterator())));
}
@Override
@@ -182,6 +200,9 @@
executor.register(PREPARE_AND_COMMIT, serializer()::decode, this::prepareAndCommit, serializer()::encode);
executor.register(COMMIT, serializer()::decode, this::commit, serializer()::encode);
executor.register(ROLLBACK, serializer()::decode, this::rollback, serializer()::encode);
+ executor.register(OPEN_ITERATOR, this::openIterator, serializer()::encode);
+ executor.register(NEXT, serializer()::decode, this::next, serializer()::encode);
+ executor.register(CLOSE_ITERATOR, serializer()::decode, this::closeIterator);
}
/**
@@ -204,8 +225,8 @@
protected boolean containsValue(Commit<? extends ContainsValue> commit) {
Match<byte[]> valueMatch = Match.ifValue(commit.value().value());
return entries().values().stream()
- .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
- .anyMatch(value -> valueMatch.matches(value.value()));
+ .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
+ .anyMatch(value -> valueMatch.matches(value.value()));
}
/**
@@ -242,8 +263,8 @@
*/
protected int size() {
return (int) entries().values().stream()
- .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
- .count();
+ .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
+ .count();
}
/**
@@ -253,7 +274,7 @@
*/
protected boolean isEmpty() {
return entries().values().stream()
- .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
+ .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
}
/**
@@ -263,9 +284,9 @@
*/
protected Set<String> keySet() {
return entries().entrySet().stream()
- .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
+ .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
}
/**
@@ -275,9 +296,9 @@
*/
protected Collection<Versioned<byte[]>> values() {
return entries().entrySet().stream()
- .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
- .map(entry -> toVersioned(entry.getValue()))
- .collect(Collectors.toList());
+ .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+ .map(entry -> toVersioned(entry.getValue()))
+ .collect(Collectors.toList());
}
/**
@@ -287,9 +308,9 @@
*/
protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet() {
return entries().entrySet().stream()
- .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
- .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
- .collect(Collectors.toSet());
+ .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+ .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
+ .collect(Collectors.toSet());
}
/**
@@ -301,7 +322,7 @@
*/
protected boolean valuesEqual(MapEntryValue oldValue, MapEntryValue newValue) {
return (oldValue == null && newValue == null)
- || (oldValue != null && newValue != null && valuesEqual(oldValue.value(), newValue.value()));
+ || (oldValue != null && newValue != null && valuesEqual(oldValue.value(), newValue.value()));
}
/**
@@ -313,7 +334,7 @@
*/
protected boolean valuesEqual(byte[] oldValue, byte[] newValue) {
return (oldValue == null && newValue == null)
- || (oldValue != null && newValue != null && Arrays.equals(oldValue, newValue));
+ || (oldValue != null && newValue != null && Arrays.equals(oldValue, newValue));
}
/**
@@ -343,13 +364,13 @@
// If the key has been locked by a transaction, return a WRITE_LOCK error.
if (preparedKeys.contains(key)) {
return new MapEntryUpdateResult<>(
- MapEntryUpdateResult.Status.WRITE_LOCK,
- commit.index(),
- key,
- toVersioned(oldValue));
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
}
entries().put(commit.value().key(),
- new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+ new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
Versioned<byte[]> result = toVersioned(oldValue);
publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(newValue), result));
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
@@ -357,13 +378,13 @@
// If the key has been locked by a transaction, return a WRITE_LOCK error.
if (preparedKeys.contains(key)) {
return new MapEntryUpdateResult<>(
- MapEntryUpdateResult.Status.WRITE_LOCK,
- commit.index(),
- key,
- toVersioned(oldValue));
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
}
entries().put(commit.value().key(),
- new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+ new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
Versioned<byte[]> result = toVersioned(oldValue);
publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
@@ -387,25 +408,25 @@
// If the key has been locked by a transaction, return a WRITE_LOCK error.
if (preparedKeys.contains(key)) {
return new MapEntryUpdateResult<>(
- MapEntryUpdateResult.Status.WRITE_LOCK,
- commit.index(),
- key,
- toVersioned(oldValue));
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
}
MapEntryValue newValue = new MapEntryValue(
- MapEntryValue.Type.VALUE,
- commit.index(),
- commit.value().value());
+ MapEntryValue.Type.VALUE,
+ commit.index(),
+ commit.value().value());
entries().put(commit.value().key(), newValue);
Versioned<byte[]> result = toVersioned(newValue);
publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, null);
}
return new MapEntryUpdateResult<>(
- MapEntryUpdateResult.Status.PRECONDITION_FAILED,
- commit.index(),
- key,
- toVersioned(oldValue));
+ MapEntryUpdateResult.Status.PRECONDITION_FAILED,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
}
/**
@@ -425,10 +446,10 @@
// If the key has been locked by a transaction, return a WRITE_LOCK error.
if (preparedKeys.contains(key)) {
return new MapEntryUpdateResult<>(
- MapEntryUpdateResult.Status.WRITE_LOCK,
- commit.index(),
- key,
- toVersioned(oldValue));
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
}
entries().put(commit.value().key(), newValue);
Versioned<byte[]> result = toVersioned(newValue);
@@ -438,10 +459,10 @@
// If the key has been locked by a transaction, return a WRITE_LOCK error.
if (preparedKeys.contains(key)) {
return new MapEntryUpdateResult<>(
- MapEntryUpdateResult.Status.WRITE_LOCK,
- commit.index(),
- key,
- toVersioned(oldValue));
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
}
entries().put(commit.value().key(), newValue);
Versioned<byte[]> result = toVersioned(newValue);
@@ -454,8 +475,8 @@
/**
* Handles a remove commit.
*
- * @param index the commit index
- * @param key the key to remove
+ * @param index the commit index
+ * @param key the key to remove
* @param predicate predicate to determine whether to remove the entry
* @return map entry update result
*/
@@ -502,7 +523,7 @@
*/
protected MapEntryUpdateResult<String, byte[]> removeValue(Commit<? extends RemoveValue> commit) {
return removeIf(commit.index(), commit.value().key(), v ->
- valuesEqual(v, new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value())));
+ valuesEqual(v, new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value())));
}
/**
@@ -518,23 +539,23 @@
/**
* Handles a replace commit.
*
- * @param index the commit index
- * @param key the key to replace
- * @param newValue the value with which to replace the key
+ * @param index the commit index
+ * @param key the key to replace
+ * @param newValue the value with which to replace the key
* @param predicate a predicate to determine whether to replace the key
* @return map entry update result
*/
private MapEntryUpdateResult<String, byte[]> replaceIf(
- long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
+ long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
MapEntryValue oldValue = entries().get(key);
// If the key is not set or the current value doesn't match the predicate, return a PRECONDITION_FAILED error.
if (valueIsNull(oldValue) || !predicate.test(oldValue)) {
return new MapEntryUpdateResult<>(
- MapEntryUpdateResult.Status.PRECONDITION_FAILED,
- index,
- key,
- toVersioned(oldValue));
+ MapEntryUpdateResult.Status.PRECONDITION_FAILED,
+ index,
+ key,
+ toVersioned(oldValue));
}
// If the key has been locked by a transaction, return a WRITE_LOCK error.
@@ -568,7 +589,7 @@
protected MapEntryUpdateResult<String, byte[]> replaceValue(Commit<? extends ReplaceValue> commit) {
MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
return replaceIf(commit.index(), commit.value().key(), value,
- v -> valuesEqual(v.value(), commit.value().oldValue()));
+ v -> valuesEqual(v.value(), commit.value().oldValue()));
}
/**
@@ -580,7 +601,7 @@
protected MapEntryUpdateResult<String, byte[]> replaceVersion(Commit<? extends ReplaceVersion> commit) {
MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
return replaceIf(commit.index(), commit.value().key(), value,
- v -> v.version() == commit.value().oldVersion());
+ v -> v.version() == commit.value().oldVersion());
}
/**
@@ -610,6 +631,67 @@
}
/**
+ * Handles an open iterator commit.
+ *
+ * @param commit the open iterator commit
+ * @return iterator identifier
+ */
+    protected long openIterator(Commit<Void> commit) {
+        final long iteratorId = commit.index(); // the commit index doubles as the iterator identifier
+        final long sessionId = commit.session().sessionId().id();
+        iterators.put(iteratorId, new IteratorContext(sessionId, entries().entrySet().iterator()));
+        return iteratorId;
+    }
+
+    /**
+     * Handles an iterator next commit.
+     * Entries at or before the client's position are consumed but not returned; tombstones are never returned.
+     *
+     * @param commit the next commit
+     * @return the next batch of entries, or {@code null} if the iterator is unknown or has no more entries
+     */
+    protected IteratorBatch next(Commit<IteratorPosition> commit) {
+        final long iteratorId = commit.value().iteratorId();
+        final int position = commit.value().position();
+
+        IteratorContext context = iterators.get(iteratorId);
+        if (context == null) {
+            return null;
+        }
+
+        List<Map.Entry<String, Versioned<byte[]>>> entries = new ArrayList<>();
+        int size = 0;
+        while (size < MAX_ITERATOR_BATCH_SIZE && context.iterator.hasNext()) {
+            // Always advance the iterator together with the counter so the two cannot drift apart.
+            Map.Entry<String, MapEntryValue> entry = context.iterator.next();
+            context.position++;
+            Versioned<byte[]> value = toVersioned(entry.getValue());
+            // Skip entries the client already has, and tombstones (toVersioned returns null for them).
+            if (context.position <= position || value == null) {
+                continue;
+            }
+            String key = entry.getKey();
+            size += key.length();
+            size += value.value() != null ? value.value().length : 0;
+            entries.add(Maps.immutableEntry(key, value));
+        }
+
+        if (entries.isEmpty()) {
+            return null;
+        }
+        return new IteratorBatch(context.position, entries);
+    }
+
+ /**
+ * Handles a close iterator commit.
+ *
+ * @param commit the close iterator commit
+ */
+ protected void closeIterator(Commit<Long> commit) {
+ iterators.remove(commit.value()); // the commit value is the iterator id; unknown ids are ignored
+ }
+
+ /**
* Handles a listen commit.
*
* @param session listen session
@@ -719,13 +801,13 @@
TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
if (transactionScope == null) {
activeTransactions.put(
- transactionLog.transactionId(),
- new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
+ transactionLog.transactionId(),
+ new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
return PrepareResult.PARTIAL_FAILURE;
} else {
activeTransactions.put(
- transactionLog.transactionId(),
- transactionScope.prepared(commit));
+ transactionLog.transactionId(),
+ transactionScope.prepared(commit));
return PrepareResult.OK;
}
} catch (Exception e) {
@@ -795,34 +877,34 @@
if (!valueIsNull(newValue)) {
if (!valueIsNull(previousValue)) {
event = new MapEvent<>(
- MapEvent.Type.UPDATE,
- "",
- key,
- toVersioned(newValue),
- toVersioned(previousValue));
+ MapEvent.Type.UPDATE,
+ "",
+ key,
+ toVersioned(newValue),
+ toVersioned(previousValue));
} else {
event = new MapEvent<>(
- MapEvent.Type.INSERT,
- "",
- key,
- toVersioned(newValue),
- null);
+ MapEvent.Type.INSERT,
+ "",
+ key,
+ toVersioned(newValue),
+ null);
}
} else {
event = new MapEvent<>(
- MapEvent.Type.REMOVE,
- "",
- key,
- null,
- toVersioned(previousValue));
- }
- } else {
- event = new MapEvent<>(
MapEvent.Type.REMOVE,
"",
key,
null,
toVersioned(previousValue));
+ }
+ } else {
+ event = new MapEvent<>(
+ MapEvent.Type.REMOVE,
+ "",
+ key,
+ null,
+ toVersioned(previousValue));
}
eventsToPublish.add(event);
}
@@ -847,11 +929,11 @@
} else {
try {
transactionScope.transactionLog().records()
- .forEach(record -> {
- if (record.type() != MapUpdate.Type.VERSION_MATCH) {
- preparedKeys.remove(record.key());
- }
- });
+ .forEach(record -> {
+ if (record.type() != MapUpdate.Type.VERSION_MATCH) {
+ preparedKeys.remove(record.key());
+ }
+ });
return RollbackResult.OK;
} finally {
discardTombstones();
@@ -874,8 +956,8 @@
}
} else {
long lowWaterMark = activeTransactions.values().stream()
- .mapToLong(TransactionScope::version)
- .min().getAsLong();
+ .mapToLong(TransactionScope::version)
+ .min().getAsLong();
Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
while (iterator.hasNext()) {
MapEntryValue value = iterator.next().getValue();
@@ -888,12 +970,13 @@
/**
* Utility for turning a {@code MapEntryValue} to {@code Versioned}.
+ *
* @param value map entry value
* @return versioned instance
*/
protected Versioned<byte[]> toVersioned(MapEntryValue value) {
return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
- ? new Versioned<>(value.value(), value.version()) : null;
+ ? new Versioned<>(value.value(), value.version()) : null;
}
/**
@@ -1034,4 +1117,15 @@
return new TransactionScope(version, commit.value().transactionLog());
}
}
+
+    // Server-side state kept for one open iterator, stored in the iterators map under its identifier.
+    private static class IteratorContext {
+        private final long sessionId; // identifier of the session that opened the iterator
+        private final Iterator<Map.Entry<String, MapEntryValue>> iterator; // cursor over the map's entry set
+        private int position; // counter maintained by next(); implicitly starts at 0
+
+        IteratorContext(long owner, Iterator<Map.Entry<String, MapEntryValue>> cursor) {
+            sessionId = owner;
+            iterator = cursor;
+        }
+    }
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java
index 1b5cb12..89c8da6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java
@@ -20,6 +20,7 @@
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import com.google.common.collect.Maps;
import io.atomix.protocols.raft.service.Commit;
@@ -75,17 +76,17 @@
.register(MapEntryValue.class)
.register(MapEntryValue.Type.class)
.register(new HashMap().keySet().getClass())
- .register(TreeMap.class)
+ .register(ConcurrentSkipListMap.class)
.build());
@Override
- protected TreeMap<String, MapEntryValue> createMap() {
- return Maps.newTreeMap();
+ protected NavigableMap<String, MapEntryValue> createMap() {
+ return new ConcurrentSkipListMap<>();
}
@Override
- protected TreeMap<String, MapEntryValue> entries() {
- return (TreeMap<String, MapEntryValue>) super.entries();
+ protected NavigableMap<String, MapEntryValue> entries() {
+ return (NavigableMap<String, MapEntryValue>) super.entries();
}
@Override