Never process incoming messages on the Netty event-loop thread pool.
Currently we deserialize incoming messages on this thread pool in many places, which can significantly limit throughput. Instead, hand each message off to a dedicated message-handling executor so the event-loop threads stay free for I/O.
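
The shape of the change, summarized below as a sketch (a fragment, not a complete component: `handler` is a placeholder for any ClusterMessageHandler, the pool size mirrors the constant added in this patch, and the static import of groupedThreads assumes its usual org.onlab.util.Tools location):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import static org.onlab.util.Tools.groupedThreads;

    // TODO: Make configurable.
    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;

    // Dedicated pool for deserializing and handling cluster messages,
    // keeping the Netty event-loop threads free for I/O.
    private ExecutorService messageHandlingExecutor;

    public void activate() {
        messageHandlingExecutor = Executors.newFixedThreadPool(
                MESSAGE_HANDLER_THREAD_POOL_SIZE,
                groupedThreads("onos/flow", "message-handlers"));

        // Registering with an explicit executor dispatches the handler on
        // that pool instead of the I/O thread that received the message.
        clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, handler,
                messageHandlingExecutor);
    }

    public void deactivate() {
        clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
        // Let in-flight handlers finish, then release the threads.
        messageHandlingExecutor.shutdown();
    }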

Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 86bc173..e28a753 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -107,6 +107,9 @@
 
     private final Logger log = getLogger(getClass());
 
+    // TODO: Make configurable.
+    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
+
     private InternalFlowTable flowTable = new InternalFlowTable();
 
     /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
@@ -132,6 +135,7 @@
     // Cache of SMaps used for backup data.  each SMap contain device flow table
     private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
 
+    private ExecutorService messageHandlingExecutor;
 
     private final ExecutorService backupExecutors =
             Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
@@ -172,7 +176,11 @@
 
         final NodeId local = clusterService.getLocalNode().id();
 
-        clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
+        messageHandlingExecutor = Executors.newFixedThreadPool(
+                MESSAGE_HANDLER_THREAD_POOL_SIZE,
+                groupedThreads("onos/flow", "message-handlers"));
+
+        clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), messageHandlingExecutor);
 
         clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
             @Override
@@ -181,7 +189,7 @@
                 log.trace("received completed notification for {}", event);
                 notifyDelegate(event);
             }
-        });
+        }, messageHandlingExecutor);
 
         clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
 
@@ -196,7 +204,7 @@
                     log.error("Failed to respond back", e);
                 }
             }
-        });
+        }, messageHandlingExecutor);
 
         clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
 
@@ -211,7 +219,7 @@
                     log.error("Failed to respond to peer's getFlowEntries request", e);
                 }
             }
-        });
+        }, messageHandlingExecutor);
 
         clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
 
@@ -226,7 +234,7 @@
                     log.error("Failed to respond back", e);
                 }
             }
-        });
+        }, messageHandlingExecutor);
 
         replicaInfoEventListener = new InternalReplicaInfoEventListener();
 
@@ -242,6 +250,7 @@
         clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
         clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
         clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
+        messageHandlingExecutor.shutdown();
         replicaInfoManager.removeListener(replicaInfoEventListener);
         log.info("Stopped");
     }
@@ -421,7 +430,7 @@
                     switch (op.operator()) {
                         case ADD:
                             entry = new DefaultFlowEntry(op.target());
-                            // always add requested FlowRule
+                            // always add requested FlowRule
                             // Note: 2 equal FlowEntry may have different treatment
                             flowTable.remove(entry.deviceId(), entry);
                             flowTable.add(entry);