Never process incoming messages on the Netty event loop thread pool.
Currently, in many places we deserialize incoming messages on this thread pool, which could significantly limit throughput.

Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 19e77bb..fe8f1a8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -162,13 +162,13 @@
 
         updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
         clusterCommunicator.addSubscriber(updateMessageSubject,
-                                          new InternalPutEventListener());
+                                          new InternalPutEventListener(), executor);
         removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
         clusterCommunicator.addSubscriber(removeMessageSubject,
-                                          new InternalRemoveEventListener());
+                                          new InternalRemoveEventListener(), executor);
         antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
         clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
-                                          new InternalAntiEntropyListener());
+                                          new InternalAntiEntropyListener(), backgroundExecutor);
     }
 
     private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
@@ -728,13 +728,11 @@
             log.trace("Received anti-entropy advertisement from peer: {}",
                       message.sender());
             AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
-            backgroundExecutor.submit(() -> {
-                try {
-                    handleAntiEntropyAdvertisement(advertisement);
-                } catch (Exception e) {
-                    log.warn("Exception thrown handling advertisements", e);
-                }
-            });
+            try {
+                handleAntiEntropyAdvertisement(advertisement);
+            } catch (Exception e) {
+                log.warn("Exception thrown handling advertisements", e);
+            }
         }
     }
 
@@ -745,25 +743,23 @@
             log.debug("Received put event from peer: {}", message.sender());
             InternalPutEvent<K, V> event = serializer.decode(message.payload());
 
-            executor.submit(() -> {
-                try {
-                    for (PutEntry<K, V> entry : event.entries()) {
-                        K key = entry.key();
-                        V value = entry.value();
-                        Timestamp timestamp = entry.timestamp();
+            try {
+                for (PutEntry<K, V> entry : event.entries()) {
+                    K key = entry.key();
+                    V value = entry.value();
+                    Timestamp timestamp = entry.timestamp();
 
-                        if (putInternal(key, value, timestamp)) {
-                            EventuallyConsistentMapEvent<K, V> externalEvent =
-                                    new EventuallyConsistentMapEvent<>(
-                                            EventuallyConsistentMapEvent.Type.PUT, key,
-                                            value);
-                            notifyListeners(externalEvent);
-                        }
+                    if (putInternal(key, value, timestamp)) {
+                        EventuallyConsistentMapEvent<K, V> externalEvent =
+                                new EventuallyConsistentMapEvent<>(
+                                        EventuallyConsistentMapEvent.Type.PUT, key,
+                                        value);
+                        notifyListeners(externalEvent);
                     }
-                } catch (Exception e) {
-                    log.warn("Exception thrown handling put", e);
                 }
-            });
+            } catch (Exception e) {
+                log.warn("Exception thrown handling put", e);
+            }
         }
     }
 
@@ -773,25 +769,22 @@
         public void handle(ClusterMessage message) {
             log.debug("Received remove event from peer: {}", message.sender());
             InternalRemoveEvent<K> event = serializer.decode(message.payload());
+            try {
+                for (RemoveEntry<K> entry : event.entries()) {
+                    K key = entry.key();
+                    Timestamp timestamp = entry.timestamp();
 
-            executor.submit(() -> {
-                try {
-                    for (RemoveEntry<K> entry : event.entries()) {
-                        K key = entry.key();
-                        Timestamp timestamp = entry.timestamp();
-
-                        if (removeInternal(key, timestamp)) {
-                            EventuallyConsistentMapEvent<K, V> externalEvent
-                                    = new EventuallyConsistentMapEvent<>(
-                                    EventuallyConsistentMapEvent.Type.REMOVE,
-                                    key, null);
-                            notifyListeners(externalEvent);
-                        }
+                    if (removeInternal(key, timestamp)) {
+                        EventuallyConsistentMapEvent<K, V> externalEvent
+                        = new EventuallyConsistentMapEvent<>(
+                                EventuallyConsistentMapEvent.Type.REMOVE,
+                                key, null);
+                        notifyListeners(externalEvent);
                     }
-                } catch (Exception e) {
-                    log.warn("Exception thrown handling remove", e);
                 }
-            });
+            } catch (Exception e) {
+                log.warn("Exception thrown handling remove", e);
+            }
         }
     }