Never process incoming messages on the Netty event loop thread pool.
Currently, in many places we deserialize incoming messages on this thread pool, which could significantly limit throughput.
Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 0fd21d0..1c3c098 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -18,6 +18,7 @@
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -55,6 +56,7 @@
import java.io.InputStream;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -90,6 +92,8 @@
private final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
+ private ExecutorService messageHandlingExecutor;
+
private EventuallyConsistentMap<ApplicationId, Application> apps;
private EventuallyConsistentMap<Application, InternalState> states;
private EventuallyConsistentMap<Application, Set<Permission>> permissions;
@@ -109,7 +113,10 @@
.register(KryoNamespaces.API)
.register(InternalState.class);
- clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer());
+ messageHandlingExecutor = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/store/app", "message-handler"));
+
+ clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
apps = new EventuallyConsistentMapImpl<>("apps", clusterService,
clusterCommunicator,
@@ -145,6 +152,8 @@
@Deactivate
public void deactivate() {
+ clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
+ messageHandlingExecutor.shutdown();
apps.destroy();
states.destroy();
permissions.destroy();