GossipStores: remove potentially blocking method out of netty thread

Change-Id: I2da9ba745c3a63bf9709fb77c1f260ea8f4529a8
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index d11fa11..78838d0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -78,6 +78,8 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -160,8 +162,11 @@
         }
     };
 
+    private ExecutorService executor;
+
     private ScheduledExecutorService backgroundExecutor;
 
+
     @Activate
     public void activate() {
         clusterCommunicator.addSubscriber(
@@ -178,6 +183,8 @@
         clusterCommunicator.addSubscriber(
                 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
 
+        executor = Executors.newCachedThreadPool(namedThreads("device-fg-%d"));
+
         backgroundExecutor =
                 newSingleThreadScheduledExecutor(minPriority(namedThreads("device-bg-%d")));
 
@@ -194,6 +201,8 @@
     @Deactivate
     public void deactivate() {
 
+        executor.shutdownNow();
+
         backgroundExecutor.shutdownNow();
         try {
             boolean timedout = backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS);
@@ -1258,32 +1267,54 @@
         }
     }
 
-    private class InternalDeviceEventListener implements ClusterMessageHandler {
+    private final class InternalDeviceEventListener
+            implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
 
             log.debug("Received device update event from peer: {}", message.sender());
-            InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
+            InternalDeviceEvent event = SERIALIZER.decode(message.payload());
 
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
             Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
 
-            notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling device update", e);
+                    }
+                }
+            });
         }
     }
 
-    private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
+    private final class InternalDeviceOfflineEventListener
+            implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
 
             log.debug("Received device offline event from peer: {}", message.sender());
-            InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
+            InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
 
             DeviceId deviceId = event.deviceId();
             Timestamp timestamp = event.timestamp();
 
-            notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling device offline", e);
+                    }
+                }
+            });
         }
     }
 
@@ -1293,30 +1324,53 @@
         public void handle(ClusterMessage message) {
             log.debug("Received device remove request from peer: {}", message.sender());
             DeviceId did = SERIALIZER.decode(message.payload());
-            removeDevice(did);
+
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        removeDevice(did);
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling device remove", e);
+                    }
+                }
+            });
         }
     }
 
-    private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
+    private final class InternalDeviceRemovedEventListener
+            implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
 
             log.debug("Received device removed event from peer: {}", message.sender());
-            InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
+            InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
 
             DeviceId deviceId = event.deviceId();
             Timestamp timestamp = event.timestamp();
 
-            notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling device removed", e);
+                    }
+                }
+            });
         }
     }
 
-    private class InternalPortEventListener implements ClusterMessageHandler {
+    private final class InternalPortEventListener
+            implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
 
             log.debug("Received port update event from peer: {}", message.sender());
-            InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
+            InternalPortEvent event = SERIALIZER.decode(message.payload());
 
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
@@ -1328,16 +1382,27 @@
                 return;
             }
 
-            notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling port update", e);
+                    }
+                }
+            });
         }
     }
 
-    private class InternalPortStatusEventListener implements ClusterMessageHandler {
+    private final class InternalPortStatusEventListener
+            implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
 
             log.debug("Received port status update event from peer: {}", message.sender());
-            InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
+            InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
 
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
@@ -1349,7 +1414,17 @@
                 return;
             }
 
-            notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling port update", e);
+                    }
+                }
+            });
         }
     }
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
index 41d72c3..5e7048a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
@@ -34,6 +34,8 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -137,6 +139,8 @@
         }
     };
 
+    private ExecutorService executor;
+
     private ScheduledExecutorService backgroundExecutor;
 
     @Activate
@@ -151,6 +155,8 @@
                 GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
                 new InternalHostAntiEntropyAdvertisementListener());
 
+        executor = Executors.newCachedThreadPool(namedThreads("host-fg-%d"));
+
         backgroundExecutor =
                 newSingleThreadScheduledExecutor(minPriority(namedThreads("host-bg-%d")));
 
@@ -166,6 +172,7 @@
 
     @Deactivate
     public void deactivate() {
+        executor.shutdownNow();
         backgroundExecutor.shutdownNow();
         try {
             if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
@@ -459,33 +466,58 @@
         }
     }
 
-    private class InternalHostEventListener implements ClusterMessageHandler {
+    private final class InternalHostEventListener
+            implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
 
             log.debug("Received host update event from peer: {}", message.sender());
-            InternalHostEvent event = (InternalHostEvent) SERIALIZER.decode(message.payload());
+            InternalHostEvent event = SERIALIZER.decode(message.payload());
 
             ProviderId providerId = event.providerId();
             HostId hostId = event.hostId();
             HostDescription hostDescription = event.hostDescription();
             Timestamp timestamp = event.timestamp();
 
-            notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp));
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId,
+                                                                           hostId,
+                                                                           hostDescription,
+                                                                           timestamp));
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling host removed", e);
+                    }
+                }
+            });
         }
     }
 
-    private class InternalHostRemovedEventListener implements ClusterMessageHandler {
+    private final class InternalHostRemovedEventListener
+            implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
 
             log.debug("Received host removed event from peer: {}", message.sender());
-            InternalHostRemovedEvent event = (InternalHostRemovedEvent) SERIALIZER.decode(message.payload());
+            InternalHostRemovedEvent event = SERIALIZER.decode(message.payload());
 
             HostId hostId = event.hostId();
             Timestamp timestamp = event.timestamp();
 
-            notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling host removed", e);
+                    }
+                }
+            });
         }
     }
 
@@ -636,8 +668,8 @@
         }
     }
 
-    private final class InternalHostAntiEntropyAdvertisementListener implements
-            ClusterMessageHandler {
+    private final class InternalHostAntiEntropyAdvertisementListener
+            implements ClusterMessageHandler {
 
         @Override
         public void handle(ClusterMessage message) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
index c465866..2a60120 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
@@ -71,6 +71,8 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -141,6 +143,8 @@
         }
     };
 
+    private ExecutorService executor;
+
     private ScheduledExecutorService backgroundExecutors;
 
     @Activate
@@ -156,6 +160,8 @@
                 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
                 new InternalLinkAntiEntropyAdvertisementListener());
 
+        executor = Executors.newCachedThreadPool(namedThreads("link-fg-%d"));
+
         backgroundExecutors =
                 newSingleThreadScheduledExecutor(minPriority(namedThreads("link-bg-%d")));
 
@@ -172,6 +178,8 @@
     @Deactivate
     public void deactivate() {
 
+        executor.shutdownNow();
+
         backgroundExecutors.shutdownNow();
         try {
             if (!backgroundExecutors.awaitTermination(5, TimeUnit.SECONDS)) {
@@ -762,7 +770,8 @@
         }
     }
 
-    private class InternalLinkEventListener implements ClusterMessageHandler {
+    private final class InternalLinkEventListener
+            implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
 
@@ -772,11 +781,22 @@
             ProviderId providerId = event.providerId();
             Timestamped<LinkDescription> linkDescription = event.linkDescription();
 
-            notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling link event", e);
+                    }
+                }
+            });
         }
     }
 
-    private class InternalLinkRemovedEventListener implements ClusterMessageHandler {
+    private final class InternalLinkRemovedEventListener
+            implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
 
@@ -786,11 +806,22 @@
             LinkKey linkKey = event.linkKey();
             Timestamp timestamp = event.timestamp();
 
-            notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling link removed", e);
+                    }
+                }
+            });
         }
     }
 
-    private final class InternalLinkAntiEntropyAdvertisementListener implements ClusterMessageHandler {
+    private final class InternalLinkAntiEntropyAdvertisementListener
+            implements ClusterMessageHandler {
 
         @Override
         public void handle(ClusterMessage message) {