Never process incoming messages on the netty event loop thread pool.
Currently, in many places we deserialize incoming messages on this thread pool, which could significantly limit throughput.
Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 69806b1..b84e193 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -41,6 +41,7 @@
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
@@ -213,6 +214,22 @@
}
+    /**
+     * Registers a handler for messages of the given type and hands each
+     * received message off to the supplied executor for processing.
+     * <p>
+     * The registration is keyed by {@code hashToLong(type)} and uses
+     * {@code putIfAbsent}, so an existing registration under the same key
+     * is left in place. Exceptions thrown by {@code handler} are logged at
+     * warn level and are not propagated.
+     *
+     * @param type message type the handler is registered for
+     * @param handler handler invoked for each received message
+     * @param executor executor on which {@code handler} is run
+     */
     @Override
+    public void registerHandler(String type, MessageHandler handler, ExecutorService executor) {
+        handlers.putIfAbsent(hashToLong(type), new MessageHandler() {
+            @Override
+            public void handle(Message message) throws IOException {
+                // execute() rather than submit(): the Future returned by submit()
+                // was discarded, so any Throwable not caught below (e.g. an Error)
+                // would have been captured in it and lost without a trace.
+                executor.execute(() -> {
+                    try {
+                        handler.handle(message);
+                    } catch (Exception e) {
+                        log.warn("Failed to process message of type {}", type, e);
+                    }
+                });
+            }
+        });
+    }
+
+    /**
+     * Removes the handler registered for the given message type.
+     *
+     * @param type message type whose handler is removed
+     */
+    @Override
     public void unregisterHandler(String type) {
-        handlers.remove(type);
+        // Handlers are stored under hashToLong(type); removing by the raw
+        // String never matches a key and leaves the handler registered.
+        handlers.remove(hashToLong(type));
     }