Implement lazy iterators/streams for ConsistentMap

Change-Id: Id643726441c99186667c104cfdd1dab95b06cf87
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
index 1bc62b3..b692bf3 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.primitives;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -32,6 +33,7 @@
 
 import org.onlab.util.Tools;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.ConsistentMapException.ConcurrentModification;
@@ -184,6 +186,11 @@
     }
 
     @Override
+    public Iterator<Entry<K, Versioned<V>>> iterator() {
+        return new DefaultIterator<>(complete(asyncMap.iterator()));
+    }
+
+    @Override
     public void addListener(MapEventListener<K, V> listener, Executor executor) {
         complete(asyncMap.addListener(listener, executor));
     }
@@ -208,6 +215,24 @@
         return asyncMap.statusChangeListeners();
     }
 
+    private class DefaultIterator<K, V> implements Iterator<Entry<K, Versioned<V>>> {
+        private final AsyncIterator<Entry<K, Versioned<V>>> iterator;
+
+        public DefaultIterator(AsyncIterator<Map.Entry<K, Versioned<V>>> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return complete(iterator.hasNext());
+        }
+
+        @Override
+        public Map.Entry<K, Versioned<V>> next() {
+            return complete(iterator.next());
+        }
+    }
+
     @Override
     public Map<K, V> asJavaMap() {
         synchronized (this) {
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java
index c9dd5df..7ccf8c7 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java
@@ -18,6 +18,7 @@
 
 import com.google.common.base.Throwables;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.ConsistentTreeMap;
 import org.onosproject.store.service.MapEventListener;
@@ -25,6 +26,7 @@
 import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
@@ -277,6 +279,11 @@
     }
 
     @Override
+    public Iterator<Map.Entry<String, Versioned<V>>> iterator() {
+        return new DefaultIterator<>(complete(treeMap.iterator()));
+    }
+
+    @Override
     public void addListener(MapEventListener<String, V> listener,
                             Executor executor) {
         complete(treeMap.addListener(listener, executor));
@@ -287,6 +294,24 @@
         complete(treeMap.removeListener(listener));
     }
 
+    private class DefaultIterator<K, V> implements Iterator<Map.Entry<K, Versioned<V>>> {
+        private final AsyncIterator<Map.Entry<K, Versioned<V>>> iterator;
+
+        public DefaultIterator(AsyncIterator<Map.Entry<K, Versioned<V>>> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return complete(iterator.hasNext());
+        }
+
+        @Override
+        public Map.Entry<K, Versioned<V>> next() {
+            return complete(iterator.next());
+        }
+    }
+
     @Override
     public Map<String, V> asJavaMap() {
         synchronized (this) {
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index 749ab5b..45688c0 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -17,6 +17,7 @@
 package org.onosproject.store.service;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
@@ -54,7 +55,8 @@
  * the returned future will be {@link CompletableFuture#complete completed} when the
  * operation finishes.
  */
-public interface AsyncConsistentMap<K, V> extends DistributedPrimitive, Transactional<MapUpdate<K, V>> {
+public interface AsyncConsistentMap<K, V>
+    extends DistributedPrimitive, Transactional<MapUpdate<K, V>>, AsyncIterable<Map.Entry<K, Versioned<V>>> {
 
     @Override
     default DistributedPrimitive.Type primitiveType() {
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index 1f6579b1..304764e 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -24,6 +24,8 @@
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import com.google.common.util.concurrent.MoreExecutors;
 
@@ -34,7 +36,7 @@
  * @param <K> type of key
  * @param <V> type of value
  */
-public interface ConsistentMap<K, V> extends DistributedPrimitive {
+public interface ConsistentMap<K, V> extends DistributedPrimitive, Iterable<Map.Entry<K, Versioned<V>>> {
 
     /**
      * Returns the number of entries in the map.
@@ -283,6 +285,17 @@
     boolean replace(K key, long oldVersion, V newValue);
 
     /**
+     * Streams entries from the map.
+     * <p>
+     * This method is optimized for large maps.
+     *
+     * @return the map entry stream
+     */
+    default Stream<Entry<K, Versioned<V>>> stream() {
+        return StreamSupport.stream(spliterator(), false);
+    }
+
+    /**
      * Registers the specified listener to be notified whenever the map is updated.
      *
      * @param listener listener to notify about map events
diff --git a/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMapAdapter.java b/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMapAdapter.java
index 85e8e19..f39e7d0 100644
--- a/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMapAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMapAdapter.java
@@ -134,6 +134,11 @@
     }
 
     @Override
+    public CompletableFuture<AsyncIterator<Map.Entry<K, Versioned<V>>>> iterator() {
+        return null;
+    }
+
+    @Override
     public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
         return null;
     }
diff --git a/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java b/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
index 18f9d63..a8db799 100644
--- a/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.service;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -155,6 +156,11 @@
     }
 
     @Override
+    public Iterator<Map.Entry<K, Versioned<V>>> iterator() {
+        return null;
+    }
+
+    @Override
     public void addListener(MapEventListener<K, V> listener, Executor executor) {
 
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index 917f44b..193a250 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -30,6 +30,7 @@
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Version;
@@ -150,6 +151,11 @@
     }
 
     @Override
+    public CompletableFuture<AsyncIterator<Entry<K, Versioned<V>>>> iterator() {
+        return delegateMap.iterator();
+    }
+
+    @Override
     public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
         return delegateMap.addListener(listener, executor);
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
index 2bee5eb..b3a1f95 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
@@ -30,6 +30,7 @@
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Version;
@@ -239,6 +240,11 @@
     }
 
     @Override
+    public CompletableFuture<AsyncIterator<Map.Entry<String, Versioned<V>>>> iterator() {
+        return delegateMap.iterator();
+    }
+
+    @Override
     public CompletableFuture<Void> addListener(
             MapEventListener<String, V> listener, Executor executor) {
         return delegateMap.addListener(listener, executor);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index d5d4c73..38ba0f3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -18,8 +18,11 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
@@ -35,6 +38,7 @@
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Version;
@@ -185,6 +189,12 @@
     }
 
     @Override
+    public CompletableFuture<AsyncIterator<Entry<K, Versioned<V>>>> iterator() {
+        return Tools.allOf(getMaps().stream().map(m -> m.iterator()).collect(Collectors.toList()))
+            .thenApply(PartitionedMultimapIterator::new);
+    }
+
+    @Override
     public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
         return CompletableFuture.allOf(getMaps().stream()
                                                 .map(map -> map.addListener(listener, executor))
@@ -254,4 +264,42 @@
     private Collection<AsyncConsistentMap<K, V>> getMaps() {
         return partitions.values();
     }
+
+    private class PartitionedMultimapIterator<K, V> implements AsyncIterator<Map.Entry<K, Versioned<V>>> {
+        private final Iterator<AsyncIterator<Entry<K, Versioned<V>>>> iterators;
+        private volatile AsyncIterator<Entry<K, Versioned<V>>> iterator;
+
+        public PartitionedMultimapIterator(List<AsyncIterator<Entry<K, Versioned<V>>>> iterators) {
+            this.iterators = iterators.iterator();
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasNext() {
+            if (iterator == null && iterators.hasNext()) {
+                iterator = iterators.next();
+            }
+            if (iterator == null) {
+                return CompletableFuture.completedFuture(false);
+            }
+            return iterator.hasNext()
+                .thenCompose(hasNext -> {
+                    if (!hasNext) {
+                        iterator = null;
+                        return hasNext();
+                    }
+                    return CompletableFuture.completedFuture(true);
+                });
+        }
+
+        @Override
+        public CompletableFuture<Entry<K, Versioned<V>>> next() {
+            if (iterator == null && iterators.hasNext()) {
+                iterator = iterators.next();
+            }
+            if (iterator == null) {
+                return Tools.exceptionalFuture(new NoSuchElementException());
+            }
+            return iterator.next();
+        }
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index 4e662b0..d68ada5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -21,6 +21,7 @@
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.TransactionLog;
@@ -248,6 +249,11 @@
     }
 
     @Override
+    public CompletableFuture<AsyncIterator<Entry<K1, Versioned<V1>>>> iterator() {
+        return backingMap.iterator().thenApply(TranscodingIterator::new);
+    }
+
+    @Override
     public CompletableFuture<Void> addListener(MapEventListener<K1, V1> listener, Executor executor) {
         synchronized (listeners) {
             InternalBackingMapEventListener backingMapListener =
@@ -328,6 +334,25 @@
         return backingMap.statusChangeListeners();
     }
 
+    private class TranscodingIterator implements AsyncIterator<Entry<K1, Versioned<V1>>> {
+        private final AsyncIterator<Map.Entry<K2, Versioned<V2>>> iterator;
+
+        public TranscodingIterator(AsyncIterator<Map.Entry<K2, Versioned<V2>>> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasNext() {
+            return iterator.hasNext();
+        }
+
+        @Override
+        public CompletableFuture<Map.Entry<K1, Versioned<V1>>> next() {
+            return iterator.next().thenApply(entry ->
+                Maps.immutableEntry(keyDecoder.apply(entry.getKey()), entry.getValue().map(valueDecoder)));
+        }
+    }
+
     private class InternalBackingMapEventListener implements MapEventListener<K2, V2> {
 
         private final MapEventListener<K1, V1> listener;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
index fcaca0e..d8e686e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
@@ -21,6 +21,7 @@
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.TransactionLog;
@@ -340,6 +341,11 @@
     }
 
     @Override
+    public CompletableFuture<AsyncIterator<Map.Entry<String, Versioned<V1>>>> iterator() {
+        return backingMap.iterator().thenApply(TranscodingIterator::new);
+    }
+
+    @Override
     public CompletableFuture<Void> addListener(
             MapEventListener<String, V1> listener,
             Executor executor) {
@@ -387,6 +393,25 @@
         throw new UnsupportedOperationException("This operation is not yet supported.");
     }
 
+    private class TranscodingIterator implements AsyncIterator<Map.Entry<String, Versioned<V1>>> {
+        private final AsyncIterator<Map.Entry<String, Versioned<V2>>> iterator;
+
+        public TranscodingIterator(AsyncIterator<Map.Entry<String, Versioned<V2>>> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasNext() {
+            return iterator.hasNext();
+        }
+
+        @Override
+        public CompletableFuture<Map.Entry<String, Versioned<V1>>> next() {
+            return iterator.next().thenApply(entry ->
+                Maps.immutableEntry(entry.getKey(), entry.getValue().map(valueDecoder)));
+        }
+    }
+
     private class InternalBackingMapEventListener
             implements MapEventListener<String, V2> {
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 588159c..0514333 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -16,15 +16,19 @@
 package org.onosproject.store.primitives.resources.impl;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.ConcurrentModificationException;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.function.Predicate;
 
 import io.atomix.protocols.raft.proxy.RaftProxy;
@@ -50,6 +54,7 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
@@ -62,6 +67,7 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ADD_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.BEGIN;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLOSE_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.COMMIT;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_KEY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_VALUE;
@@ -69,7 +75,11 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET_OR_DEFAULT;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IS_EMPTY;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorBatch;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorPosition;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.OPEN_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
@@ -91,11 +101,11 @@
  */
 public class AtomixConsistentMap extends AbstractRaftPrimitive implements AsyncConsistentMap<String, byte[]> {
     private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
-            .register(KryoNamespaces.BASIC)
-            .register(AtomixConsistentMapOperations.NAMESPACE)
-            .register(AtomixConsistentMapEvents.NAMESPACE)
-            .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
-            .build());
+        .register(KryoNamespaces.BASIC)
+        .register(AtomixConsistentMapOperations.NAMESPACE)
+        .register(AtomixConsistentMapEvents.NAMESPACE)
+        .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
+        .build());
 
     private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
 
@@ -115,7 +125,7 @@
 
     private void handleEvent(List<MapEvent<String, byte[]>> events) {
         events.forEach(event ->
-                mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
+            mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
     }
 
     @Override
@@ -146,10 +156,10 @@
     @Override
     public CompletableFuture<Versioned<byte[]>> getOrDefault(String key, byte[] defaultValue) {
         return proxy.invoke(
-                GET_OR_DEFAULT,
-                serializer()::encode,
-                new GetOrDefault(key, defaultValue),
-                serializer()::decode);
+            GET_OR_DEFAULT,
+            serializer()::encode,
+            new GetOrDefault(key, defaultValue),
+            serializer()::decode);
     }
 
     @Override
@@ -171,122 +181,122 @@
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
         return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
-                PUT,
-                serializer()::encode,
-                new Put(key, value),
-                serializer()::decode)
-                .whenComplete((r, e) -> throwIfLocked(r))
-                .thenApply(v -> v.result());
+            PUT,
+            serializer()::encode,
+            new Put(key, value),
+            serializer()::decode)
+            .whenComplete((r, e) -> throwIfLocked(r))
+            .thenApply(v -> v.result());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
         return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
-                PUT_AND_GET,
-                serializer()::encode,
-                new Put(key, value),
-                serializer()::decode)
-                .whenComplete((r, e) -> throwIfLocked(r))
-                .thenApply(v -> v.result());
+            PUT_AND_GET,
+            serializer()::encode,
+            new Put(key, value),
+            serializer()::decode)
+            .whenComplete((r, e) -> throwIfLocked(r))
+            .thenApply(v -> v.result());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
         return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
-                PUT_IF_ABSENT,
-                serializer()::encode,
-                new Put(key, value),
-                serializer()::decode)
-                .whenComplete((r, e) -> throwIfLocked(r))
-                .thenApply(v -> v.result());
+            PUT_IF_ABSENT,
+            serializer()::encode,
+            new Put(key, value),
+            serializer()::decode)
+            .whenComplete((r, e) -> throwIfLocked(r))
+            .thenApply(v -> v.result());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> remove(String key) {
         return proxy.<Remove, MapEntryUpdateResult<String, byte[]>>invoke(
-                REMOVE,
-                serializer()::encode,
-                new Remove(key),
-                serializer()::decode)
-                .whenComplete((r, e) -> throwIfLocked(r))
-                .thenApply(v -> v.result());
+            REMOVE,
+            serializer()::encode,
+            new Remove(key),
+            serializer()::decode)
+            .whenComplete((r, e) -> throwIfLocked(r))
+            .thenApply(v -> v.result());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> remove(String key, byte[] value) {
         return proxy.<RemoveValue, MapEntryUpdateResult<String, byte[]>>invoke(
-                REMOVE_VALUE,
-                serializer()::encode,
-                new RemoveValue(key, value),
-                serializer()::decode)
-                .whenComplete((r, e) -> throwIfLocked(r))
-                .thenApply(v -> v.updated());
+            REMOVE_VALUE,
+            serializer()::encode,
+            new RemoveValue(key, value),
+            serializer()::decode)
+            .whenComplete((r, e) -> throwIfLocked(r))
+            .thenApply(v -> v.updated());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> remove(String key, long version) {
         return proxy.<RemoveVersion, MapEntryUpdateResult<String, byte[]>>invoke(
-                REMOVE_VERSION,
-                serializer()::encode,
-                new RemoveVersion(key, version),
-                serializer()::decode)
-                .whenComplete((r, e) -> throwIfLocked(r))
-                .thenApply(v -> v.updated());
+            REMOVE_VERSION,
+            serializer()::encode,
+            new RemoveVersion(key, version),
+            serializer()::decode)
+            .whenComplete((r, e) -> throwIfLocked(r))
+            .thenApply(v -> v.updated());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
         return proxy.<Replace, MapEntryUpdateResult<String, byte[]>>invoke(
-                REPLACE,
-                serializer()::encode,
-                new Replace(key, value),
-                serializer()::decode)
-                .whenComplete((r, e) -> throwIfLocked(r))
-                .thenApply(v -> v.result());
+            REPLACE,
+            serializer()::encode,
+            new Replace(key, value),
+            serializer()::decode)
+            .whenComplete((r, e) -> throwIfLocked(r))
+            .thenApply(v -> v.result());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
         return proxy.<ReplaceValue, MapEntryUpdateResult<String, byte[]>>invoke(
-                REPLACE_VALUE,
-                serializer()::encode,
-                new ReplaceValue(key, oldValue, newValue),
-                serializer()::decode)
-                .whenComplete((r, e) -> throwIfLocked(r))
-                .thenApply(v -> v.updated());
+            REPLACE_VALUE,
+            serializer()::encode,
+            new ReplaceValue(key, oldValue, newValue),
+            serializer()::decode)
+            .whenComplete((r, e) -> throwIfLocked(r))
+            .thenApply(v -> v.updated());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
         return proxy.<ReplaceVersion, MapEntryUpdateResult<String, byte[]>>invoke(
-                REPLACE_VERSION,
-                serializer()::encode,
-                new ReplaceVersion(key, oldVersion, newValue),
-                serializer()::decode)
-                .whenComplete((r, e) -> throwIfLocked(r))
-                .thenApply(v -> v.updated());
+            REPLACE_VERSION,
+            serializer()::encode,
+            new ReplaceVersion(key, oldVersion, newValue),
+            serializer()::decode)
+            .whenComplete((r, e) -> throwIfLocked(r))
+            .thenApply(v -> v.updated());
     }
 
     @Override
     public CompletableFuture<Void> clear() {
         return proxy.<MapEntryUpdateResult.Status>invoke(CLEAR, serializer()::decode)
-                .whenComplete((r, e) -> throwIfLocked(r))
-                .thenApply(v -> null);
+            .whenComplete((r, e) -> throwIfLocked(r))
+            .thenApply(v -> null);
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> computeIf(String key,
-            Predicate<? super byte[]> condition,
-            BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
+        Predicate<? super byte[]> condition,
+        BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
         return get(key).thenCompose(r1 -> {
             byte[] existingValue = r1 == null ? null : r1.value();
             // if the condition evaluates to false, return existing value.
@@ -307,40 +317,40 @@
 
             if (r1 == null) {
                 return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
-                        PUT_IF_ABSENT,
-                        serializer()::encode,
-                        new Put(key, computedValue),
-                        serializer()::decode)
-                        .whenComplete((r, e) -> throwIfLocked(r))
-                        .thenCompose(r -> checkLocked(r))
-                        .thenApply(result -> new Versioned<>(computedValue, result.version()));
+                    PUT_IF_ABSENT,
+                    serializer()::encode,
+                    new Put(key, computedValue),
+                    serializer()::decode)
+                    .whenComplete((r, e) -> throwIfLocked(r))
+                    .thenCompose(r -> checkLocked(r))
+                    .thenApply(result -> new Versioned<>(computedValue, result.version()));
             } else if (computedValue == null) {
                 return proxy.<RemoveVersion, MapEntryUpdateResult<String, byte[]>>invoke(
-                        REMOVE_VERSION,
-                        serializer()::encode,
-                        new RemoveVersion(key, r1.version()),
-                        serializer()::decode)
-                        .whenComplete((r, e) -> throwIfLocked(r))
-                        .thenCompose(r -> checkLocked(r))
-                        .thenApply(v -> null);
+                    REMOVE_VERSION,
+                    serializer()::encode,
+                    new RemoveVersion(key, r1.version()),
+                    serializer()::decode)
+                    .whenComplete((r, e) -> throwIfLocked(r))
+                    .thenCompose(r -> checkLocked(r))
+                    .thenApply(v -> null);
             } else {
                 return proxy.<ReplaceVersion, MapEntryUpdateResult<String, byte[]>>invoke(
-                        REPLACE_VERSION,
-                        serializer()::encode,
-                        new ReplaceVersion(key, r1.version(), computedValue),
-                        serializer()::decode)
-                        .whenComplete((r, e) -> throwIfLocked(r))
-                        .thenCompose(r -> checkLocked(r))
-                        .thenApply(result -> result.status() == MapEntryUpdateResult.Status.OK
-                                ? new Versioned(computedValue, result.version()) : result.result());
+                    REPLACE_VERSION,
+                    serializer()::encode,
+                    new ReplaceVersion(key, r1.version(), computedValue),
+                    serializer()::decode)
+                    .whenComplete((r, e) -> throwIfLocked(r))
+                    .thenCompose(r -> checkLocked(r))
+                    .thenApply(result -> result.status() == MapEntryUpdateResult.Status.OK
+                        ? new Versioned(computedValue, result.version()) : result.result());
             }
         });
     }
 
     private CompletableFuture<MapEntryUpdateResult<String, byte[]>> checkLocked(
-            MapEntryUpdateResult<String, byte[]> result) {
+        MapEntryUpdateResult<String, byte[]> result) {
         if (result.status() == MapEntryUpdateResult.Status.PRECONDITION_FAILED ||
-                result.status() == MapEntryUpdateResult.Status.WRITE_LOCK) {
+            result.status() == MapEntryUpdateResult.Status.WRITE_LOCK) {
             return Tools.exceptionalFuture(new ConsistentMapException.ConcurrentModification());
         }
         return CompletableFuture.completedFuture(result);
@@ -348,7 +358,7 @@
 
     @Override
     public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
-            Executor executor) {
+        Executor executor) {
         if (mapEventListeners.isEmpty()) {
             return proxy.invoke(ADD_LISTENER).thenRun(() -> mapEventListeners.put(listener, executor));
         } else {
@@ -380,54 +390,141 @@
     @Override
     public CompletableFuture<Version> begin(TransactionId transactionId) {
         return proxy.<TransactionBegin, Long>invoke(
-                BEGIN,
-                serializer()::encode,
-                new TransactionBegin(transactionId),
-                serializer()::decode)
-                .thenApply(Version::new);
+            BEGIN,
+            serializer()::encode,
+            new TransactionBegin(transactionId),
+            serializer()::decode)
+            .thenApply(Version::new);
     }
 
     @Override
     public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
         return proxy.<TransactionPrepare, PrepareResult>invoke(
-                PREPARE,
-                serializer()::encode,
-                new TransactionPrepare(transactionLog),
-                serializer()::decode)
-                .thenApply(v -> v == PrepareResult.OK);
+            PREPARE,
+            serializer()::encode,
+            new TransactionPrepare(transactionLog),
+            serializer()::decode)
+            .thenApply(v -> v == PrepareResult.OK);
     }
 
     @Override
     public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
         return proxy.<TransactionPrepareAndCommit, PrepareResult>invoke(
-                PREPARE_AND_COMMIT,
-                serializer()::encode,
-                new TransactionPrepareAndCommit(transactionLog),
-                serializer()::decode)
-                .thenApply(v -> v == PrepareResult.OK);
+            PREPARE_AND_COMMIT,
+            serializer()::encode,
+            new TransactionPrepareAndCommit(transactionLog),
+            serializer()::decode)
+            .thenApply(v -> v == PrepareResult.OK);
     }
 
     @Override
     public CompletableFuture<Void> commit(TransactionId transactionId) {
         return proxy.<TransactionCommit, CommitResult>invoke(
-                COMMIT,
-                serializer()::encode,
-                new TransactionCommit(transactionId),
-                serializer()::decode)
-                .thenApply(v -> null);
+            COMMIT,
+            serializer()::encode,
+            new TransactionCommit(transactionId),
+            serializer()::decode)
+            .thenApply(v -> null);
     }
 
     @Override
     public CompletableFuture<Void> rollback(TransactionId transactionId) {
         return proxy.invoke(
-                ROLLBACK,
-                serializer()::encode,
-                new TransactionRollback(transactionId),
-                serializer()::decode)
-                .thenApply(v -> null);
+            ROLLBACK,
+            serializer()::encode,
+            new TransactionRollback(transactionId),
+            serializer()::decode)
+            .thenApply(v -> null);
     }
 
     private boolean isListening() {
         return !mapEventListeners.isEmpty();
     }
+
+    @Override
+    public CompletableFuture<AsyncIterator<Entry<String, Versioned<byte[]>>>> iterator() {
+        return proxy.<Long>invoke(OPEN_ITERATOR, SERIALIZER::decode)
+            .thenApply(ConsistentMultimapIterator::new);
+    }
+
+    /**
+     * Consistent multimap iterator.
+     */
+    private class ConsistentMultimapIterator implements AsyncIterator<Map.Entry<String, Versioned<byte[]>>> {
+        private final long id;
+        private volatile CompletableFuture<IteratorBatch> batch;
+        private volatile CompletableFuture<Void> closeFuture;
+
+        ConsistentMultimapIterator(long id) {
+            this.id = id;
+            this.batch = CompletableFuture.completedFuture(
+                new IteratorBatch(0, Collections.emptyList()));
+        }
+
+        /**
+         * Returns the current batch iterator or lazily fetches the next batch from the cluster.
+         *
+         * @return the next batch iterator
+         */
+        private CompletableFuture<Iterator<Entry<String, Versioned<byte[]>>>> batch() {
+            return batch.thenCompose(iterator -> {
+                if (iterator != null && !iterator.hasNext()) {
+                    batch = fetch(iterator.position());
+                    return batch.thenApply(Function.identity());
+                }
+                return CompletableFuture.completedFuture(iterator);
+            });
+        }
+
+        /**
+         * Fetches the next batch of entries from the cluster.
+         *
+         * @param position the position from which to fetch the next batch
+         * @return the next batch of entries from the cluster
+         */
+        private CompletableFuture<IteratorBatch> fetch(int position) {
+            return proxy.<IteratorPosition, IteratorBatch>invoke(
+                NEXT,
+                SERIALIZER::encode,
+                new IteratorPosition(id, position),
+                SERIALIZER::decode)
+                .thenCompose(batch -> {
+                    if (batch == null) {
+                        return close().thenApply(v -> null);
+                    }
+                    return CompletableFuture.completedFuture(batch);
+                });
+        }
+
+        /**
+         * Closes the iterator.
+         *
+         * @return future to be completed once the iterator has been closed
+         */
+        private CompletableFuture<Void> close() {
+            if (closeFuture == null) {
+                synchronized (this) {
+                    if (closeFuture == null) {
+                        closeFuture = proxy.invoke(CLOSE_ITERATOR, SERIALIZER::encode, id);
+                    }
+                }
+            }
+            return closeFuture;
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasNext() {
+            return batch().thenApply(iterator -> iterator != null && iterator.hasNext());
+        }
+
+        @Override
+        public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> next() {
+            return batch().thenCompose(iterator -> {
+                if (iterator == null) {
+                    return Tools.exceptionalFuture(new NoSuchElementException());
+                }
+                return CompletableFuture.completedFuture(iterator.next());
+            });
+        }
+    }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
index 248c4eb..4f131ef 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
@@ -15,6 +15,11 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
 import io.atomix.protocols.raft.operation.OperationId;
 import io.atomix.protocols.raft.operation.OperationType;
 import io.atomix.utils.ArraySizeHashPrinter;
@@ -58,7 +63,10 @@
     PREPARE(OperationType.COMMAND),
     PREPARE_AND_COMMIT(OperationType.COMMAND),
     COMMIT(OperationType.COMMAND),
-    ROLLBACK(OperationType.COMMAND);
+    ROLLBACK(OperationType.COMMAND),
+    OPEN_ITERATOR(OperationType.COMMAND),
+    NEXT(OperationType.QUERY),
+    CLOSE_ITERATOR(OperationType.COMMAND);
 
     private final OperationType type;
 
@@ -107,6 +115,9 @@
             .register(MapEntryUpdateResult.Status.class)
             .register(Versioned.class)
             .register(byte[].class)
+            .register(Maps.immutableEntry("", "").getClass())
+            .register(IteratorBatch.class)
+            .register(IteratorPosition.class)
             .build("AtomixConsistentMapOperations");
 
     /**
@@ -561,4 +572,77 @@
                     .toString();
         }
     }
+
+    /**
+     * Iterator position.
+     */
+    public static class IteratorPosition {
+        private long iteratorId;
+        private int position;
+
+        private IteratorPosition() {
+        }
+
+        public IteratorPosition(long iteratorId, int position) {
+            this.iteratorId = iteratorId;
+            this.position = position;
+        }
+
+        public long iteratorId() {
+            return iteratorId;
+        }
+
+        public int position() {
+            return position;
+        }
+    }
+
+    /**
+     * Iterator batch.
+     */
+    public static class IteratorBatch implements Iterator<Map.Entry<String, Versioned<byte[]>>> {
+        private int position;
+        private Collection<Map.Entry<String, Versioned<byte[]>>> entries;
+        private transient volatile Iterator<Map.Entry<String, Versioned<byte[]>>> iterator;
+
+        private IteratorBatch() {
+        }
+
+        public IteratorBatch(int position, Collection<Map.Entry<String, Versioned<byte[]>>> entries) {
+            this.position = position;
+            this.entries = entries;
+        }
+
+        public int position() {
+            return position;
+        }
+
+        public Collection<Map.Entry<String, Versioned<byte[]>>> entries() {
+            return entries;
+        }
+
+        private Iterator<Map.Entry<String, Versioned<byte[]>>> iterator() {
+            Iterator<Map.Entry<String, Versioned<byte[]>>> iterator = this.iterator;
+            if (iterator == null) {
+                synchronized (entries) {
+                    iterator = this.iterator;
+                    if (iterator == null) {
+                        iterator = entries.iterator();
+                        this.iterator = iterator;
+                    }
+                }
+            }
+            return iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iterator().hasNext();
+        }
+
+        @Override
+        public Map.Entry<String, Versioned<byte[]>> next() {
+            return iterator().next();
+        }
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
index d25d475..0fcf3b5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -66,6 +67,7 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ADD_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.BEGIN;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLOSE_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.COMMIT;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_KEY;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_VALUE;
@@ -73,7 +75,11 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET_OR_DEFAULT;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IS_EMPTY;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorBatch;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorPosition;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.OPEN_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
@@ -95,22 +101,25 @@
  */
 public class AtomixConsistentMapService extends AbstractRaftService {
 
+    private static final int MAX_ITERATOR_BATCH_SIZE = 1024 * 32;
+
     private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
-            .register(KryoNamespaces.BASIC)
-            .register(AtomixConsistentMapOperations.NAMESPACE)
-            .register(AtomixConsistentMapEvents.NAMESPACE)
-            .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
-            .register(TransactionScope.class)
-            .register(TransactionLog.class)
-            .register(TransactionId.class)
-            .register(MapEntryValue.class)
-            .register(MapEntryValue.Type.class)
-            .register(new HashMap().keySet().getClass())
-            .build());
+        .register(KryoNamespaces.BASIC)
+        .register(AtomixConsistentMapOperations.NAMESPACE)
+        .register(AtomixConsistentMapEvents.NAMESPACE)
+        .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
+        .register(TransactionScope.class)
+        .register(TransactionLog.class)
+        .register(TransactionId.class)
+        .register(MapEntryValue.class)
+        .register(MapEntryValue.Type.class)
+        .register(new HashMap().keySet().getClass())
+        .build());
 
     protected Map<Long, RaftSession> listeners = new LinkedHashMap<>();
     private Map<String, MapEntryValue> map;
     protected Set<String> preparedKeys = Sets.newHashSet();
+    private Map<Long, IteratorContext> iterators = Maps.newHashMap();
     protected Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
     protected long currentVersion;
 
@@ -119,7 +128,7 @@
     }
 
     protected Map<String, MapEntryValue> createMap() {
-        return Maps.newHashMap();
+        return Maps.newConcurrentMap();
     }
 
     protected Map<String, MapEntryValue> entries() {
@@ -137,6 +146,10 @@
         writer.writeObject(entries(), serializer()::encode);
         writer.writeObject(activeTransactions, serializer()::encode);
         writer.writeLong(currentVersion);
+
+        Map<Long, Long> iterators = Maps.newHashMap();
+        this.iterators.forEach((id, context) -> iterators.put(id, context.sessionId));
+        writer.writeObject(iterators, serializer()::encode);
     }
 
     @Override
@@ -149,6 +162,11 @@
         map = reader.readObject(serializer()::decode);
         activeTransactions = reader.readObject(serializer()::decode);
         currentVersion = reader.readLong();
+
+        Map<Long, Long> iterators = reader.readObject(serializer()::decode);
+        this.iterators = Maps.newHashMap();
+        iterators.forEach((id, session) ->
+            this.iterators.put(id, new IteratorContext(session, entries().entrySet().iterator())));
     }
 
     @Override
@@ -182,6 +200,9 @@
         executor.register(PREPARE_AND_COMMIT, serializer()::decode, this::prepareAndCommit, serializer()::encode);
         executor.register(COMMIT, serializer()::decode, this::commit, serializer()::encode);
         executor.register(ROLLBACK, serializer()::decode, this::rollback, serializer()::encode);
+        executor.register(OPEN_ITERATOR, this::openIterator, serializer()::encode);
+        executor.register(NEXT, serializer()::decode, this::next, serializer()::encode);
+        executor.register(CLOSE_ITERATOR, serializer()::decode, this::closeIterator);
     }
 
     /**
@@ -204,8 +225,8 @@
     protected boolean containsValue(Commit<? extends ContainsValue> commit) {
         Match<byte[]> valueMatch = Match.ifValue(commit.value().value());
         return entries().values().stream()
-                .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
-                .anyMatch(value -> valueMatch.matches(value.value()));
+            .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
+            .anyMatch(value -> valueMatch.matches(value.value()));
     }
 
     /**
@@ -242,8 +263,8 @@
      */
     protected int size() {
         return (int) entries().values().stream()
-                .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
-                .count();
+            .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
+            .count();
     }
 
     /**
@@ -253,7 +274,7 @@
      */
     protected boolean isEmpty() {
         return entries().values().stream()
-                .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
+            .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
     }
 
     /**
@@ -263,9 +284,9 @@
      */
     protected Set<String> keySet() {
         return entries().entrySet().stream()
-                .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
-                .map(Map.Entry::getKey)
-                .collect(Collectors.toSet());
+            .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toSet());
     }
 
     /**
@@ -275,9 +296,9 @@
      */
     protected Collection<Versioned<byte[]>> values() {
         return entries().entrySet().stream()
-                .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
-                .map(entry -> toVersioned(entry.getValue()))
-                .collect(Collectors.toList());
+            .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+            .map(entry -> toVersioned(entry.getValue()))
+            .collect(Collectors.toList());
     }
 
     /**
@@ -287,9 +308,9 @@
      */
     protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet() {
         return entries().entrySet().stream()
-                .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
-                .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
-                .collect(Collectors.toSet());
+            .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+            .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
+            .collect(Collectors.toSet());
     }
 
     /**
@@ -301,7 +322,7 @@
      */
     protected boolean valuesEqual(MapEntryValue oldValue, MapEntryValue newValue) {
         return (oldValue == null && newValue == null)
-                || (oldValue != null && newValue != null && valuesEqual(oldValue.value(), newValue.value()));
+            || (oldValue != null && newValue != null && valuesEqual(oldValue.value(), newValue.value()));
     }
 
     /**
@@ -313,7 +334,7 @@
      */
     protected boolean valuesEqual(byte[] oldValue, byte[] newValue) {
         return (oldValue == null && newValue == null)
-                || (oldValue != null && newValue != null && Arrays.equals(oldValue, newValue));
+            || (oldValue != null && newValue != null && Arrays.equals(oldValue, newValue));
     }
 
     /**
@@ -343,13 +364,13 @@
             // If the key has been locked by a transaction, return a WRITE_LOCK error.
             if (preparedKeys.contains(key)) {
                 return new MapEntryUpdateResult<>(
-                        MapEntryUpdateResult.Status.WRITE_LOCK,
-                        commit.index(),
-                        key,
-                        toVersioned(oldValue));
+                    MapEntryUpdateResult.Status.WRITE_LOCK,
+                    commit.index(),
+                    key,
+                    toVersioned(oldValue));
             }
             entries().put(commit.value().key(),
-                    new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+                new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
             Versioned<byte[]> result = toVersioned(oldValue);
             publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(newValue), result));
             return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
@@ -357,13 +378,13 @@
             // If the key has been locked by a transaction, return a WRITE_LOCK error.
             if (preparedKeys.contains(key)) {
                 return new MapEntryUpdateResult<>(
-                        MapEntryUpdateResult.Status.WRITE_LOCK,
-                        commit.index(),
-                        key,
-                        toVersioned(oldValue));
+                    MapEntryUpdateResult.Status.WRITE_LOCK,
+                    commit.index(),
+                    key,
+                    toVersioned(oldValue));
             }
             entries().put(commit.value().key(),
-                    new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+                new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
             Versioned<byte[]> result = toVersioned(oldValue);
             publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
             return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
@@ -387,25 +408,25 @@
             // If the key has been locked by a transaction, return a WRITE_LOCK error.
             if (preparedKeys.contains(key)) {
                 return new MapEntryUpdateResult<>(
-                        MapEntryUpdateResult.Status.WRITE_LOCK,
-                        commit.index(),
-                        key,
-                        toVersioned(oldValue));
+                    MapEntryUpdateResult.Status.WRITE_LOCK,
+                    commit.index(),
+                    key,
+                    toVersioned(oldValue));
             }
             MapEntryValue newValue = new MapEntryValue(
-                    MapEntryValue.Type.VALUE,
-                    commit.index(),
-                    commit.value().value());
+                MapEntryValue.Type.VALUE,
+                commit.index(),
+                commit.value().value());
             entries().put(commit.value().key(), newValue);
             Versioned<byte[]> result = toVersioned(newValue);
             publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
             return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, null);
         }
         return new MapEntryUpdateResult<>(
-                MapEntryUpdateResult.Status.PRECONDITION_FAILED,
-                commit.index(),
-                key,
-                toVersioned(oldValue));
+            MapEntryUpdateResult.Status.PRECONDITION_FAILED,
+            commit.index(),
+            key,
+            toVersioned(oldValue));
     }
 
     /**
@@ -425,10 +446,10 @@
             // If the key has been locked by a transaction, return a WRITE_LOCK error.
             if (preparedKeys.contains(key)) {
                 return new MapEntryUpdateResult<>(
-                        MapEntryUpdateResult.Status.WRITE_LOCK,
-                        commit.index(),
-                        key,
-                        toVersioned(oldValue));
+                    MapEntryUpdateResult.Status.WRITE_LOCK,
+                    commit.index(),
+                    key,
+                    toVersioned(oldValue));
             }
             entries().put(commit.value().key(), newValue);
             Versioned<byte[]> result = toVersioned(newValue);
@@ -438,10 +459,10 @@
             // If the key has been locked by a transaction, return a WRITE_LOCK error.
             if (preparedKeys.contains(key)) {
                 return new MapEntryUpdateResult<>(
-                        MapEntryUpdateResult.Status.WRITE_LOCK,
-                        commit.index(),
-                        key,
-                        toVersioned(oldValue));
+                    MapEntryUpdateResult.Status.WRITE_LOCK,
+                    commit.index(),
+                    key,
+                    toVersioned(oldValue));
             }
             entries().put(commit.value().key(), newValue);
             Versioned<byte[]> result = toVersioned(newValue);
@@ -454,8 +475,8 @@
     /**
      * Handles a remove commit.
      *
-     * @param index the commit index
-     * @param key the key to remove
+     * @param index     the commit index
+     * @param key       the key to remove
      * @param predicate predicate to determine whether to remove the entry
      * @return map entry update result
      */
@@ -502,7 +523,7 @@
      */
     protected MapEntryUpdateResult<String, byte[]> removeValue(Commit<? extends RemoveValue> commit) {
         return removeIf(commit.index(), commit.value().key(), v ->
-                valuesEqual(v, new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value())));
+            valuesEqual(v, new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value())));
     }
 
     /**
@@ -518,23 +539,23 @@
     /**
      * Handles a replace commit.
      *
-     * @param index the commit index
-     * @param key the key to replace
-     * @param newValue the value with which to replace the key
+     * @param index     the commit index
+     * @param key       the key to replace
+     * @param newValue  the value with which to replace the key
      * @param predicate a predicate to determine whether to replace the key
      * @return map entry update result
      */
     private MapEntryUpdateResult<String, byte[]> replaceIf(
-            long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
+        long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
         MapEntryValue oldValue = entries().get(key);
 
         // If the key is not set or the current value doesn't match the predicate, return a PRECONDITION_FAILED error.
         if (valueIsNull(oldValue) || !predicate.test(oldValue)) {
             return new MapEntryUpdateResult<>(
-                    MapEntryUpdateResult.Status.PRECONDITION_FAILED,
-                    index,
-                    key,
-                    toVersioned(oldValue));
+                MapEntryUpdateResult.Status.PRECONDITION_FAILED,
+                index,
+                key,
+                toVersioned(oldValue));
         }
 
         // If the key has been locked by a transaction, return a WRITE_LOCK error.
@@ -568,7 +589,7 @@
     protected MapEntryUpdateResult<String, byte[]> replaceValue(Commit<? extends ReplaceValue> commit) {
         MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
         return replaceIf(commit.index(), commit.value().key(), value,
-                v -> valuesEqual(v.value(), commit.value().oldValue()));
+            v -> valuesEqual(v.value(), commit.value().oldValue()));
     }
 
     /**
@@ -580,7 +601,7 @@
     protected MapEntryUpdateResult<String, byte[]> replaceVersion(Commit<? extends ReplaceVersion> commit) {
         MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
         return replaceIf(commit.index(), commit.value().key(), value,
-                v -> v.version() == commit.value().oldVersion());
+            v -> v.version() == commit.value().oldVersion());
     }
 
     /**
@@ -610,6 +631,67 @@
     }
 
     /**
+     * Handles an open iterator commit.
+     *
+     * @param commit the open iterator commit
+     * @return iterator identifier
+     */
+    protected long openIterator(Commit<Void> commit) {
+        iterators.put(commit.index(), new IteratorContext(
+            commit.session().sessionId().id(),
+            entries().entrySet().iterator()));
+        return commit.index();
+    }
+
+    /**
+     * Handles an iterator next commit.
+     *
+     * @param commit the next commit
+     * @return a list of entries to iterate
+     */
+    protected IteratorBatch next(Commit<IteratorPosition> commit) {
+        final long iteratorId = commit.value().iteratorId();
+        final int position = commit.value().position();
+
+        IteratorContext context = iterators.get(iteratorId);
+        if (context == null) {
+            return null;
+        }
+
+        List<Map.Entry<String, Versioned<byte[]>>> entries = new ArrayList<>();
+        int size = 0;
+        while (context.iterator.hasNext()) {
+            context.position++;
+            if (context.position > position) {
+                Map.Entry<String, MapEntryValue> entry = context.iterator.next();
+                String key = entry.getKey();
+                Versioned<byte[]> value = toVersioned(entry.getValue());
+                size += key.length();
+                size += value.value() != null ? value.value().length : 0;
+                entries.add(Maps.immutableEntry(key, value));
+
+                if (size >= MAX_ITERATOR_BATCH_SIZE) {
+                    break;
+                }
+            }
+        }
+
+        if (entries.isEmpty()) {
+            return null;
+        }
+        return new IteratorBatch(context.position, entries);
+    }
+
+    /**
+     * Handles a close iterator commit.
+     *
+     * @param commit the close iterator commit
+     */
+    protected void closeIterator(Commit<Long> commit) {
+        iterators.remove(commit.value());
+    }
+
+    /**
      * Handles a listen commit.
      *
      * @param session listen session
@@ -719,13 +801,13 @@
             TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
             if (transactionScope == null) {
                 activeTransactions.put(
-                        transactionLog.transactionId(),
-                        new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
+                    transactionLog.transactionId(),
+                    new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
                 return PrepareResult.PARTIAL_FAILURE;
             } else {
                 activeTransactions.put(
-                        transactionLog.transactionId(),
-                        transactionScope.prepared(commit));
+                    transactionLog.transactionId(),
+                    transactionScope.prepared(commit));
                 return PrepareResult.OK;
             }
         } catch (Exception e) {
@@ -795,34 +877,34 @@
                 if (!valueIsNull(newValue)) {
                     if (!valueIsNull(previousValue)) {
                         event = new MapEvent<>(
-                                MapEvent.Type.UPDATE,
-                                "",
-                                key,
-                                toVersioned(newValue),
-                                toVersioned(previousValue));
+                            MapEvent.Type.UPDATE,
+                            "",
+                            key,
+                            toVersioned(newValue),
+                            toVersioned(previousValue));
                     } else {
                         event = new MapEvent<>(
-                                MapEvent.Type.INSERT,
-                                "",
-                                key,
-                                toVersioned(newValue),
-                                null);
+                            MapEvent.Type.INSERT,
+                            "",
+                            key,
+                            toVersioned(newValue),
+                            null);
                     }
                 } else {
                     event = new MapEvent<>(
-                            MapEvent.Type.REMOVE,
-                            "",
-                            key,
-                            null,
-                            toVersioned(previousValue));
-                }
-            } else {
-                event = new MapEvent<>(
                         MapEvent.Type.REMOVE,
                         "",
                         key,
                         null,
                         toVersioned(previousValue));
+                }
+            } else {
+                event = new MapEvent<>(
+                    MapEvent.Type.REMOVE,
+                    "",
+                    key,
+                    null,
+                    toVersioned(previousValue));
             }
             eventsToPublish.add(event);
         }
@@ -847,11 +929,11 @@
         } else {
             try {
                 transactionScope.transactionLog().records()
-                        .forEach(record -> {
-                            if (record.type() != MapUpdate.Type.VERSION_MATCH) {
-                                preparedKeys.remove(record.key());
-                            }
-                        });
+                    .forEach(record -> {
+                        if (record.type() != MapUpdate.Type.VERSION_MATCH) {
+                            preparedKeys.remove(record.key());
+                        }
+                    });
                 return RollbackResult.OK;
             } finally {
                 discardTombstones();
@@ -874,8 +956,8 @@
             }
         } else {
             long lowWaterMark = activeTransactions.values().stream()
-                    .mapToLong(TransactionScope::version)
-                    .min().getAsLong();
+                .mapToLong(TransactionScope::version)
+                .min().getAsLong();
             Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
             while (iterator.hasNext()) {
                 MapEntryValue value = iterator.next().getValue();
@@ -888,12 +970,13 @@
 
     /**
      * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
+     *
      * @param value map entry value
      * @return versioned instance
      */
     protected Versioned<byte[]> toVersioned(MapEntryValue value) {
         return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
-                ? new Versioned<>(value.value(), value.version()) : null;
+            ? new Versioned<>(value.value(), value.version()) : null;
     }
 
     /**
@@ -1034,4 +1117,15 @@
             return new TransactionScope(version, commit.value().transactionLog());
         }
     }
+
+    private static class IteratorContext {
+        private final long sessionId;
+        private final Iterator<Map.Entry<String, MapEntryValue>> iterator;
+        private int position = 0;
+
+        IteratorContext(long sessionId, Iterator<Map.Entry<String, MapEntryValue>> iterator) {
+            this.sessionId = sessionId;
+            this.iterator = iterator;
+        }
+    }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java
index 1b5cb12..89c8da6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java
@@ -20,6 +20,7 @@
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 import com.google.common.collect.Maps;
 import io.atomix.protocols.raft.service.Commit;
@@ -75,17 +76,17 @@
             .register(MapEntryValue.class)
             .register(MapEntryValue.Type.class)
             .register(new HashMap().keySet().getClass())
-            .register(TreeMap.class)
+            .register(ConcurrentSkipListMap.class)
             .build());
 
     @Override
-    protected TreeMap<String, MapEntryValue> createMap() {
-        return Maps.newTreeMap();
+    protected NavigableMap<String, MapEntryValue> createMap() {
+        return new ConcurrentSkipListMap<>();
     }
 
     @Override
-    protected TreeMap<String, MapEntryValue> entries() {
-        return (TreeMap<String, MapEntryValue>) super.entries();
+    protected NavigableMap<String, MapEntryValue> entries() {
+        return (NavigableMap<String, MapEntryValue>) super.entries();
     }
 
     @Override
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
index b686687..5d9e5d8 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
@@ -20,6 +20,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -380,6 +381,11 @@
         }
 
         @Override
+        public Iterator<Map.Entry<K, Versioned<V>>> iterator() {
+            return map.entrySet().iterator();
+        }
+
+        @Override
         public void addListener(MapEventListener<K, V> listener, Executor executor) {
             throw new UnsupportedOperationException();
         }
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
index c292959..a7cd94a 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
@@ -15,7 +15,16 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.impl.RaftContext;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.service.ServiceId;
+import io.atomix.protocols.raft.service.ServiceType;
 import io.atomix.protocols.raft.service.impl.DefaultCommit;
+import io.atomix.protocols.raft.service.impl.DefaultServiceContext;
+import io.atomix.protocols.raft.session.RaftSession;
+import io.atomix.protocols.raft.session.SessionId;
 import io.atomix.protocols.raft.session.impl.RaftSessionContext;
 import io.atomix.protocols.raft.storage.RaftStorage;
 import io.atomix.protocols.raft.storage.snapshot.Snapshot;
@@ -24,15 +33,23 @@
 import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
 import io.atomix.storage.StorageLevel;
 import io.atomix.time.WallClockTimestamp;
+import io.atomix.utils.concurrent.AtomixThreadFactory;
+import io.atomix.utils.concurrent.SingleThreadContextFactory;
 import org.junit.Test;
 import org.onosproject.store.service.Versioned;
 
+import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.NEXT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.OPEN_ITERATOR;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
+import static org.onosproject.store.service.DistributedPrimitive.Type.LEADER_ELECTOR;
 
 /**
  * Consistent map service test.
@@ -47,13 +64,42 @@
                 .build());
         Snapshot snapshot = store.newSnapshot(2, new WallClockTimestamp());
 
+        DefaultServiceContext context = mock(DefaultServiceContext.class);
+        expect(context.serviceType()).andReturn(ServiceType.from(LEADER_ELECTOR.name())).anyTimes();
+        expect(context.serviceName()).andReturn("test").anyTimes();
+        expect(context.serviceId()).andReturn(ServiceId.from(1)).anyTimes();
+
+        RaftContext server = mock(RaftContext.class);
+        expect(server.getProtocol()).andReturn(mock(RaftServerProtocol.class));
+
+        replay(context, server);
+
+        RaftSession session = new RaftSessionContext(
+            SessionId.from(1),
+            MemberId.from("1"),
+            "test",
+            ServiceType.from(LEADER_ELECTOR.name()),
+            ReadConsistency.LINEARIZABLE,
+            100,
+            5000,
+            System.currentTimeMillis(),
+            context,
+            server,
+            new SingleThreadContextFactory(new AtomixThreadFactory()));
+
         AtomixConsistentMapService service = new AtomixConsistentMapService();
         service.put(new DefaultCommit<>(
                 2,
                 PUT,
                 new Put("foo", "Hello world!".getBytes()),
-                mock(RaftSessionContext.class),
+                session,
                 System.currentTimeMillis()));
+        service.openIterator(new DefaultCommit<>(
+            3,
+            OPEN_ITERATOR,
+            null,
+            session,
+            System.currentTimeMillis()));
 
         try (SnapshotWriter writer = snapshot.openWriter()) {
             service.snapshot(writer);
@@ -74,5 +120,12 @@
                 System.currentTimeMillis()));
         assertNotNull(value);
         assertArrayEquals("Hello world!".getBytes(), value.value());
+
+        assertEquals(1, service.next(new DefaultCommit<>(
+            4,
+            NEXT,
+            new AtomixConsistentMapOperations.IteratorPosition(3L, 0),
+            session,
+            System.currentTimeMillis())).entries().size());
     }
 }
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index 3e7a872..8753783 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -27,6 +27,7 @@
 import org.onosproject.store.primitives.impl.DistributedPrimitives;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
@@ -34,12 +35,16 @@
 import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.ConcurrentModificationException;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.is;
@@ -634,6 +639,25 @@
         assertEquals("1.0.1:Hello world again!", map1.get("bar").join().value());
     }
 
+    @Test
+    public void testIterator() throws Exception {
+        AtomixConsistentMap map = newPrimitive("testIterator");
+        for (int i = 0; i < 100; i++) {
+            for (int j = 0; j < 100; j++) {
+                map.put(String.valueOf(i), String.valueOf(j).getBytes()).join();
+            }
+        }
+
+        List<Map.Entry<String, Versioned<byte[]>>> entries = new ArrayList<>();
+        AsyncIterator<Map.Entry<String, Versioned<byte[]>>> iterator = map.iterator().get(5, TimeUnit.SECONDS);
+        while (iterator.hasNext().get(5, TimeUnit.SECONDS)) {
+            map.put("foo", UUID.randomUUID().toString().getBytes()).join();
+            entries.add(iterator.next().get(5, TimeUnit.SECONDS));
+        }
+        assertEquals(100, entries.size());
+        assertEquals(101, map.asConsistentMap().stream().count());
+    }
+
     private static class TestMapEventListener implements MapEventListener<String, byte[]> {
 
         private final BlockingQueue<MapEvent<String, byte[]>> queue = new ArrayBlockingQueue<>(1);
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
index 57bf60c..bf028c4 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
@@ -20,14 +20,20 @@
 import io.atomix.protocols.raft.service.RaftService;
 import org.junit.Test;
 import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Versioned;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -480,6 +486,25 @@
         //map.delete().join();
     }
 
+    @Test
+    public void testIterator() throws Exception {
+        AtomixConsistentTreeMap map = newPrimitive("testIterator");
+        for (int i = 0; i < 100; i++) {
+            for (int j = 0; j < 100; j++) {
+                map.put(String.valueOf(i), String.valueOf(j).getBytes()).join();
+            }
+        }
+
+        List<Map.Entry<String, Versioned<byte[]>>> entries = new ArrayList<>();
+        AsyncIterator<Map.Entry<String, Versioned<byte[]>>> iterator = map.iterator().get(5, TimeUnit.SECONDS);
+        while (iterator.hasNext().get(5, TimeUnit.SECONDS)) {
+            map.put("foo", UUID.randomUUID().toString().getBytes()).join();
+            entries.add(iterator.next().get(5, TimeUnit.SECONDS));
+        }
+        assertEquals(100, entries.size());
+        assertEquals(101, map.asConsistentMap().stream().count());
+    }
+
     private AtomixConsistentTreeMap createResource(String mapName) {
         try {
             AtomixConsistentTreeMap map = newPrimitive(mapName);
diff --git a/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/ConsistentMapAdapter.java b/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/ConsistentMapAdapter.java
index 8111693..9136826 100644
--- a/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/ConsistentMapAdapter.java
+++ b/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/ConsistentMapAdapter.java
@@ -16,6 +16,7 @@
 package org.onosproject.pcelabelstore.util;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -160,6 +161,11 @@
     }
 
     @Override
+    public Iterator<Map.Entry<K, Versioned<V>>> iterator() {
+        return null;
+    }
+
+    @Override
     public void addListener(MapEventListener<K, V> listener, Executor executor) {
 
     }