Implement event-based streaming iterator for ConsistentMultimap primitive
Change-Id: I4f41876f91ec752cb3d6ac0fd352ff6e8798dfd6
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index 6478eef..2312d8f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -58,6 +58,7 @@
import org.onosproject.store.service.Versioned;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.CHANGE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents.ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ADD_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CLEAR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.CONTAINS_ENTRY;
@@ -70,6 +71,7 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Get;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.IS_EMPTY;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.ITERATE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEYS;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
@@ -155,6 +157,7 @@
executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
executor.register(ADD_LISTENER, this::listen);
executor.register(REMOVE_LISTENER, this::unlisten);
+ executor.register(ITERATE, this::iterate, serializer::encode);
}
@Override
@@ -425,6 +428,23 @@
}
/**
+ * Handles an iterate commit by publishing each key/value pair of the backing map to the commit's session.
+ *
+ * @param commit the iterate commit; one ENTRY event per value is published to its session
+ * @return the number of ENTRY events published (one for every value of every key)
+ */
+ protected int iterate(Commit<Void> commit) {
+ int count = 0;
+ for (Map.Entry<String, MapEntryValue> entry : backingMap.entrySet()) {
+ for (byte[] value : entry.getValue().values()) {
+ commit.session().publish(ENTRY, serializer::encode, Maps.immutableEntry(entry.getKey(), value));
+ count++;
+ }
+ }
+ return count;
+ }
+
+ /**
* Publishes events to listeners.
*
* @param events list of map event to publish