Never process incoming messages on the Netty event loop thread pool.
Currently, in many places we deserialize incoming messages on this thread pool, which could significantly limit throughput.

Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 9fd4c7b..35e28c7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,9 +15,12 @@
  */
 package org.onosproject.store.packet.impl;
 
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -55,6 +58,9 @@
 
     private final Logger log = getLogger(getClass());
 
+    // Number of threads in the pool that handles PACKET_OUT_SUBJECT cluster messages. TODO: make this configurable.
+    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private MastershipService mastershipService;
 
@@ -77,16 +83,24 @@
         }
     };
 
+    private ExecutorService messageHandlingExecutor; // created in activate(), shut down in deactivate()
+
     @Activate
     public void activate() {
-        log.info("Started");
+        messageHandlingExecutor = Executors.newFixedThreadPool( // fixed pool for PACKET_OUT_SUBJECT messages
+                MESSAGE_HANDLER_THREAD_POOL_SIZE,
+                groupedThreads("onos/store/packet", "message-handlers")); // group named after this store
 
         communicationService.addSubscriber(
-                PACKET_OUT_SUBJECT, new InternalClusterMessageHandler());
+                PACKET_OUT_SUBJECT, new InternalClusterMessageHandler(), messageHandlingExecutor);
+
+        log.info("Started"); // logged last, once the subscriber is registered
     }
 
     @Deactivate
     public void deactivate() {
+        communicationService.removeSubscriber(PACKET_OUT_SUBJECT); // unsubscribe first, then stop the pool
+        messageHandlingExecutor.shutdown(); // shutdown() only; termination is not awaited here
         log.info("Stopped");
     }