[SDFAB-20] Prevent listeners ejection and the stop of the group polling

Offload listeners processing to external executors to prevent
the listener ejection due to time consuming processing

In future, we may want to extend the same fix to the
HostManager and NetworkConfigHostProvider

Additionally, avoid the propagation of the exceptions in GroupDriverProvider
which leads to the cancellation of the peridioc poll task

Change-Id: I8ea4ec9fda1ccc48bbd3855fd443ee8760cbbb60
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
index d55ca7f..f36e7a8 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
@@ -52,9 +52,13 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+
 /**
  * Entity that manages gNMI subscription for devices using OpenConfig models and
  * that reports relevant events to the core.
@@ -78,6 +82,8 @@
 
     private final Striped<Lock> deviceLocks = Striped.lock(30);
 
+    private ExecutorService eventExecutor;
+
     GnmiDeviceStateSubscriber(GnmiController gnmiController, DeviceService deviceService,
                               MastershipService mastershipService,
                               DeviceProviderService providerService) {
@@ -88,6 +94,8 @@
     }
 
     public void activate() {
+        eventExecutor = newSingleThreadScheduledExecutor(groupedThreads(
+                "onos/gnmi", "events-%d", log));
         deviceService.addListener(deviceEventListener);
         mastershipService.addListener(mastershipListener);
         gnmiController.addListener(gnmiEventListener);
@@ -100,6 +108,8 @@
         deviceService.removeListener(deviceEventListener);
         mastershipService.removeListener(mastershipListener);
         gnmiController.removeListener(gnmiEventListener);
+        eventExecutor.shutdownNow();
+        eventExecutor = null;
     }
 
     private void checkSubscription(DeviceId deviceId) {
@@ -248,19 +258,21 @@
 
         @Override
         public void event(GnmiEvent event) {
-            if (!deviceSubscribed.containsKey(event.subject().deviceId())) {
-                log.warn("Received gNMI event from {}, but we did'nt expect to " +
-                                 "be subscribed to it! Discarding event...",
-                         event.subject().deviceId());
-                return;
-            }
+            eventExecutor.execute(() -> {
+                if (!deviceSubscribed.containsKey(event.subject().deviceId())) {
+                    log.warn("Received gNMI event from {}, but we did'nt expect to " +
+                                    "be subscribed to it! Discarding event...",
+                            event.subject().deviceId());
+                    return;
+                }
 
-            log.debug("Received gNMI event {}", event.toString());
-            if (event.type() == GnmiEvent.Type.UPDATE) {
-                handleGnmiUpdate((GnmiUpdate) event.subject());
-            } else {
-                log.debug("Unsupported gNMI event type: {}", event.type());
-            }
+                log.debug("Received gNMI event {}", event.toString());
+                if (event.type() == GnmiEvent.Type.UPDATE) {
+                    handleGnmiUpdate((GnmiUpdate) event.subject());
+                } else {
+                    log.debug("Unsupported gNMI event type: {}", event.type());
+                }
+            });
         }
     }
 
@@ -268,7 +280,7 @@
 
         @Override
         public void event(MastershipEvent event) {
-            checkSubscription(event.subject());
+            eventExecutor.execute(() -> checkSubscription(event.subject()));
         }
     }
 
@@ -276,18 +288,20 @@
 
         @Override
         public void event(DeviceEvent event) {
-            switch (event.type()) {
-                case DEVICE_ADDED:
-                case DEVICE_AVAILABILITY_CHANGED:
-                case DEVICE_UPDATED:
-                case DEVICE_REMOVED:
-                case PORT_ADDED:
-                case PORT_REMOVED:
-                    checkSubscription(event.subject().id());
-                    break;
-                default:
-                    break;
-            }
+            eventExecutor.execute(() -> {
+                switch (event.type()) {
+                    case DEVICE_ADDED:
+                    case DEVICE_AVAILABILITY_CHANGED:
+                    case DEVICE_UPDATED:
+                    case DEVICE_REMOVED:
+                    case PORT_ADDED:
+                    case PORT_REMOVED:
+                        checkSubscription(event.subject().id());
+                        break;
+                    default:
+                        break;
+                }
+            });
         }
     }
 }