Use the Executor interface when specifying where to handle incoming messages
This is done so that one can simply specify a direct executor.
Change-Id: I1c3ea977dd7c2d604588d587fd67f7012355eedf
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index 59970f3..4f8fd8e 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -17,6 +17,7 @@
 
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -173,7 +174,7 @@
                               Function<byte[], M> decoder,
                               Function<M, R> handler,
                               Function<R, byte[]> encoder,
-                              ExecutorService executor);
+                              Executor executor);
 
     /**
      * Adds a new subscriber for the specified message subject.
@@ -187,7 +188,7 @@
     <M> void addSubscriber(MessageSubject subject,
                            Function<byte[], M> decoder,
                            Consumer<M> handler,
-                           ExecutorService executor);
+                           Executor executor);
 
     /**
      * Removes a subscriber for the specified message subject.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 204e0fd..e479d55 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -43,6 +43,7 @@
 import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -270,7 +271,7 @@
             Function<byte[], M> decoder,
             Function<M, R> handler,
             Function<R, byte[]> encoder,
-            ExecutorService executor) {
+            Executor executor) {
         messagingService.registerHandler(subject.value(),
                 new InternalMessageResponder<>(decoder, encoder, handler),
                 executor);
@@ -280,7 +281,7 @@
     public <M> void addSubscriber(MessageSubject subject,
             Function<byte[], M> decoder,
             Consumer<M> handler,
-            ExecutorService executor) {
+            Executor executor) {
         messagingService.registerHandler(subject.value(),
                 new InternalMessageConsumer<>(decoder, handler),
                 executor);
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 8d495eb..a4db35f 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -54,6 +54,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -780,13 +781,13 @@
         @Override
         public <M, R> void addSubscriber(MessageSubject subject,
                 Function<byte[], M> decoder, Function<M, R> handler,
-                Function<R, byte[]> encoder, ExecutorService executor) {
+                Function<R, byte[]> encoder, Executor executor) {
         }
 
         @Override
         public <M> void addSubscriber(MessageSubject subject,
                 Function<byte[], M> decoder, Consumer<M> handler,
-                ExecutorService executor) {
+                Executor executor) {
         }
 
         @Override
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
index d7823d4..77246cb 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -17,7 +17,7 @@
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 
 /**
  * Interface for low level messaging primitives.
@@ -48,7 +48,7 @@
      * @param handler message handler
      * @param executor executor to use for running message handler logic.
      */
-    public void registerHandler(String type, MessageHandler handler, ExecutorService executor);
+    public void registerHandler(String type, MessageHandler handler, Executor executor);
 
     /**
      * Registers a new message handler for message type.
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index f96bb0b..3bdf0ac 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -42,7 +42,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
@@ -202,11 +202,11 @@
     }
 
     @Override
-    public void registerHandler(String type, MessageHandler handler, ExecutorService executor) {
+    public void registerHandler(String type, MessageHandler handler, Executor executor) {
         handlers.put(type, new MessageHandler() {
             @Override
             public void handle(Message message) throws IOException {
-                executor.submit(() -> {
+                executor.execute(() -> {
                     try {
                         handler.handle(message);
                     } catch (Exception e) {