Never process incoming messages on the netty event loop thread pool.
Currently in a lot of places we are deserializing incoming messages on this threadpool and that could be significantly limiting throughput.
Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
index efc0ae1..12e1d87 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -16,6 +16,7 @@
package org.onlab.netty;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
import com.google.common.util.concurrent.ListenableFuture;
@@ -47,7 +48,16 @@
* Registers a new message handler for message type.
* @param type message type.
* @param handler message handler
+ * @param executor executor to use for running message handler logic.
*/
+ public void registerHandler(String type, MessageHandler handler, ExecutorService executor);
+
+ /**
+ * Registers a new message handler for message type.
+ * @param type message type.
+ * @param handler message handler
+ */
+ @Deprecated
public void registerHandler(String type, MessageHandler handler);
/**
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 69806b1..b84e193 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -41,6 +41,7 @@
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
@@ -213,6 +214,22 @@
}
@Override
+ public void registerHandler(String type, MessageHandler handler, ExecutorService executor) {
+ handlers.putIfAbsent(hashToLong(type), new MessageHandler() {
+ @Override
+ public void handle(Message message) throws IOException {
+ executor.submit(() -> {
+ try {
+ handler.handle(message);
+ } catch (Exception e) {
+ log.warn("Failed to process message of type {}", type, e);
+ }
+ });
+ }
+ });
+ }
+
+ @Override
public void unregisterHandler(String type) {
handlers.remove(type);
}