Never process incoming messages on the Netty event loop thread pool.
Currently, in many places we deserialize incoming messages on this thread pool, which could significantly limit throughput.
Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
index 6effe1b..708faf6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
@@ -78,6 +78,9 @@
private final Logger log = getLogger(getClass());
+ // TODO: Make configurable.
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ReplicaInfoService replicaInfoManager;
@@ -102,6 +105,8 @@
private final ExecutorService futureListeners = Executors
.newCachedThreadPool(groupedThreads("onos/flow", "store-peer-responders"));
+ private ExecutorService messageHandlingExecutor;
+
protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
@@ -120,6 +125,11 @@
@Activate
public void activate() {
+
+ messageHandlingExecutor = Executors.newFixedThreadPool(
+ MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ groupedThreads("onos/flow", "message-handlers"));
+
clusterCommunicator.addSubscriber(APPLY_EXTEND_FLOWS,
new ClusterMessageHandler() {
@@ -141,7 +151,7 @@
}
}, futureListeners);
}
- });
+ }, messageHandlingExecutor);
replicaInfoManager.addListener(replicaInfoEventListener);
@@ -151,6 +161,7 @@
@Deactivate
public void deactivate() {
clusterCommunicator.removeSubscriber(APPLY_EXTEND_FLOWS);
+ messageHandlingExecutor.shutdown();
replicaInfoManager.removeListener(replicaInfoEventListener);
log.info("Stopped");
}