Topic: Support for passing a executor to subscribe method for invoking the callback

Change-Id: I9db485ee381c61fbfc38aba0c2bd90cb5af171e0
diff --git a/core/api/src/main/java/org/onosproject/store/service/Topic.java b/core/api/src/main/java/org/onosproject/store/service/Topic.java
index ac3742e..a50375c 100644
--- a/core/api/src/main/java/org/onosproject/store/service/Topic.java
+++ b/core/api/src/main/java/org/onosproject/store/service/Topic.java
@@ -16,8 +16,11 @@
 package org.onosproject.store.service;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 
+import com.google.common.util.concurrent.MoreExecutors;
+
 /**
  * A distributed publish subscribe primitive.
  * <p>
@@ -49,9 +52,19 @@
     /**
      * Subscribes to messages published to this topic.
      * @param callback callback that will invoked when a message published to the topic is received.
+     * @param executor executor for running the callback
      * @return a future that is completed when subscription request is completed.
      */
-    CompletableFuture<Void> subscribe(Consumer<T> callback);
+    CompletableFuture<Void> subscribe(Consumer<T> callback, Executor executor);
+
+    /**
+     * Subscribes to messages published to this topic.
+     * @param callback callback that will invoked when a message published to the topic is received.
+     * @return a future that is completed when subscription request is completed.
+     */
+    default CompletableFuture<Void> subscribe(Consumer<T> callback) {
+        return subscribe(callback, MoreExecutors.directExecutor());
+    }
 
     /**
      * Unsubscribes from this topic.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopic.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopic.java
index df7bf9b..d539992 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopic.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopic.java
@@ -17,6 +17,7 @@
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 
 import org.onosproject.store.service.AsyncAtomicValue;
@@ -61,8 +62,9 @@
     }
 
     @Override
-    public CompletableFuture<Void> subscribe(Consumer<T> callback) {
-        AtomicValueEventListener<T> valueListener = event -> callback.accept(event.newValue());
+    public CompletableFuture<Void> subscribe(Consumer<T> callback, Executor executor) {
+        AtomicValueEventListener<T> valueListener =
+                event -> executor.execute(() -> callback.accept(event.newValue()));
         if (callbacks.putIfAbsent(callback, valueListener) == null) {
             return atomicValue.addListener(valueListener);
         }