Implement event-based streaming iterator for ConsistentMultimap primitive

Change-Id: I4f41876f91ec752cb3d6ac0fd352ff6e8798dfd6
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
index 8725c61..600af28 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
@@ -19,6 +19,7 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.Multiset;
 import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.ConsistentMultimap;
 import org.onosproject.store.service.MultimapEventListener;
@@ -26,6 +27,7 @@
 import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -140,6 +142,11 @@
     }
 
     @Override
+    public Iterator<Map.Entry<K, V>> iterator() {
+        return new DefaultIterator<>(complete(asyncMultimap.iterator())); // waits for the async iterator
+    }
+
+    @Override
     public Map<K, Collection<V>> asMap() {
         throw new UnsupportedOperationException("This operation is not yet " +
                                                         "supported.");
@@ -156,6 +163,24 @@
         complete(asyncMultimap.removeListener(listener));
     }
 
+    /** Blocking {@link Iterator} over an {@link AsyncIterator}; every call waits on {@code complete}. */
+    private class DefaultIterator<T> implements Iterator<T> { // single T avoids hiding the outer K, V
+        private final AsyncIterator<T> iterator;
+        public DefaultIterator(AsyncIterator<T> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return complete(iterator.hasNext());
+        }
+
+        @Override
+        public T next() {
+            return complete(iterator.next());
+        }
+    }
+
     private <T> T complete(CompletableFuture<T> future) {
         try {
             return future.get(operationTimeoutMillis, TimeUnit.MILLISECONDS);
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
index 180d7cc..12b4645 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
@@ -34,7 +34,7 @@
  * Certain operations may be too expensive when backed by a distributed data
  * structure and have been labeled as such.
  */
-public interface AsyncConsistentMultimap<K, V> extends DistributedPrimitive {
+public interface AsyncConsistentMultimap<K, V> extends DistributedPrimitive, AsyncIterable<Map.Entry<K, V>> {
 
     @Override
     default DistributedPrimitive.Type primitiveType() {
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncIterable.java b/core/api/src/main/java/org/onosproject/store/service/AsyncIterable.java
new file mode 100644
index 0000000..53eb384
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncIterable.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Object whose elements of type {@code T} are traversed through an {@link AsyncIterator}.
+ */
+public interface AsyncIterable<T> {
+
+    /**
+     * Returns an asynchronous iterator.
+     *
+     * @return a future to be completed with an asynchronous iterator
+     */
+    CompletableFuture<AsyncIterator<T>> iterator();
+
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncIterator.java b/core/api/src/main/java/org/onosproject/store/service/AsyncIterator.java
new file mode 100644
index 0000000..5159c6c
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncIterator.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Iterator over items of type {@code T} whose operations return futures.
+ */
+public interface AsyncIterator<T> {
+
+    /**
+     * Returns whether the iterator has a next item.
+     *
+     * @return a future to be completed with whether a next item exists in the iterator
+     */
+    CompletableFuture<Boolean> hasNext();
+
+    /**
+     * Returns the next item in the iterator.
+     *
+     * @return a future to be completed with the next item in the iterator
+     */
+    CompletableFuture<T> next();
+
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
index 2adcf8c..bdbbee0 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
@@ -23,13 +23,15 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 /**
  * This provides a synchronous version of the functionality provided by
  * {@link AsyncConsistentMultimap}.  Instead of returning futures this map
  * blocks until the future completes then returns the result.
  */
-public interface ConsistentMultimap<K, V> extends DistributedPrimitive {
+public interface ConsistentMultimap<K, V> extends DistributedPrimitive, Iterable<Map.Entry<K, V>> {
     /**
      * Returns the number of key-value pairs in this multimap.
      * @return the number of key-value pairs
@@ -191,12 +193,25 @@
 
     /**
      * Returns a collection of each key-value pair in this map.
+     * <p>
+     * Do not use this method to read large maps. Use an {@link #iterator()} or {@link #stream()} instead.
      *
      * @return a collection of all entries in the map, this may be empty
      */
     Collection<Map.Entry<K, V>> entries();
 
     /**
+     * Streams entries from the map.
+     * <p>
+     * This method is optimized for large maps. The stream is sequential and built on {@link #spliterator()}.
+     *
+     * @return the map entry stream
+     */
+    default Stream<Map.Entry<K, V>> stream() {
+        return StreamSupport.stream(spliterator(), false);
+    }
+
+    /**
      * Returns a map of keys to collections of values that reflect the set of
      * key-value pairs contained in the multimap, where the key value pairs
      * would be the key paired with each of the values in the collection.
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
index 099758b..a97337e 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
@@ -19,6 +19,7 @@
 import com.google.common.collect.Multiset;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -130,6 +131,11 @@
     }
 
     @Override
+    public Iterator<Map.Entry<K, V>> iterator() {
+        return null; // stub: no iterator is provided, same as the null-returning asMap() below
+    }
+
+    @Override
     public Map<K, Collection<V>> asMap() {
         return null;
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
index a2396fd..bb249e1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
@@ -18,6 +18,7 @@
 
 import com.google.common.collect.Multiset;
 import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MultimapEventListener;
 import org.onosproject.store.service.Versioned;
 
@@ -131,6 +132,11 @@
     }
 
     @Override
+    public CompletableFuture<AsyncIterator<Map.Entry<K, V>>> iterator() {
+        return delegateMap.iterator(); // forwarded unchanged to the delegate
+    }
+
+    @Override
     public CompletableFuture<Collection<Map.Entry<K, V>>> entries() {
         return delegateMap.entries();
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
index 50b919f..badbf05 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
@@ -16,8 +16,11 @@
 package org.onosproject.store.primitives.impl;
 
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
@@ -36,6 +39,7 @@
 import org.onlab.util.Tools;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MultimapEventListener;
 import org.onosproject.store.service.Versioned;
 
@@ -170,6 +174,12 @@
     }
 
     @Override
+    public CompletableFuture<AsyncIterator<Entry<K, V>>> iterator() {
+        return Tools.allOf(getMultimaps().stream().map(m -> m.iterator()).collect(Collectors.toList()))
+            .thenApply(PartitionedMultimapIterator::new); // wrap the per-partition iterators into one
+    }
+
+    @Override
     public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
         return CompletableFuture.allOf(getMultimaps().stream()
             .map(map -> map.addListener(listener, executor))
@@ -216,4 +226,42 @@
     private Collection<AsyncConsistentMultimap<K, V>> getMultimaps() {
         return partitions.values();
     }
+
+    /** Chains the partition iterators in order; static so that K/V do not hide the outer type parameters. */
+    private static class PartitionedMultimapIterator<K, V> implements AsyncIterator<Map.Entry<K, V>> {
+        private final Iterator<AsyncIterator<Entry<K, V>>> iterators;
+        private volatile AsyncIterator<Entry<K, V>> iterator;
+        public PartitionedMultimapIterator(List<AsyncIterator<Entry<K, V>>> iterators) {
+            this.iterators = iterators.iterator();
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasNext() {
+            if (iterator == null && iterators.hasNext()) {
+                iterator = iterators.next();
+            }
+            if (iterator == null) {
+                return CompletableFuture.completedFuture(false);
+            }
+            // An exhausted partition is dropped and the check moves on to the next one.
+            return iterator.hasNext().thenCompose(more -> {
+                if (more) {
+                    return CompletableFuture.completedFuture(true);
+                }
+                iterator = null;
+                return hasNext();
+            });
+        }
+
+        @Override
+        public CompletableFuture<Entry<K, V>> next() {
+            // Route through hasNext() so a drained partition is skipped rather than failing the call.
+            return hasNext().thenCompose(more -> {
+                if (!more) {
+                    return Tools.exceptionalFuture(new NoSuchElementException());
+                }
+                return iterator.next();
+            });
+        }
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
index b3ef961..b00d06a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
@@ -16,15 +16,6 @@
 
 package org.onosproject.store.primitives.impl;
 
-import com.google.common.collect.ImmutableMultiset;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multiset;
-import org.onlab.util.Tools;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.MultimapEvent;
-import org.onosproject.store.service.MultimapEventListener;
-import org.onosproject.store.service.Versioned;
-
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Map;
@@ -39,6 +30,16 @@
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableMultiset;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncIterator;
+import org.onosproject.store.service.MultimapEvent;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.Versioned;
+
 /**
  * An {@link AsyncConsistentMultimap} that maps its operation to operations to
  * a differently typed {@link AsyncConsistentMultimap} by transcoding operation
@@ -239,6 +240,11 @@
     }
 
     @Override
+    public CompletableFuture<AsyncIterator<Map.Entry<K1, V1>>> iterator() {
+        return backingMap.iterator().thenApply(TranscodingIterator::new); // entries are decoded per next()
+    }
+
+    @Override
     public CompletableFuture<Map<K1, Collection<V1>>> asMap() {
         throw new UnsupportedOperationException("Unsupported operation.");
     }
@@ -317,6 +323,25 @@
         }
     }
 
+    /** Iterator decoding each entry of the backing iterator with the key and value decoders. */
+    private class TranscodingIterator implements AsyncIterator<Map.Entry<K1, V1>> {
+        private final AsyncIterator<Map.Entry<K2, V2>> backingIterator;
+        public TranscodingIterator(AsyncIterator<Map.Entry<K2, V2>> backingIterator) {
+            this.backingIterator = backingIterator;
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasNext() {
+            return backingIterator.hasNext();
+        }
+
+        @Override
+        public CompletableFuture<Map.Entry<K1, V1>> next() {
+            return backingIterator.next().thenApply(encoded -> Maps.immutableEntry(
+                keyDecoder.apply(encoded.getKey()), valueDecoder.apply(encoded.getValue())));
+        }
+    }
+
     private class InternalBackingMultimapEventListener implements MultimapEventListener<K2, V2> {
 
         private final MultimapEventListener<K1, V1> listener;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
index ab61724..44cc614 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
@@ -18,8 +18,11 @@
 
 import java.util.Collection;
 import java.util.ConcurrentModificationException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -28,15 +31,19 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multiset;
 import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.utils.concurrent.AtomixFuture;
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MultimapEvent;
 import org.onosproject.store.service.MultimapEventListener;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.Versioned;
 
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
@@ -49,6 +56,7 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ITERATE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
@@ -70,20 +78,22 @@
  * Note: this implementation does not allow null entries or duplicate entries.
  */
 public class AtomixConsistentSetMultimap
-        extends AbstractRaftPrimitive
-        implements AsyncConsistentMultimap<String, byte[]> {
+    extends AbstractRaftPrimitive
+    implements AsyncConsistentMultimap<String, byte[]> {
 
     private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
-            .register(KryoNamespaces.BASIC)
-            .register(AtomixConsistentSetMultimapOperations.NAMESPACE)
-            .register(AtomixConsistentSetMultimapEvents.NAMESPACE)
-            .build());
+        .register(KryoNamespaces.BASIC)
+        .register(AtomixConsistentSetMultimapOperations.NAMESPACE)
+        .register(AtomixConsistentSetMultimapEvents.NAMESPACE)
+        .build());
 
+    private volatile EntryIterator iterator;
     private final Map<MultimapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
 
     public AtomixConsistentSetMultimap(RaftProxy proxy) {
         super(proxy);
-        proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleEvent);
+        proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleChange);
+        proxy.addEventListener(ENTRY, SERIALIZER::decode, this::handleEntry);
         proxy.addStateChangeListener(state -> {
             if (state == RaftProxy.State.CONNECTED && isListening()) {
                 proxy.invoke(ADD_LISTENER);
@@ -91,9 +101,16 @@
         });
     }
 
-    private void handleEvent(List<MultimapEvent<String, byte[]>> events) {
+    private void handleChange(List<MultimapEvent<String, byte[]>> events) { // fan out to the listeners
         events.forEach(event ->
-                mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
+            mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
+    }
+
+    private void handleEntry(Map.Entry<String, byte[]> entry) {
+        EntryIterator iterator = this.iterator; // read the volatile field once
+        if (iterator != null) {
+            iterator.add(entry); // entries handled while no iterator is set are dropped
+        }
     }
 
     @Override
@@ -124,26 +141,26 @@
     @Override
     public CompletableFuture<Boolean> put(String key, byte[] value) {
         return proxy.invoke(
-                PUT,
-                SERIALIZER::encode,
-                new Put(key, Lists.newArrayList(value), null),
-                SERIALIZER::decode);
+            PUT,
+            SERIALIZER::encode,
+            new Put(key, Lists.newArrayList(value), null), // the single value travels as a one-element list
+            SERIALIZER::decode);
     }
 
     @Override
     public CompletableFuture<Boolean> remove(String key, byte[] value) {
         return proxy.invoke(REMOVE, SERIALIZER::encode, new MultiRemove(key,
-                Lists.newArrayList(value),
-                null), SERIALIZER::decode);
+            Lists.newArrayList(value), // single value wrapped in a list for the MultiRemove operation
+            null), SERIALIZER::decode);
     }
 
     @Override
     public CompletableFuture<Boolean> removeAll(String key, Collection<? extends byte[]> values) {
         return proxy.invoke(
-                REMOVE,
-                SERIALIZER::encode,
-                new MultiRemove(key, (Collection<byte[]>) values, null),
-                SERIALIZER::decode);
+            REMOVE,
+            SERIALIZER::encode,
+            new MultiRemove(key, (Collection<byte[]>) values, null), // unchecked cast of the wildcard type
+            SERIALIZER::decode);
     }
 
     @Override
@@ -153,18 +170,18 @@
 
     @Override
     public CompletableFuture<Boolean> putAll(
-            String key, Collection<? extends byte[]> values) {
+        String key, Collection<? extends byte[]> values) { // same PUT operation as put(), many values
         return proxy.invoke(PUT, SERIALIZER::encode, new Put(key, values, null), SERIALIZER::decode);
     }
 
     @Override
     public CompletableFuture<Versioned<Collection<? extends byte[]>>> replaceValues(
-            String key, Collection<byte[]> values) {
+        String key, Collection<byte[]> values) {
         return proxy.invoke(
-                REPLACE,
-                SERIALIZER::encode,
-                new Replace(key, values, null),
-                SERIALIZER::decode);
+            REPLACE, // the decoded result is the Versioned collection this method returns
+            SERIALIZER::encode,
+            new Replace(key, values, null),
+            SERIALIZER::decode);
     }
 
     @Override
@@ -198,6 +215,56 @@
     }
 
     @Override
+    public CompletableFuture<AsyncIterator<Map.Entry<String, byte[]>>> iterator() {
+        return proxy.<Integer>invoke(ITERATE, SERIALIZER::decode).thenApply(count -> {
+            iterator = new EntryIterator(count); // replaces any earlier iterator as handleEntry's target
+            return iterator;
+        });
+    }
+
+    private class EntryIterator implements AsyncIterator<Map.Entry<String, byte[]>> { // fed by handleEntry
+        private final Queue<CompletableFuture<Map.Entry<String, byte[]>>> in = new LinkedList<>();
+        private final Queue<CompletableFuture<Map.Entry<String, byte[]>>> out = new LinkedList<>();
+        private final int total; // entry count reported by the ITERATE response
+        private volatile int count; // number of next() calls served so far
+
+        EntryIterator(int total) {
+            this.total = total;
+        }
+
+        synchronized void add(Map.Entry<String, byte[]> entry) {
+            CompletableFuture<Map.Entry<String, byte[]>> future = out.poll(); // oldest unanswered next()
+            if (future != null) {
+                future.complete(entry);
+            } else {
+                in.add(CompletableFuture.completedFuture(entry)); // buffer until next() asks for it
+            }
+        }
+
+        @Override
+        public synchronized CompletableFuture<Boolean> hasNext() {
+            return CompletableFuture.completedFuture(count < total); // always an already-completed future
+        }
+
+        @Override
+        public synchronized CompletableFuture<Map.Entry<String, byte[]>> next() {
+            if (count == total) {
+                return Tools.exceptionalFuture(new NoSuchElementException());
+            }
+            count++;
+            // Serve an entry that has already arrived, if there is one.
+            CompletableFuture<Map.Entry<String, byte[]>> next = in.poll();
+            if (next != null) {
+                return next;
+            }
+            // Otherwise hand out a pending future; add() completes it when the entry shows up.
+            CompletableFuture<Map.Entry<String, byte[]>> future = new AtomixFuture<>();
+            out.add(future);
+            return future;
+        }
+    }
+
+    @Override
     public CompletableFuture<Void> addListener(MultimapEventListener<String, byte[]> listener, Executor executor) {
         if (mapEventListeners.isEmpty()) {
             return proxy.invoke(ADD_LISTENER).thenRun(() -> mapEventListeners.put(listener, executor));
@@ -222,13 +289,14 @@
 
     /**
      * Helper to check if there was a lock based issue.
+     *
      * @param status the status of an update result
      */
     private void throwIfLocked(MapEntryUpdateResult.Status status) {
         if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
             throw new ConcurrentModificationException("Cannot update map: " +
-                    "Another transaction " +
-                    "in progress");
+                "Another transaction " +
+                "in progress"); // raised only for WRITE_LOCK; other statuses pass through
         }
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapEvents.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapEvents.java
index 7ccb5b7..83908ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapEvents.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapEvents.java
@@ -24,7 +24,8 @@
  * Atomix consistent set multimap events.
  */
 public enum AtomixConsistentSetMultimapEvents implements EventType {
-    CHANGE;
+    CHANGE,
+    ENTRY;
 
     @Override
     public String id() {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
index 329113d..82372a2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
@@ -50,7 +50,8 @@
     REPLACE(OperationType.COMMAND),
     CLEAR(OperationType.COMMAND),
     ADD_LISTENER(OperationType.COMMAND),
-    REMOVE_LISTENER(OperationType.COMMAND);
+    REMOVE_LISTENER(OperationType.COMMAND),
+    ITERATE(OperationType.COMMAND);
 
     private final OperationType type;
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index 6478eef..2312d8f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -58,6 +58,7 @@
 import org.onosproject.store.service.Versioned;
 
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.ENTRY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
@@ -70,6 +71,7 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ITERATE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
@@ -155,6 +157,7 @@
         executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
         executor.register(ADD_LISTENER, this::listen);
         executor.register(REMOVE_LISTENER, this::unlisten);
+        executor.register(ITERATE, this::iterate, serializer::encode);
     }
 
     @Override
@@ -425,6 +428,23 @@
     }
 
     /**
+     * Publishes every key-value pair of the backing map to the committing session as an ENTRY event.
+     * @param commit the iterate commit
+     * @return the number of entries that were published
+     */
+    protected int iterate(Commit<Void> commit) {
+        int published = 0;
+        for (Map.Entry<String, MapEntryValue> mapping : backingMap.entrySet()) {
+            String key = mapping.getKey();
+            for (byte[] value : mapping.getValue().values()) {
+                commit.session().publish(ENTRY, serializer::encode, Maps.immutableEntry(key, value));
+                published++;
+            }
+        }
+        return published;
+    }
+
+    /**
      * Publishes events to listeners.
      *
      * @param events list of map event to publish
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
index a70c6ee..dbfd051 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
@@ -16,11 +16,14 @@
 
 package org.onosproject.store.primitives.resources.impl;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multiset;
@@ -30,6 +33,7 @@
 import org.apache.commons.collections.keyvalue.DefaultMapEntry;
 import org.junit.Test;
 import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncIterator;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -307,6 +311,23 @@
         map.destroy().join();
     }
 
+    /** Tests that the async iterator streams back every entry that was put into the multimap. */
+    @Test
+    public void testStreams() throws Exception {
+        AtomixConsistentSetMultimap map = createResource("testStreams");
+        for (int i = 0; i < 10000; i++) {
+            allKeys.forEach(key -> map.put(key, UUID.randomUUID().toString().getBytes()).join());
+        }
+
+        List<Map.Entry<String, byte[]>> entries = new ArrayList<>();
+        AsyncIterator<Map.Entry<String, byte[]>> iterator = map.iterator().get(5, TimeUnit.SECONDS);
+        while (iterator.hasNext().get(5, TimeUnit.SECONDS)) {
+            entries.add(iterator.next().get(5, TimeUnit.SECONDS));
+        }
+        assertEquals(40000, entries.size());
+        map.destroy().join(); // release the resource, as the preceding test does
+    }
+
     /**
      * Tests the get, keySet, keys, values, and entries implementations as well
      * as a trivial test of the asMap functionality (throws error).