Never process incoming messages on the Netty event loop thread pool.
Currently, in many places we deserialize incoming messages on this thread pool, which could significantly limit throughput.

Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
index 6effe1b..708faf6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
@@ -78,6 +78,9 @@
 
     private final Logger log = getLogger(getClass());
 
+    // TODO: Make configurable.
+    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ReplicaInfoService replicaInfoManager;
 
@@ -102,6 +105,8 @@
     private final ExecutorService futureListeners = Executors
             .newCachedThreadPool(groupedThreads("onos/flow", "store-peer-responders"));
 
+    private ExecutorService messageHandlingExecutor;
+
     protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
         @Override
         protected void setupKryoPool() {
@@ -120,6 +125,11 @@
 
     @Activate
     public void activate() {
+
+        messageHandlingExecutor = Executors.newFixedThreadPool(
+                MESSAGE_HANDLER_THREAD_POOL_SIZE,
+                groupedThreads("onos/flow", "message-handlers"));
+
         clusterCommunicator.addSubscriber(APPLY_EXTEND_FLOWS,
             new ClusterMessageHandler() {
 
@@ -141,7 +151,7 @@
                       }
                   }, futureListeners);
               }
-            });
+            }, messageHandlingExecutor);
 
         replicaInfoManager.addListener(replicaInfoEventListener);
 
@@ -151,6 +161,7 @@
     @Deactivate
     public void deactivate() {
         clusterCommunicator.removeSubscriber(APPLY_EXTEND_FLOWS);
+        messageHandlingExecutor.shutdown();
         replicaInfoManager.removeListener(replicaInfoEventListener);
         log.info("Stopped");
     }