Never process incoming messages on the netty event loop thread pool.
Currently, in many places we deserialize incoming messages on this thread pool, which could be significantly limiting throughput.
Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index 7298f2c..a276367 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -18,6 +18,7 @@
import com.google.common.collect.Maps;
import com.hazelcast.config.TopicConfig;
import com.hazelcast.core.IAtomicLong;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -113,6 +114,8 @@
private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
new MessageSubject("hz-leadership-events");
+ private ExecutorService messageHandlingExecutor;
+
@Activate
protected void activate() {
localNodeId = clusterService.getLocalNode().id();
@@ -124,7 +127,13 @@
topicConfig.setName(TOPIC_HZ_ID);
storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
- clusterCommunicator.addSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT, new InternalLeadershipEventListener());
+ messageHandlingExecutor = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/store/leadership", "message-handler"));
+
+ clusterCommunicator.addSubscriber(
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ new InternalLeadershipEventListener(),
+ messageHandlingExecutor);
log.info("Hazelcast Leadership Service started");
}
@@ -132,6 +141,7 @@
@Deactivate
protected void deactivate() {
eventDispatcher.removeSink(LeadershipEvent.class);
+ messageHandlingExecutor.shutdown();
clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
for (Topic topic : topics.values()) {