Never process incoming messages on the netty event loop thread pool.
Currently in a lot of places we are deserializing incoming messages on this threadpool and that could be significantly limiting throughput.

Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 602fd9f..f16bd06 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -176,29 +176,36 @@
 
     @Activate
     public void activate() {
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
-        clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ, new InternalRemoveRequestListener());
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener());
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener());
 
         executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
 
         backgroundExecutor =
                 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
 
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
+                new InternalDeviceOfflineEventListener(),
+                executor);
+        clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
+                new InternalRemoveRequestListener(),
+                executor);
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
+                new InternalDeviceAdvertisementListener(),
+                backgroundExecutor);
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
+
         // start anti-entropy thread
         backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                     initialDelaySec, periodSec, TimeUnit.SECONDS);
@@ -1325,17 +1332,11 @@
             DeviceId deviceId = event.deviceId();
             Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling device update", e);
-                    }
-                }
-            });
+            try {
+                notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
+            } catch (Exception e) {
+                log.warn("Exception thrown handling device update", e);
+            }
         }
     }
 
@@ -1350,17 +1351,11 @@
             DeviceId deviceId = event.deviceId();
             Timestamp timestamp = event.timestamp();
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling device offline", e);
-                    }
-                }
-            });
+            try {
+                notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
+            } catch (Exception e) {
+                log.warn("Exception thrown handling device offline", e);
+            }
         }
     }
 
@@ -1371,17 +1366,11 @@
             log.debug("Received device remove request from peer: {}", message.sender());
             DeviceId did = SERIALIZER.decode(message.payload());
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        removeDevice(did);
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling device remove", e);
-                    }
-                }
-            });
+            try {
+                removeDevice(did);
+            } catch (Exception e) {
+                log.warn("Exception thrown handling device remove", e);
+            }
         }
     }
 
@@ -1396,17 +1385,11 @@
             DeviceId deviceId = event.deviceId();
             Timestamp timestamp = event.timestamp();
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling device removed", e);
-                    }
-                }
-            });
+            try {
+                notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
+            } catch (Exception e) {
+                log.warn("Exception thrown handling device removed", e);
+            }
         }
     }
 
@@ -1428,17 +1411,11 @@
                 return;
             }
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling port update", e);
-                    }
-                }
-            });
+            try {
+                notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
+            } catch (Exception e) {
+                log.warn("Exception thrown handling port update", e);
+            }
         }
     }
 
@@ -1460,17 +1437,11 @@
                 return;
             }
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling port update", e);
-                    }
-                }
-            });
+            try {
+                notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
+            } catch (Exception e) {
+                log.warn("Exception thrown handling port update", e);
+            }
         }
     }
 
@@ -1481,17 +1452,11 @@
         public void handle(ClusterMessage message) {
             log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
             DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
-            backgroundExecutor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        handleAdvertisement(advertisement);
-                    } catch (Exception e) {
-                        log.warn("Exception thrown handling Device advertisements.", e);
-                    }
-                }
-            });
+            try {
+                handleAdvertisement(advertisement);
+            } catch (Exception e) {
+                log.warn("Exception thrown handling Device advertisements.", e);
+            }
         }
     }
 
@@ -1507,13 +1472,11 @@
             DeviceId deviceId = event.deviceId();
             DeviceDescription deviceDescription = event.deviceDescription();
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    createOrUpdateDevice(providerId, deviceId, deviceDescription);
-                }
-            });
+            try {
+                createOrUpdateDevice(providerId, deviceId, deviceDescription);
+            } catch (Exception e) {
+                log.warn("Exception thrown handling device injected event.", e);
+            }
         }
     }
 
@@ -1529,13 +1492,11 @@
             DeviceId deviceId = event.deviceId();
             List<PortDescription> portDescriptions = event.portDescriptions();
 
-            executor.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    updatePorts(providerId, deviceId, portDescriptions);
-                }
-            });
+            try {
+                updatePorts(providerId, deviceId, portDescriptions);
+            } catch (Exception e) {
+                log.warn("Exception thrown handling port injected event.", e);
+            }
         }
     }
 }