Never process incoming messages on the netty event loop thread pool.
Currently in a lot of places we are deserializing incoming messages on this threadpool and that could be significantly limiting throughput.

Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index cdab90d..c33d2ea 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -16,10 +16,12 @@
 package org.onosproject.store.cluster.messaging;
 
 import com.google.common.util.concurrent.ListenableFuture;
+
 import org.onosproject.cluster.NodeId;
 
 import java.io.IOException;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
 // TODO: remove IOExceptions?
 /**
@@ -77,9 +79,19 @@
      * @param subject    message subject
      * @param subscriber message subscriber
      */
+    @Deprecated
     void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber);
 
     /**
+     * Adds a new subscriber for the specified message subject.
+     *
+     * @param subject    message subject
+     * @param subscriber message subscriber
+     * @param executor executor to use for running handler.
+     */
+    void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor);
+
+    /**
      * Removes a subscriber for the specified message subject.
      *
      * @param subject    message subject
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 0fd21d0..1c3c098 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -18,6 +18,7 @@
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -55,6 +56,7 @@
 import java.io.InputStream;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -90,6 +92,8 @@
     private final ScheduledExecutorService executor =
             Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
 
+    private ExecutorService messageHandlingExecutor;
+
     private EventuallyConsistentMap<ApplicationId, Application> apps;
     private EventuallyConsistentMap<Application, InternalState> states;
     private EventuallyConsistentMap<Application, Set<Permission>> permissions;
@@ -109,7 +113,10 @@
                 .register(KryoNamespaces.API)
                 .register(InternalState.class);
 
-        clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer());
+        messageHandlingExecutor = Executors.newSingleThreadExecutor(
+                groupedThreads("onos/store/app", "message-handler"));
+
+        clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
 
         apps = new EventuallyConsistentMapImpl<>("apps", clusterService,
                                                  clusterCommunicator,
@@ -145,6 +152,8 @@
 
     @Deactivate
     public void deactivate() {
+        clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
+        messageHandlingExecutor.shutdown();
         apps.destroy();
         states.destroy();
         permissions.destroy();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index 7298f2c..a276367 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -18,6 +18,7 @@
 import com.google.common.collect.Maps;
 import com.hazelcast.config.TopicConfig;
 import com.hazelcast.core.IAtomicLong;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -113,6 +114,8 @@
     private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
             new MessageSubject("hz-leadership-events");
 
+    private ExecutorService messageHandlingExecutor;
+
     @Activate
     protected void activate() {
         localNodeId = clusterService.getLocalNode().id();
@@ -124,7 +127,13 @@
         topicConfig.setName(TOPIC_HZ_ID);
         storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
 
-        clusterCommunicator.addSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT, new InternalLeadershipEventListener());
+        messageHandlingExecutor = Executors.newSingleThreadExecutor(
+                groupedThreads("onos/store/leadership", "message-handler"));
+
+        clusterCommunicator.addSubscriber(
+                LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                new InternalLeadershipEventListener(),
+                messageHandlingExecutor);
 
         log.info("Hazelcast Leadership Service started");
     }
@@ -132,6 +141,7 @@
     @Deactivate
     protected void deactivate() {
         eventDispatcher.removeSink(LeadershipEvent.class);
+        messageHandlingExecutor.shutdown();
         clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
 
         for (Topic topic : topics.values()) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java
index 6919ea1..2e414d4 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java
@@ -18,6 +18,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -43,6 +44,7 @@
 
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -92,6 +94,8 @@
     private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
     private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
 
+    private ExecutorService messageHandlingExecutor;
+
     public static final KryoSerializer SERIALIZER = new KryoSerializer() {
         @Override
         protected void setupKryoPool() {
@@ -109,9 +113,14 @@
         addListener(peerAdvertiser);
         addListener(leaderBoardUpdater);
 
+        messageHandlingExecutor = Executors.newSingleThreadExecutor(
+                        groupedThreads("onos/store/leadership",
+                        "peer-advertisement-handler"));
+
         clusterCommunicator.addSubscriber(
                 LEADERSHIP_UPDATES,
-                new PeerAdvertisementHandler());
+                new PeerAdvertisementHandler(),
+                messageHandlingExecutor);
 
         log.info("Started.");
     }
@@ -123,6 +132,7 @@
 
         clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES);
 
+        messageHandlingExecutor.shutdown();
         threadPool.shutdown();
 
         log.info("Stopped.");
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 5c20718..2df3e7a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.cluster.messaging.impl;
 
 import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -44,6 +45,7 @@
 
 import java.io.IOException;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -183,6 +185,13 @@
     }
 
     @Override
+    public void addSubscriber(MessageSubject subject,
+                              ClusterMessageHandler subscriber,
+                              ExecutorService executor) {
+        messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber), executor);
+    }
+
+    @Override
     public void removeSubscriber(MessageSubject subject) {
         messagingService.unregisterHandler(subject.value());
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 602fd9f..f16bd06 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -176,29 +176,36 @@
 
     @Activate
     public void activate() {
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
-        clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ, new InternalRemoveRequestListener());
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener());
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener());
 
         executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
 
         backgroundExecutor =
                 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
 
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
+                new InternalDeviceOfflineEventListener(),
+                executor);
+        clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
+                new InternalRemoveRequestListener(),
+                executor);
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
+                new InternalDeviceAdvertisementListener(),
+                backgroundExecutor);
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
+
         // start anti-entropy thread
         backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                     initialDelaySec, periodSec, TimeUnit.SECONDS);
@@ -1325,17 +1332,11 @@
             DeviceId deviceId = event.deviceId();
             Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling device update", e);
-                    }
-                }
-            });
+            try {
+                notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
+            } catch (Exception e) {
+                log.warn("Exception thrown handling device update", e);
+            }
         }
     }
 
@@ -1350,17 +1351,11 @@
             DeviceId deviceId = event.deviceId();
             Timestamp timestamp = event.timestamp();
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling device offline", e);
-                    }
-                }
-            });
+            try {
+                notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
+            } catch (Exception e) {
+                log.warn("Exception thrown handling device offline", e);
+            }
         }
     }
 
@@ -1371,17 +1366,11 @@
             log.debug("Received device remove request from peer: {}", message.sender());
             DeviceId did = SERIALIZER.decode(message.payload());
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        removeDevice(did);
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling device remove", e);
-                    }
-                }
-            });
+            try {
+                removeDevice(did);
+            } catch (Exception e) {
+                log.warn("Exception thrown handling device remove", e);
+            }
         }
     }
 
@@ -1396,17 +1385,11 @@
             DeviceId deviceId = event.deviceId();
             Timestamp timestamp = event.timestamp();
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling device removed", e);
-                    }
-                }
-            });
+            try {
+                notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
+            } catch (Exception e) {
+                log.warn("Exception thrown handling device removed", e);
+            }
         }
     }
 
@@ -1428,17 +1411,11 @@
                 return;
             }
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling port update", e);
-                    }
-                }
-            });
+            try {
+                notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
+            } catch (Exception e) {
+                log.warn("Exception thrown handling port update", e);
+            }
         }
     }
 
@@ -1460,17 +1437,11 @@
                 return;
             }
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling port update", e);
-                    }
-                }
-            });
+            try {
+                notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
+            } catch (Exception e) {
+                log.warn("Exception thrown handling port update", e);
+            }
         }
     }
 
@@ -1481,17 +1452,11 @@
         public void handle(ClusterMessage message) {
             log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
             DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
-            backgroundExecutor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        handleAdvertisement(advertisement);
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling Device advertisements.", e);
-                    }
-                }
-            });
+            try {
+                handleAdvertisement(advertisement);
+            } catch (Exception e) {
+                log.warn("Exception thrown handling Device advertisements.", e);
+            }
         }
     }
 
@@ -1507,13 +1472,11 @@
             DeviceId deviceId = event.deviceId();
             DeviceDescription deviceDescription = event.deviceDescription();
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    createOrUpdateDevice(providerId, deviceId, deviceDescription);
-                }
-            });
+            try {
+                createOrUpdateDevice(providerId, deviceId, deviceDescription);
+            } catch (Exception e) {
+                log.warn("Exception thrown handling device injected event.", e);
+            }
         }
     }
 
@@ -1529,13 +1492,11 @@
             DeviceId deviceId = event.deviceId();
             List<PortDescription> portDescriptions = event.portDescriptions();
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    updatePorts(providerId, deviceId, portDescriptions);
-                }
-            });
+            try {
+                updatePorts(providerId, deviceId, portDescriptions);
+            } catch (Exception e) {
+                log.warn("Exception thrown handling port injected event.", e);
+            }
         }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 19e77bb..fe8f1a8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -162,13 +162,13 @@
 
         updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
         clusterCommunicator.addSubscriber(updateMessageSubject,
-                                          new InternalPutEventListener());
+                                          new InternalPutEventListener(), executor);
         removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
         clusterCommunicator.addSubscriber(removeMessageSubject,
-                                          new InternalRemoveEventListener());
+                                          new InternalRemoveEventListener(), executor);
         antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
         clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
-                                          new InternalAntiEntropyListener());
+                                          new InternalAntiEntropyListener(), backgroundExecutor);
     }
 
     private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
@@ -728,13 +728,11 @@
             log.trace("Received anti-entropy advertisement from peer: {}",
                       message.sender());
             AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
-            backgroundExecutor.submit(() -> {
-                try {
-                    handleAntiEntropyAdvertisement(advertisement);
-                } catch (Exception e) {
-                    log.warn("Exception thrown handling advertisements", e);
-                }
-            });
+            try {
+                handleAntiEntropyAdvertisement(advertisement);
+            } catch (Exception e) {
+                log.warn("Exception thrown handling advertisements", e);
+            }
         }
     }
 
@@ -745,25 +743,23 @@
             log.debug("Received put event from peer: {}", message.sender());
             InternalPutEvent<K, V> event = serializer.decode(message.payload());
 
-            executor.submit(() -> {
-                try {
-                    for (PutEntry<K, V> entry : event.entries()) {
-                        K key = entry.key();
-                        V value = entry.value();
-                        Timestamp timestamp = entry.timestamp();
+            try {
+                for (PutEntry<K, V> entry : event.entries()) {
+                    K key = entry.key();
+                    V value = entry.value();
+                    Timestamp timestamp = entry.timestamp();
 
-                        if (putInternal(key, value, timestamp)) {
-                            EventuallyConsistentMapEvent<K, V> externalEvent =
-                                    new EventuallyConsistentMapEvent<>(
-                                            EventuallyConsistentMapEvent.Type.PUT, key,
-                                            value);
-                            notifyListeners(externalEvent);
-                        }
+                    if (putInternal(key, value, timestamp)) {
+                        EventuallyConsistentMapEvent<K, V> externalEvent =
+                                new EventuallyConsistentMapEvent<>(
+                                        EventuallyConsistentMapEvent.Type.PUT, key,
+                                        value);
+                        notifyListeners(externalEvent);
                     }
-                } catch (Exception e) {
-                    log.warn("Exception thrown handling put", e);
                 }
-            });
+            } catch (Exception e) {
+                log.warn("Exception thrown handling put", e);
+            }
         }
     }
 
@@ -773,25 +769,22 @@
         public void handle(ClusterMessage message) {
             log.debug("Received remove event from peer: {}", message.sender());
             InternalRemoveEvent<K> event = serializer.decode(message.payload());
+            try {
+                for (RemoveEntry<K> entry : event.entries()) {
+                    K key = entry.key();
+                    Timestamp timestamp = entry.timestamp();
 
-            executor.submit(() -> {
-                try {
-                    for (RemoveEntry<K> entry : event.entries()) {
-                        K key = entry.key();
-                        Timestamp timestamp = entry.timestamp();
-
-                        if (removeInternal(key, timestamp)) {
-                            EventuallyConsistentMapEvent<K, V> externalEvent
-                                    = new EventuallyConsistentMapEvent<>(
-                                    EventuallyConsistentMapEvent.Type.REMOVE,
-                                    key, null);
-                            notifyListeners(externalEvent);
-                        }
+                    if (removeInternal(key, timestamp)) {
+                        EventuallyConsistentMapEvent<K, V> externalEvent
+                        = new EventuallyConsistentMapEvent<>(
+                                EventuallyConsistentMapEvent.Type.REMOVE,
+                                key, null);
+                        notifyListeners(externalEvent);
                     }
-                } catch (Exception e) {
-                    log.warn("Exception thrown handling remove", e);
                 }
-            });
+            } catch (Exception e) {
+                log.warn("Exception thrown handling remove", e);
+            }
         }
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 86bc173..e28a753 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -107,6 +107,9 @@
 
     private final Logger log = getLogger(getClass());
 
+    // TODO: Make configurable.
+    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
+
     private InternalFlowTable flowTable = new InternalFlowTable();
 
     /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
@@ -132,6 +135,7 @@
     // Cache of SMaps used for backup data.  each SMap contain device flow table
     private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
 
+    private ExecutorService messageHandlingExecutor;
 
     private final ExecutorService backupExecutors =
             Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
@@ -172,7 +176,11 @@
 
         final NodeId local = clusterService.getLocalNode().id();
 
-        clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
+        messageHandlingExecutor = Executors.newFixedThreadPool(
+                MESSAGE_HANDLER_THREAD_POOL_SIZE,
+                groupedThreads("onos/flow", "message-handlers"));
+
+        clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), messageHandlingExecutor);
 
         clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
             @Override
@@ -181,7 +189,7 @@
                 log.trace("received completed notification for {}", event);
                 notifyDelegate(event);
             }
-        });
+        }, messageHandlingExecutor);
 
         clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
 
@@ -196,7 +204,7 @@
                     log.error("Failed to respond back", e);
                 }
             }
-        });
+        }, messageHandlingExecutor);
 
         clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
 
@@ -211,7 +219,7 @@
                     log.error("Failed to respond to peer's getFlowEntries request", e);
                 }
             }
-        });
+        }, messageHandlingExecutor);
 
         clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
 
@@ -226,7 +234,7 @@
                     log.error("Failed to respond back", e);
                 }
             }
-        });
+        }, messageHandlingExecutor);
 
         replicaInfoEventListener = new InternalReplicaInfoEventListener();
 
@@ -242,6 +250,7 @@
         clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
         clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
         clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
+        messageHandlingExecutor.shutdown();
         replicaInfoManager.removeListener(replicaInfoEventListener);
         log.info("Stopped");
     }
@@ -421,7 +430,7 @@
                     switch (op.operator()) {
                         case ADD:
                             entry = new DefaultFlowEntry(op.target());
-                            // always add requested FlowRule

+                            // always add requested FlowRule
                             // Note: 2 equal FlowEntry may have different treatment
                             flowTable.remove(entry.deviceId(), entry);
                             flowTable.add(entry);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
index 6effe1b..708faf6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
@@ -78,6 +78,9 @@
 
     private final Logger log = getLogger(getClass());
 
+    // TODO: Make configurable.
+    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ReplicaInfoService replicaInfoManager;
 
@@ -102,6 +105,8 @@
     private final ExecutorService futureListeners = Executors
             .newCachedThreadPool(groupedThreads("onos/flow", "store-peer-responders"));
 
+    private ExecutorService messageHandlingExecutor;
+
     protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
         @Override
         protected void setupKryoPool() {
@@ -120,6 +125,11 @@
 
     @Activate
     public void activate() {
+
+        messageHandlingExecutor = Executors.newFixedThreadPool(
+                MESSAGE_HANDLER_THREAD_POOL_SIZE,
+                groupedThreads("onos/flow", "message-handlers"));
+
         clusterCommunicator.addSubscriber(APPLY_EXTEND_FLOWS,
             new ClusterMessageHandler() {
 
@@ -141,7 +151,7 @@
                       }
                   }, futureListeners);
               }
-            });
+            }, messageHandlingExecutor);
 
         replicaInfoManager.addListener(replicaInfoEventListener);
 
@@ -151,6 +161,7 @@
     @Deactivate
     public void deactivate() {
         clusterCommunicator.removeSubscriber(APPLY_EXTEND_FLOWS);
+        messageHandlingExecutor.shutdown();
         replicaInfoManager.removeListener(replicaInfoEventListener);
         log.info("Stopped");
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
index f472d27..c65dc79 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
@@ -154,21 +154,22 @@
 
     @Activate
     public void activate() {
-        clusterCommunicator.addSubscriber(
-                HOST_UPDATED_MSG,
-                new InternalHostEventListener());
-        clusterCommunicator.addSubscriber(
-                HOST_REMOVED_MSG,
-                new InternalHostRemovedEventListener());
-        clusterCommunicator.addSubscriber(
-                HOST_ANTI_ENTROPY_ADVERTISEMENT,
-                new InternalHostAntiEntropyAdvertisementListener());
 
         executor = newCachedThreadPool(groupedThreads("onos/host", "fg-%d"));
 
         backgroundExecutor =
                 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/host", "bg-%d")));
 
+        clusterCommunicator.addSubscriber(
+                HOST_UPDATED_MSG,
+                new InternalHostEventListener(), executor);
+        clusterCommunicator.addSubscriber(
+                HOST_REMOVED_MSG,
+                new InternalHostRemovedEventListener(), executor);
+        clusterCommunicator.addSubscriber(
+                HOST_ANTI_ENTROPY_ADVERTISEMENT,
+                new InternalHostAntiEntropyAdvertisementListener(), backgroundExecutor);
+
         // start anti-entropy thread
         backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                     initialDelaySec, periodSec, TimeUnit.SECONDS);
@@ -512,20 +513,14 @@
             HostDescription hostDescription = event.hostDescription();
             Timestamp timestamp = event.timestamp();
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId,
-                                                                           hostId,
-                                                                           hostDescription,
-                                                                           timestamp));
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling host removed", e);
-                    }
-                }
-            });
+            try {
+                notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId,
+                        hostId,
+                        hostDescription,
+                        timestamp));
+            } catch (Exception e) {
+                log.warn("Exception thrown handling host removed", e);
+            }
         }
     }
 
@@ -540,17 +535,11 @@
             HostId hostId = event.hostId();
             Timestamp timestamp = event.timestamp();
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling host removed", e);
-                    }
-                }
-            });
+            try {
+                notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
+            } catch (Exception e) {
+                log.warn("Exception thrown handling host removed", e);
+            }
         }
     }
 
@@ -720,17 +709,11 @@
         public void handle(ClusterMessage message) {
             log.trace("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
             HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
-            backgroundExecutor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        handleAntiEntropyAdvertisement(advertisement);
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling Host advertisements", e);
-                    }
-                }
-            });
+            try {
+                handleAntiEntropyAdvertisement(advertisement);
+            } catch (Exception e) {
+                log.warn("Exception thrown handling Host advertisements", e);
+            }
         }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index 60d54e3..54c35ec 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -158,24 +158,24 @@
     @Activate
     public void activate() {
 
-        clusterCommunicator.addSubscriber(
-                GossipLinkStoreMessageSubjects.LINK_UPDATE,
-                new InternalLinkEventListener());
-        clusterCommunicator.addSubscriber(
-                GossipLinkStoreMessageSubjects.LINK_REMOVED,
-                new InternalLinkRemovedEventListener());
-        clusterCommunicator.addSubscriber(
-                GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
-                new InternalLinkAntiEntropyAdvertisementListener());
-        clusterCommunicator.addSubscriber(
-                GossipLinkStoreMessageSubjects.LINK_INJECTED,
-                new LinkInjectedEventListener());
-
         executor = Executors.newCachedThreadPool(groupedThreads("onos/link", "fg-%d"));
 
         backgroundExecutors =
                 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/link", "bg-%d")));
 
+        clusterCommunicator.addSubscriber(
+                GossipLinkStoreMessageSubjects.LINK_UPDATE,
+                new InternalLinkEventListener(), executor);
+        clusterCommunicator.addSubscriber(
+                GossipLinkStoreMessageSubjects.LINK_REMOVED,
+                new InternalLinkRemovedEventListener(), executor);
+        clusterCommunicator.addSubscriber(
+                GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
+                new InternalLinkAntiEntropyAdvertisementListener(), backgroundExecutors);
+        clusterCommunicator.addSubscriber(
+                GossipLinkStoreMessageSubjects.LINK_INJECTED,
+                new LinkInjectedEventListener(), executor);
+
         long initialDelaySec = 5;
         long periodSec = 5;
         // start anti-entropy thread
@@ -822,17 +822,11 @@
             ProviderId providerId = event.providerId();
             Timestamped<LinkDescription> linkDescription = event.linkDescription();
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling link event", e);
-                    }
-                }
-            });
+            try {
+                notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
+            } catch (Exception e) {
+                log.warn("Exception thrown handling link event", e);
+            }
         }
     }
 
@@ -847,17 +841,11 @@
             LinkKey linkKey = event.linkKey();
             Timestamp timestamp = event.timestamp();
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling link removed", e);
-                    }
-                }
-            });
+            try {
+                notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
+            } catch (Exception e) {
+                log.warn("Exception thrown handling link removed", e);
+            }
         }
     }
 
@@ -868,18 +856,12 @@
         public void handle(ClusterMessage message) {
             log.trace("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
             LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
-            backgroundExecutors.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        handleAntiEntropyAdvertisement(advertisement);
-                    } catch (Exception e) {
-                        log.warn("Exception thrown while handling Link advertisements", e);
-                        throw e;
-                    }
-                }
-            });
+            try {
+                handleAntiEntropyAdvertisement(advertisement);
+            } catch (Exception e) {
+                log.warn("Exception thrown while handling Link advertisements", e);
+                throw e;
+            }
         }
     }
 
@@ -894,13 +876,11 @@
             ProviderId providerId = linkInjectedEvent.providerId();
             LinkDescription linkDescription = linkInjectedEvent.linkDescription();
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    createOrUpdateLink(providerId, linkDescription);
-                }
-            });
+            try {
+                createOrUpdateLink(providerId, linkDescription);
+            } catch (Exception e) {
+                log.warn("Exception thrown while handling link injected event", e);
+            }
         }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 9fd4c7b..35e28c7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,9 +15,12 @@
  */
 package org.onosproject.store.packet.impl;
 
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -55,6 +58,9 @@
 
     private final Logger log = getLogger(getClass());
 
+    // TODO: make this configurable.
+    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private MastershipService mastershipService;
 
@@ -77,16 +83,24 @@
         }
     };
 
+    private ExecutorService messageHandlingExecutor;
+
     @Activate
     public void activate() {
-        log.info("Started");
+        messageHandlingExecutor =  Executors.newFixedThreadPool(
+                MESSAGE_HANDLER_THREAD_POOL_SIZE,
+                groupedThreads("onos/flow", "message-handlers"));
 
         communicationService.addSubscriber(
-                PACKET_OUT_SUBJECT, new InternalClusterMessageHandler());
+                PACKET_OUT_SUBJECT, new InternalClusterMessageHandler(), messageHandlingExecutor);
+
+        log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
+        messageHandlingExecutor.shutdown();
         log.info("Stopped");
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
index e224cd7..9c12528 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.statistic.impl;
 
 import com.google.common.collect.Sets;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -48,11 +49,14 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
 import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -68,6 +72,9 @@
 
     private final Logger log = getLogger(getClass());
 
+    // TODO: Make configurable.
+    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ReplicaInfoService replicaInfoManager;
 
@@ -97,10 +104,17 @@
         }
     };;
 
+    private ExecutorService messageHandlingExecutor;
+
     private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
 
     @Activate
     public void activate() {
+
+        messageHandlingExecutor = Executors.newFixedThreadPool(
+                MESSAGE_HANDLER_THREAD_POOL_SIZE,
+                groupedThreads("onos/store/statistic", "message-handlers"));
+
         clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() {
 
             @Override
@@ -112,7 +126,7 @@
                     log.error("Failed to respond back", e);
                 }
             }
-        });
+        }, messageHandlingExecutor);
 
         clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
 
@@ -125,12 +139,15 @@
                     log.error("Failed to respond back", e);
                 }
             }
-        });
+        }, messageHandlingExecutor);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        clusterCommunicator.removeSubscriber(GET_PREVIOUS);
+        clusterCommunicator.removeSubscriber(GET_CURRENT);
+        messageHandlingExecutor.shutdown();
         log.info("Stopped");
     }
 
diff --git a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
index 3711f7f..7054bd3 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
@@ -17,6 +17,7 @@
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+
 import org.easymock.Capture;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -62,6 +63,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static java.util.Arrays.asList;
@@ -152,7 +154,7 @@
 
         clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
         clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
-                                    anyObject(ClusterMessageHandler.class));
+                                    anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
         expectLastCall().anyTimes();
         replay(clusterCommunicator);
         ClusterService clusterService = new TestClusterService();
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index fc85795..e6670de 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -46,6 +46,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -129,7 +130,7 @@
         // allows us to get a reference to the map's internal cluster message
         // handlers so we can induce events coming in from a peer.
         clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
-                anyObject(ClusterMessageHandler.class));
+                anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
         expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
 
         replay(clusterCommunicator);
@@ -731,6 +732,21 @@
         }
 
         @Override
+        public void addSubscriber(MessageSubject subject,
+                                  ClusterMessageHandler subscriber,
+                                  ExecutorService executor) {
+            if (subject.equals(PUT_MESSAGE_SUBJECT)) {
+                putHandler = subscriber;
+            } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
+                removeHandler = subscriber;
+            } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
+                antiEntropyHandler = subscriber;
+            } else {
+                throw new RuntimeException("Unexpected message subject " + subject.toString());
+            }
+        }
+
+        @Override
         public void removeSubscriber(MessageSubject subject) {}
     }
 
diff --git a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
index eeca2a8b..03e82bc 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.link.impl;
 
 import com.google.common.collect.Iterables;
+
 import org.easymock.Capture;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -56,6 +57,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.easymock.EasyMock.*;
@@ -140,7 +142,8 @@
         // TODO mock clusterCommunicator
         clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
         clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
-                                    anyObject(ClusterMessageHandler.class));
+                                    anyObject(ClusterMessageHandler.class),
+                                    anyObject(ExecutorService.class));
         expectLastCall().anyTimes();
         replay(clusterCommunicator);
 
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
index efc0ae1..12e1d87 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -16,6 +16,7 @@
 package org.onlab.netty;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -47,7 +48,16 @@
      * Registers a new message handler for message type.
      * @param type message type.
      * @param handler message handler
+     * @param executor executor to use for running message handler logic.
      */
+    public void registerHandler(String type, MessageHandler handler, ExecutorService executor);
+
+    /**
+     * Registers a new message handler for message type.
+     * @param type message type.
+     * @param handler message handler
+     */
+    @Deprecated
     public void registerHandler(String type, MessageHandler handler);
 
     /**
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 69806b1..b84e193 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -41,6 +41,7 @@
 import java.net.UnknownHostException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
@@ -213,6 +214,22 @@
     }
 
     @Override
+    public void registerHandler(String type, MessageHandler handler, ExecutorService executor) {
+        handlers.putIfAbsent(hashToLong(type), new MessageHandler() {
+            @Override
+            public void handle(Message message) throws IOException {
+                executor.submit(() -> {
+                    try {
+                        handler.handle(message);
+                    } catch (Exception e) {
+                        log.warn("Failed to process message of type {}", type, e);
+                    }
+                });
+            }
+        });
+    }
+
+    @Override
     public void unregisterHandler(String type) {
         handlers.remove(type);
     }