Implement event-based streaming iterator for ConsistentMultimap primitive

Change-Id: I4f41876f91ec752cb3d6ac0fd352ff6e8798dfd6
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
index 8725c61..600af28 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
@@ -19,6 +19,7 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.Multiset;
 import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncIterator;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.ConsistentMultimap;
 import org.onosproject.store.service.MultimapEventListener;
@@ -26,6 +27,7 @@
 import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -140,6 +142,13 @@
     }
 
     @Override
+    public Iterator<Map.Entry<K, V>> iterator() {
+        // Wait for the asynchronous iterator to be created, then wrap it in the blocking adapter.
+        AsyncIterator<Map.Entry<K, V>> asyncIterator = complete(asyncMultimap.iterator());
+        return new DefaultIterator<>(asyncIterator);
+    }
+
+    @Override
     public Map<K, Collection<V>> asMap() {
         throw new UnsupportedOperationException("This operation is not yet " +
                                                         "supported.");
@@ -156,6 +163,31 @@
         complete(asyncMultimap.removeListener(listener));
     }
 
+    /**
+     * Blocking adapter that exposes an {@link AsyncIterator} as a plain {@link Iterator}.
+     * <p>
+     * Every call waits for the corresponding future through the enclosing map's {@code complete} helper.
+     *
+     * @param <E> type of the iterated elements
+     */
+    private class DefaultIterator<E> implements Iterator<E> {
+        private final AsyncIterator<E> iterator;
+
+        public DefaultIterator(AsyncIterator<E> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return complete(iterator.hasNext());
+        }
+
+        @Override
+        public E next() {
+            return complete(iterator.next());
+        }
+    }
+
     private <T> T complete(CompletableFuture<T> future) {
         try {
             return future.get(operationTimeoutMillis, TimeUnit.MILLISECONDS);
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
index 180d7cc..12b4645 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
@@ -34,7 +34,7 @@
  * Certain operations may be too expensive when backed by a distributed data
  * structure and have been labeled as such.
  */
-public interface AsyncConsistentMultimap<K, V> extends DistributedPrimitive {
+public interface AsyncConsistentMultimap<K, V> extends DistributedPrimitive, AsyncIterable<Map.Entry<K, V>> {
 
     @Override
     default DistributedPrimitive.Type primitiveType() {
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncIterable.java b/core/api/src/main/java/org/onosproject/store/service/AsyncIterable.java
new file mode 100644
index 0000000..53eb384
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncIterable.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Object whose elements of type {@code T} can be traversed through an {@link AsyncIterator}.
+ */
+public interface AsyncIterable<T> {
+
+    /**
+     * Requests an asynchronous iterator; the iterator itself is delivered through the returned future.
+     *
+     * @return a future to be completed with an asynchronous iterator
+     */
+    CompletableFuture<AsyncIterator<T>> iterator();
+
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncIterator.java b/core/api/src/main/java/org/onosproject/store/service/AsyncIterator.java
new file mode 100644
index 0000000..5159c6c
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncIterator.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Iterator whose operations report their results through futures.
+ */
+public interface AsyncIterator<T> {
+
+    /**
+     * Checks whether the iterator has a next item.
+     *
+     * @return a future to be completed with whether a next item exists in the iterator
+     */
+    CompletableFuture<Boolean> hasNext();
+
+    /**
+     * Retrieves the next item in the iterator.
+     *
+     * @return a future to be completed with the next item in the iterator
+     */
+    CompletableFuture<T> next();
+
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
index 2adcf8c..bdbbee0 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
@@ -23,13 +23,15 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 /**
  * This provides a synchronous version of the functionality provided by
  * {@link AsyncConsistentMultimap}.  Instead of returning futures this map
  * blocks until the future completes then returns the result.
  */
-public interface ConsistentMultimap<K, V> extends DistributedPrimitive {
+public interface ConsistentMultimap<K, V> extends DistributedPrimitive, Iterable<Map.Entry<K, V>> {
     /**
      * Returns the number of key-value pairs in this multimap.
      * @return the number of key-value pairs
@@ -191,12 +193,25 @@
 
     /**
      * Returns a collection of each key-value pair in this map.
+     * <p>
+     * Do not use this method to read large maps. Use an {@link #iterator()} or {@link #stream()} instead.
      *
      * @return a collection of all entries in the map, this may be empty
      */
     Collection<Map.Entry<K, V>> entries();
 
     /**
+     * Streams entries from the map.
+     * <p>
+     * This method is optimized for large maps; it returns a sequential stream over {@link #spliterator()}.
+     *
+     * @return the map entry stream
+     */
+    default Stream<Map.Entry<K, V>> stream() {
+        return StreamSupport.stream(spliterator(), false);
+    }
+
+    /**
      * Returns a map of keys to collections of values that reflect the set of
      * key-value pairs contained in the multimap, where the key value pairs
      * would be the key paired with each of the values in the collection.
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
index 099758b..a97337e 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
@@ -19,6 +19,7 @@
 import com.google.common.collect.Multiset;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -130,6 +131,12 @@
     }
 
     @Override
+    public Iterator<Map.Entry<K, V>> iterator() {
+        // An Iterable must not hand out a null iterator; an empty one keeps for-each over this stub safe.
+        return java.util.Collections.emptyIterator();
+    }
+
+    @Override
     public Map<K, Collection<V>> asMap() {
         return null;
     }