Never process incoming messages on the netty event loop thread pool.
Currently, many places deserialize incoming messages on this thread pool, which could significantly limit throughput.
Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 9fd4c7b..35e28c7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,9 +15,12 @@
*/
package org.onosproject.store.packet.impl;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -55,6 +58,9 @@
private final Logger log = getLogger(getClass());
+ // TODO: make this configurable.
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MastershipService mastershipService;
@@ -77,16 +83,24 @@
}
};
+ private ExecutorService messageHandlingExecutor;
+
@Activate
public void activate() {
- log.info("Started");
+ messageHandlingExecutor = Executors.newFixedThreadPool( // fixed-size pool dedicated to packet-out cluster messages
+ MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ groupedThreads("onos/store/packet", "message-handlers")); // group under the packet store; "onos/flow" was a copy-paste from the flow store
communicationService.addSubscriber(
- PACKET_OUT_SUBJECT, new InternalClusterMessageHandler());
+ PACKET_OUT_SUBJECT, new InternalClusterMessageHandler(), messageHandlingExecutor); // executor supplied so handling is kept off the Netty event loop (per this change's intent)
+
+ log.info("Started");
}
@Deactivate
public void deactivate() {
+ communicationService.removeSubscriber(PACKET_OUT_SUBJECT); // unsubscribe first, before the executor below is shut down
+ messageHandlingExecutor.shutdown(); // ExecutorService.shutdown() stops accepting new tasks but does not wait for in-flight ones
log.info("Stopped");
}