Topic: Support for passing a executor to subscribe method for invoking the callback
Change-Id: I9db485ee381c61fbfc38aba0c2bd90cb5af171e0
diff --git a/core/api/src/main/java/org/onosproject/store/service/Topic.java b/core/api/src/main/java/org/onosproject/store/service/Topic.java
index ac3742e..a50375c 100644
--- a/core/api/src/main/java/org/onosproject/store/service/Topic.java
+++ b/core/api/src/main/java/org/onosproject/store/service/Topic.java
@@ -16,8 +16,11 @@
package org.onosproject.store.service;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.function.Consumer;
+import com.google.common.util.concurrent.MoreExecutors;
+
/**
* A distributed publish subscribe primitive.
* <p>
@@ -49,9 +52,19 @@
/**
* Subscribes to messages published to this topic.
* @param callback callback that will invoked when a message published to the topic is received.
+ * @param executor executor for running the callback
* @return a future that is completed when subscription request is completed.
*/
- CompletableFuture<Void> subscribe(Consumer<T> callback);
+ CompletableFuture<Void> subscribe(Consumer<T> callback, Executor executor);
+
+ /**
+ * Subscribes to messages published to this topic.
+ * @param callback callback that will invoked when a message published to the topic is received.
+ * @return a future that is completed when subscription request is completed.
+ */
+ default CompletableFuture<Void> subscribe(Consumer<T> callback) {
+ return subscribe(callback, MoreExecutors.directExecutor());
+ }
/**
* Unsubscribes from this topic.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopic.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopic.java
index df7bf9b..d539992 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopic.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopic.java
@@ -17,6 +17,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.onosproject.store.service.AsyncAtomicValue;
@@ -61,8 +62,9 @@
}
@Override
- public CompletableFuture<Void> subscribe(Consumer<T> callback) {
- AtomicValueEventListener<T> valueListener = event -> callback.accept(event.newValue());
+ public CompletableFuture<Void> subscribe(Consumer<T> callback, Executor executor) {
+ AtomicValueEventListener<T> valueListener =
+ event -> executor.execute(() -> callback.accept(event.newValue()));
if (callbacks.putIfAbsent(callback, valueListener) == null) {
return atomicValue.addListener(valueListener);
}