Implement event-based streaming iterator for ConsistentMultimap primitive
Change-Id: I4f41876f91ec752cb3d6ac0fd352ff6e8798dfd6
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
index a2396fd..bb249e1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
@@ -18,6 +18,7 @@
import com.google.common.collect.Multiset;
import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MultimapEventListener;
import org.onosproject.store.service.Versioned;
@@ -131,6 +132,11 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Map.Entry<K, V>>> iterator() {
+ return delegateMap.iterator();
+ }
+
+ @Override
public CompletableFuture<Collection<Map.Entry<K, V>>> entries() {
return delegateMap.entries();
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
index 50b919f..badbf05 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
@@ -16,8 +16,11 @@
package org.onosproject.store.primitives.impl;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
@@ -36,6 +39,7 @@
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MultimapEventListener;
import org.onosproject.store.service.Versioned;
@@ -170,6 +174,12 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Entry<K, V>>> iterator() {
+ return Tools.allOf(getMultimaps().stream().map(m -> m.iterator()).collect(Collectors.toList()))
+ .thenApply(PartitionedMultimapIterator::new);
+ }
+
+ @Override
public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
return CompletableFuture.allOf(getMultimaps().stream()
.map(map -> map.addListener(listener, executor))
@@ -216,4 +226,42 @@
private Collection<AsyncConsistentMultimap<K, V>> getMultimaps() {
return partitions.values();
}
+
+ private class PartitionedMultimapIterator<K, V> implements AsyncIterator<Map.Entry<K, V>> {
+ private final Iterator<AsyncIterator<Entry<K, V>>> iterators;
+ private volatile AsyncIterator<Entry<K, V>> iterator;
+
+ public PartitionedMultimapIterator(List<AsyncIterator<Entry<K, V>>> iterators) {
+ this.iterators = iterators.iterator();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> hasNext() {
+ if (iterator == null && iterators.hasNext()) {
+ iterator = iterators.next();
+ }
+ if (iterator == null) {
+ return CompletableFuture.completedFuture(false);
+ }
+ return iterator.hasNext()
+ .thenCompose(hasNext -> {
+ if (!hasNext) {
+ iterator = null;
+ return hasNext();
+ }
+ return CompletableFuture.completedFuture(true);
+ });
+ }
+
+ @Override
+ public CompletableFuture<Entry<K, V>> next() {
+ if (iterator == null && iterators.hasNext()) {
+ iterator = iterators.next();
+ }
+ if (iterator == null) {
+ return Tools.exceptionalFuture(new NoSuchElementException());
+ }
+ return iterator.next();
+ }
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
index b3ef961..b00d06a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
@@ -16,15 +16,6 @@
package org.onosproject.store.primitives.impl;
-import com.google.common.collect.ImmutableMultiset;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multiset;
-import org.onlab.util.Tools;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.MultimapEvent;
-import org.onosproject.store.service.MultimapEventListener;
-import org.onosproject.store.service.Versioned;
-
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
@@ -39,6 +30,16 @@
import java.util.stream.Collector;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableMultiset;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncIterator;
+import org.onosproject.store.service.MultimapEvent;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.Versioned;
+
/**
* An {@link AsyncConsistentMultimap} that maps its operation to operations to
* a differently typed {@link AsyncConsistentMultimap} by transcoding operation
@@ -239,6 +240,11 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Map.Entry<K1, V1>>> iterator() {
+ return backingMap.iterator().thenApply(TranscodingIterator::new);
+ }
+
+ @Override
public CompletableFuture<Map<K1, Collection<V1>>> asMap() {
throw new UnsupportedOperationException("Unsupported operation.");
}
@@ -317,6 +323,25 @@
}
}
+ private class TranscodingIterator implements AsyncIterator<Map.Entry<K1, V1>> {
+ private final AsyncIterator<Map.Entry<K2, V2>> iterator;
+
+ public TranscodingIterator(AsyncIterator<Map.Entry<K2, V2>> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public CompletableFuture<Map.Entry<K1, V1>> next() {
+ return iterator.next().thenApply(entry ->
+ Maps.immutableEntry(keyDecoder.apply(entry.getKey()), valueDecoder.apply(entry.getValue())));
+ }
+ }
+
private class InternalBackingMultimapEventListener implements MultimapEventListener<K2, V2> {
private final MultimapEventListener<K1, V1> listener;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
index ab61724..44cc614 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
@@ -18,8 +18,11 @@
import java.util.Collection;
import java.util.ConcurrentModificationException;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -28,15 +31,19 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Multiset;
import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.utils.concurrent.AtomixFuture;
import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncIterator;
import org.onosproject.store.service.MultimapEvent;
import org.onosproject.store.service.MultimapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
@@ -49,6 +56,7 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ITERATE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
@@ -70,20 +78,22 @@
* Note: this implementation does not allow null entries or duplicate entries.
*/
public class AtomixConsistentSetMultimap
- extends AbstractRaftPrimitive
- implements AsyncConsistentMultimap<String, byte[]> {
+ extends AbstractRaftPrimitive
+ implements AsyncConsistentMultimap<String, byte[]> {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
- .register(KryoNamespaces.BASIC)
- .register(AtomixConsistentSetMultimapOperations.NAMESPACE)
- .register(AtomixConsistentSetMultimapEvents.NAMESPACE)
- .build());
+ .register(KryoNamespaces.BASIC)
+ .register(AtomixConsistentSetMultimapOperations.NAMESPACE)
+ .register(AtomixConsistentSetMultimapEvents.NAMESPACE)
+ .build());
+ private volatile EntryIterator iterator;
private final Map<MultimapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
public AtomixConsistentSetMultimap(RaftProxy proxy) {
super(proxy);
- proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleEvent);
+ proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleChange);
+ proxy.addEventListener(ENTRY, SERIALIZER::decode, this::handleEntry);
proxy.addStateChangeListener(state -> {
if (state == RaftProxy.State.CONNECTED && isListening()) {
proxy.invoke(ADD_LISTENER);
@@ -91,9 +101,16 @@
});
}
- private void handleEvent(List<MultimapEvent<String, byte[]>> events) {
+ private void handleChange(List<MultimapEvent<String, byte[]>> events) {
events.forEach(event ->
- mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
+ mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
+ }
+
+ private void handleEntry(Map.Entry<String, byte[]> entry) {
+ EntryIterator iterator = this.iterator;
+ if (iterator != null) {
+ iterator.add(entry);
+ }
}
@Override
@@ -124,26 +141,26 @@
@Override
public CompletableFuture<Boolean> put(String key, byte[] value) {
return proxy.invoke(
- PUT,
- SERIALIZER::encode,
- new Put(key, Lists.newArrayList(value), null),
- SERIALIZER::decode);
+ PUT,
+ SERIALIZER::encode,
+ new Put(key, Lists.newArrayList(value), null),
+ SERIALIZER::decode);
}
@Override
public CompletableFuture<Boolean> remove(String key, byte[] value) {
return proxy.invoke(REMOVE, SERIALIZER::encode, new MultiRemove(key,
- Lists.newArrayList(value),
- null), SERIALIZER::decode);
+ Lists.newArrayList(value),
+ null), SERIALIZER::decode);
}
@Override
public CompletableFuture<Boolean> removeAll(String key, Collection<? extends byte[]> values) {
return proxy.invoke(
- REMOVE,
- SERIALIZER::encode,
- new MultiRemove(key, (Collection<byte[]>) values, null),
- SERIALIZER::decode);
+ REMOVE,
+ SERIALIZER::encode,
+ new MultiRemove(key, (Collection<byte[]>) values, null),
+ SERIALIZER::decode);
}
@Override
@@ -153,18 +170,18 @@
@Override
public CompletableFuture<Boolean> putAll(
- String key, Collection<? extends byte[]> values) {
+ String key, Collection<? extends byte[]> values) {
return proxy.invoke(PUT, SERIALIZER::encode, new Put(key, values, null), SERIALIZER::decode);
}
@Override
public CompletableFuture<Versioned<Collection<? extends byte[]>>> replaceValues(
- String key, Collection<byte[]> values) {
+ String key, Collection<byte[]> values) {
return proxy.invoke(
- REPLACE,
- SERIALIZER::encode,
- new Replace(key, values, null),
- SERIALIZER::decode);
+ REPLACE,
+ SERIALIZER::encode,
+ new Replace(key, values, null),
+ SERIALIZER::decode);
}
@Override
@@ -198,6 +215,56 @@
}
@Override
+ public CompletableFuture<AsyncIterator<Map.Entry<String, byte[]>>> iterator() {
+ return proxy.<Integer>invoke(ITERATE, SERIALIZER::decode).thenApply(count -> {
+ iterator = new EntryIterator(count);
+ return iterator;
+ });
+ }
+
+ private class EntryIterator implements AsyncIterator<Map.Entry<String, byte[]>> {
+ private final Queue<CompletableFuture<Map.Entry<String, byte[]>>> in = new LinkedList<>();
+ private final Queue<CompletableFuture<Map.Entry<String, byte[]>>> out = new LinkedList<>();
+ private final int total;
+ private volatile int count;
+
+ EntryIterator(int total) {
+ this.total = total;
+ }
+
+ synchronized void add(Map.Entry<String, byte[]> entry) {
+ CompletableFuture<Map.Entry<String, byte[]>> future = out.poll();
+ if (future != null) {
+ future.complete(entry);
+ } else {
+ in.add(CompletableFuture.completedFuture(entry));
+ }
+ }
+
+ @Override
+ public synchronized CompletableFuture<Boolean> hasNext() {
+ return CompletableFuture.completedFuture(count < total);
+ }
+
+ @Override
+ public synchronized CompletableFuture<Map.Entry<String, byte[]>> next() {
+ if (count == total) {
+ return Tools.exceptionalFuture(new NoSuchElementException());
+ }
+ count++;
+
+ CompletableFuture<Map.Entry<String, byte[]>> next = in.poll();
+ if (next != null) {
+ return next;
+ }
+
+ CompletableFuture<Map.Entry<String, byte[]>> future = new AtomixFuture<>();
+ out.add(future);
+ return future;
+ }
+ }
+
+ @Override
public CompletableFuture<Void> addListener(MultimapEventListener<String, byte[]> listener, Executor executor) {
if (mapEventListeners.isEmpty()) {
return proxy.invoke(ADD_LISTENER).thenRun(() -> mapEventListeners.put(listener, executor));
@@ -222,13 +289,14 @@
/**
* Helper to check if there was a lock based issue.
+ *
* @param status the status of an update result
*/
private void throwIfLocked(MapEntryUpdateResult.Status status) {
if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
throw new ConcurrentModificationException("Cannot update map: " +
- "Another transaction " +
- "in progress");
+ "Another transaction " +
+ "in progress");
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapEvents.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapEvents.java
index 7ccb5b7..83908ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapEvents.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapEvents.java
@@ -24,7 +24,8 @@
* Atomix consistent set multimap events.
*/
public enum AtomixConsistentSetMultimapEvents implements EventType {
- CHANGE;
+ CHANGE,
+ ENTRY;
@Override
public String id() {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
index 329113d..82372a2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
@@ -50,7 +50,8 @@
REPLACE(OperationType.COMMAND),
CLEAR(OperationType.COMMAND),
ADD_LISTENER(OperationType.COMMAND),
- REMOVE_LISTENER(OperationType.COMMAND);
+ REMOVE_LISTENER(OperationType.COMMAND),
+ ITERATE(OperationType.COMMAND);
private final OperationType type;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index 6478eef..2312d8f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -58,6 +58,7 @@
import org.onosproject.store.service.Versioned;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
@@ -70,6 +71,7 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ITERATE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
@@ -155,6 +157,7 @@
executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
executor.register(ADD_LISTENER, this::listen);
executor.register(REMOVE_LISTENER, this::unlisten);
+ executor.register(ITERATE, this::iterate, serializer::encode);
}
@Override
@@ -425,6 +428,23 @@
}
/**
+ * Handles an iterate commit.
+ *
+ * @param commit the iterate commit
+ * @return count of commit entries
+ */
+ protected int iterate(Commit<Void> commit) {
+ int count = 0;
+ for (Map.Entry<String, MapEntryValue> entry : backingMap.entrySet()) {
+ for (byte[] value : entry.getValue().values()) {
+ commit.session().publish(ENTRY, serializer::encode, Maps.immutableEntry(entry.getKey(), value));
+ count++;
+ }
+ }
+ return count;
+ }
+
+ /**
* Publishes events to listeners.
*
* @param events list of map event to publish