GossipStores: move potentially blocking methods off the netty thread

Change-Id: I2da9ba745c3a63bf9709fb77c1f260ea8f4529a8
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index d11fa11..78838d0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -78,6 +78,8 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -160,8 +162,11 @@
         }
     };
 
+    private ExecutorService executor;
+
     private ScheduledExecutorService backgroundExecutor;
 
+
     @Activate
     public void activate() {
         clusterCommunicator.addSubscriber(
@@ -178,6 +183,8 @@
         clusterCommunicator.addSubscriber(
                 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
 
+        executor = Executors.newCachedThreadPool(namedThreads("device-fg-%d"));
+
         backgroundExecutor =
                 newSingleThreadScheduledExecutor(minPriority(namedThreads("device-bg-%d")));
 
@@ -194,6 +201,8 @@
     @Deactivate
     public void deactivate() {
 
+        executor.shutdownNow();
+
         backgroundExecutor.shutdownNow();
         try {
             boolean timedout = backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS);
@@ -1258,32 +1267,54 @@
         }
     }
 
-    private class InternalDeviceEventListener implements ClusterMessageHandler {
+    private final class InternalDeviceEventListener
+            implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
 
             log.debug("Received device update event from peer: {}", message.sender());
-            InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
+            InternalDeviceEvent event = SERIALIZER.decode(message.payload());
 
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
             Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
 
-            notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling device update", e);
+                    }
+                }
+            });
         }
     }
 
-    private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
+    private final class InternalDeviceOfflineEventListener
+            implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
 
             log.debug("Received device offline event from peer: {}", message.sender());
-            InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
+            InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
 
             DeviceId deviceId = event.deviceId();
             Timestamp timestamp = event.timestamp();
 
-            notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling device offline", e);
+                    }
+                }
+            });
         }
     }
 
@@ -1293,30 +1324,53 @@
         public void handle(ClusterMessage message) {
             log.debug("Received device remove request from peer: {}", message.sender());
             DeviceId did = SERIALIZER.decode(message.payload());
-            removeDevice(did);
+
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        removeDevice(did);
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling device remove", e);
+                    }
+                }
+            });
         }
     }
 
-    private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
+    private final class InternalDeviceRemovedEventListener
+            implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
 
             log.debug("Received device removed event from peer: {}", message.sender());
-            InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
+            InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
 
             DeviceId deviceId = event.deviceId();
             Timestamp timestamp = event.timestamp();
 
-            notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling device removed", e);
+                    }
+                }
+            });
         }
     }
 
-    private class InternalPortEventListener implements ClusterMessageHandler {
+    private final class InternalPortEventListener
+            implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
 
             log.debug("Received port update event from peer: {}", message.sender());
-            InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
+            InternalPortEvent event = SERIALIZER.decode(message.payload());
 
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
@@ -1328,16 +1382,27 @@
                 return;
             }
 
-            notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling port update", e);
+                    }
+                }
+            });
         }
     }
 
-    private class InternalPortStatusEventListener implements ClusterMessageHandler {
+    private final class InternalPortStatusEventListener
+            implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
 
             log.debug("Received port status update event from peer: {}", message.sender());
-            InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
+            InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
 
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
@@ -1349,7 +1414,17 @@
                 return;
             }
 
-            notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
+            executor.submit(new Runnable() {
+                // Runs on the store's executor so the handler thread is not blocked by the update.
+                @Override
+                public void run() {
+                    try {
+                        notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling port status update", e);
+                    }
+                }
+            });
         }
     }