[SDFAB-20] Prevent listeners ejection and the stop of the group polling

Offload listeners processing to external executors to prevent
the listener ejection due to time consuming processing

In future, we may want to extend the same fix to the
HostManager and NetworkConfigHostProvider

Additionally, avoid the propagation of the exceptions in GroupDriverProvider
which leads to the cancellation of the peridioc poll task

Change-Id: I8ea4ec9fda1ccc48bbd3855fd443ee8760cbbb60
diff --git a/apps/inbandtelemetry/impl/src/main/java/org/onosproject/inbandtelemetry/impl/SimpleIntManager.java b/apps/inbandtelemetry/impl/src/main/java/org/onosproject/inbandtelemetry/impl/SimpleIntManager.java
index ffb6b4f..bf7dc18 100644
--- a/apps/inbandtelemetry/impl/src/main/java/org/onosproject/inbandtelemetry/impl/SimpleIntManager.java
+++ b/apps/inbandtelemetry/impl/src/main/java/org/onosproject/inbandtelemetry/impl/SimpleIntManager.java
@@ -72,6 +72,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -79,6 +80,8 @@
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -152,6 +155,8 @@
                 }
             };
 
+    protected ExecutorService eventExecutor;
+
     @Activate
     public void activate() {
 
@@ -206,6 +211,10 @@
         // Bootstrap config for already existing devices.
         triggerAllDeviceConfigure();
 
+        // Bootstrap core event executor before adding listener
+        eventExecutor = newSingleThreadScheduledExecutor(groupedThreads(
+                "onos/int", "events-%d", log));
+
         hostService.addListener(hostListener);
         deviceService.addListener(deviceListener);
 
@@ -256,6 +265,8 @@
         deviceService.getDevices().forEach(d -> cleanupDevice(d.id()));
         netcfgService.removeListener(appConfigListener);
         netcfgRegistry.unregisterConfigFactory(intAppConfigFactory);
+        eventExecutor.shutdownNow();
+        eventExecutor = null;
         log.info("Deactivated");
     }
 
@@ -503,30 +514,34 @@
     private class InternalHostListener implements HostListener {
         @Override
         public void event(HostEvent event) {
-            final DeviceId deviceId = event.subject().location().deviceId();
-            triggerDeviceConfigure(deviceId);
+            eventExecutor.execute(() -> {
+                final DeviceId deviceId = event.subject().location().deviceId();
+                triggerDeviceConfigure(deviceId);
+            });
         }
     }
 
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
-            switch (event.type()) {
-                case DEVICE_ADDED:
-                case DEVICE_UPDATED:
-                case DEVICE_REMOVED:
-                case DEVICE_SUSPENDED:
-                case DEVICE_AVAILABILITY_CHANGED:
-                case PORT_ADDED:
-                case PORT_UPDATED:
-                case PORT_REMOVED:
-                    triggerDeviceConfigure(event.subject().id());
-                    return;
-                case PORT_STATS_UPDATED:
-                    return;
-                default:
-                    log.warn("Unknown device event type {}", event.type());
-            }
+            eventExecutor.execute(() -> {
+                switch (event.type()) {
+                    case DEVICE_ADDED:
+                    case DEVICE_UPDATED:
+                    case DEVICE_REMOVED:
+                    case DEVICE_SUSPENDED:
+                    case DEVICE_AVAILABILITY_CHANGED:
+                    case PORT_ADDED:
+                    case PORT_UPDATED:
+                    case PORT_REMOVED:
+                        triggerDeviceConfigure(event.subject().id());
+                        return;
+                    case PORT_STATS_UPDATED:
+                        return;
+                    default:
+                        log.warn("Unknown device event type {}", event.type());
+                }
+            });
         }
     }
 
@@ -580,56 +595,56 @@
 
         @Override
         public void event(NetworkConfigEvent event) {
-            switch (event.type()) {
-                case CONFIG_ADDED:
-                case CONFIG_UPDATED:
-                    event.config()
-                            .map(config -> (IntReportConfig) config)
-                            .ifPresent(config -> {
-                                IntDeviceConfig intDeviceConfig = IntDeviceConfig.builder()
-                                        .withMinFlowHopLatencyChangeNs(config.minFlowHopLatencyChangeNs())
-                                        .withCollectorPort(config.collectorPort())
-                                        .withCollectorIp(config.collectorIp())
-                                        .enabled(true)
-                                        .build();
-                                setConfig(intDeviceConfig);
+            eventExecutor.execute(() -> {
+                if (event.configClass() == IntReportConfig.class) {
+                    switch (event.type()) {
+                        case CONFIG_ADDED:
+                        case CONFIG_UPDATED:
+                            event.config()
+                                    .map(config -> (IntReportConfig) config)
+                                    .ifPresent(config -> {
+                                        IntDeviceConfig intDeviceConfig = IntDeviceConfig.builder()
+                                                .withMinFlowHopLatencyChangeNs(config.minFlowHopLatencyChangeNs())
+                                                .withCollectorPort(config.collectorPort())
+                                                .withCollectorIp(config.collectorIp())
+                                                .enabled(true)
+                                                .build();
+                                        setConfig(intDeviceConfig);
 
-                                // For each watched subnet, we install two INT rules.
-                                // One match on the source, another match on the destination.
-                                intentMap.clear();
-                                config.watchSubnets().forEach(subnet -> {
-                                    IntIntent.Builder intIntentBuilder = IntIntent.builder()
-                                            .withReportType(IntIntent.IntReportType.TRACKED_FLOW)
-                                            .withReportType(IntIntent.IntReportType.DROPPED_PACKET)
-                                            .withReportType(IntIntent.IntReportType.CONGESTED_QUEUE)
-                                            .withTelemetryMode(IntIntent.TelemetryMode.POSTCARD);
-                                    if (subnet.prefixLength() == 0) {
-                                        // Special case, match any packet
-                                        installIntIntent(intIntentBuilder
-                                                .withSelector(DefaultTrafficSelector.emptySelector())
-                                                .build());
-                                    } else {
-                                        TrafficSelector selector = DefaultTrafficSelector.builder()
-                                                .matchIPSrc(subnet)
-                                                .build();
-                                        installIntIntent(intIntentBuilder.withSelector(selector).build());
-                                        selector = DefaultTrafficSelector.builder()
-                                                .matchIPDst(subnet)
-                                                .build();
-                                        installIntIntent(intIntentBuilder.withSelector(selector).build());
-                                    }
-                                });
-                            });
-                    break;
-                // TODO: Support removing INT config.
-                default:
-                    break;
-            }
+                                        // For each watched subnet, we install two INT rules.
+                                        // One match on the source, another match on the destination.
+                                        intentMap.clear();
+                                        config.watchSubnets().forEach(subnet -> {
+                                            IntIntent.Builder intIntentBuilder = IntIntent.builder()
+                                                    .withReportType(IntIntent.IntReportType.TRACKED_FLOW)
+                                                    .withReportType(IntIntent.IntReportType.DROPPED_PACKET)
+                                                    .withReportType(IntIntent.IntReportType.CONGESTED_QUEUE)
+                                                    .withTelemetryMode(IntIntent.TelemetryMode.POSTCARD);
+                                            if (subnet.prefixLength() == 0) {
+                                                // Special case, match any packet
+                                                installIntIntent(intIntentBuilder
+                                                        .withSelector(DefaultTrafficSelector.emptySelector())
+                                                        .build());
+                                            } else {
+                                                TrafficSelector selector = DefaultTrafficSelector.builder()
+                                                        .matchIPSrc(subnet)
+                                                        .build();
+                                                installIntIntent(intIntentBuilder.withSelector(selector).build());
+                                                selector = DefaultTrafficSelector.builder()
+                                                        .matchIPDst(subnet)
+                                                        .build();
+                                                installIntIntent(intIntentBuilder.withSelector(selector).build());
+                                            }
+                                        });
+                                    });
+                            break;
+                        // TODO: Support removing INT config.
+                        default:
+                            break;
+                    }
+                }
+            });
         }
 
-        @Override
-        public boolean isRelevant(NetworkConfigEvent event) {
-            return event.configClass() == IntReportConfig.class;
-        }
     }
 }
diff --git a/apps/inbandtelemetry/impl/src/test/java/org/onosproject/inbandtelemetry/impl/SimpleIntManagerTest.java b/apps/inbandtelemetry/impl/src/test/java/org/onosproject/inbandtelemetry/impl/SimpleIntManagerTest.java
index 8519ed5..7fddf3c 100644
--- a/apps/inbandtelemetry/impl/src/test/java/org/onosproject/inbandtelemetry/impl/SimpleIntManagerTest.java
+++ b/apps/inbandtelemetry/impl/src/test/java/org/onosproject/inbandtelemetry/impl/SimpleIntManagerTest.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.After;
@@ -81,6 +82,7 @@
 import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.onlab.junit.TestTools.assertAfter;
 import static org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable.IntFunctionality.POSTCARD;
 import static org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable.IntFunctionality.SINK;
 import static org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable.IntFunctionality.SOURCE;
@@ -151,6 +153,7 @@
         manager.hostService = hostService;
         manager.netcfgService = networkConfigService;
         manager.netcfgRegistry = networkConfigRegistry;
+        manager.eventExecutor = MoreExecutors.newDirectExecutorService();
 
         expect(coreService.registerApplication(APP_NAME))
                 .andReturn(APP_ID).anyTimes();
@@ -226,7 +229,7 @@
 
         // The INT intent installation order can be random, so we need to collect
         // all expected INT intents and check if actual intent exists.
-        assertEquals(5, intentMap.size());
+        assertAfter(50, 100, () -> assertEquals(5, intentMap.size()));
         intentMap.entrySet().forEach(entry -> {
             IntIntent actualIntIntent = entry.getValue().value();
             assertTrue(expectedIntIntents.contains(actualIntIntent));
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java
index ede60db..5ae2628 100644
--- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java
@@ -35,6 +35,7 @@
 
 import java.time.Duration;
 import java.util.Collection;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
@@ -65,6 +66,9 @@
     private final ScheduledExecutorService reaperExecutor =
             newSingleThreadScheduledExecutor(groupedThreads("route/reaper", "", log));
 
+    private final ExecutorService eventExecutor = newSingleThreadScheduledExecutor(groupedThreads(
+            "onos/routemonitor", "events-%d", log));
+
     /**
      * Creates a new route monitor.
      *
@@ -94,6 +98,8 @@
     public void shutdown() {
         stopProcessing();
         clusterService.removeListener(clusterListener);
+        eventExecutor.shutdownNow();
+        reaperExecutor.shutdownNow();
         asyncLock.unlock();
     }
 
@@ -145,31 +151,33 @@
 
         @Override
         public void event(ClusterEvent event) {
-            switch (event.type()) {
-            case INSTANCE_DEACTIVATED:
-                NodeId id = event.subject().id();
-                log.info("Node {} deactivated", id);
+            eventExecutor.execute(() -> {
+                switch (event.type()) {
+                    case INSTANCE_DEACTIVATED:
+                        NodeId id = event.subject().id();
+                        log.info("Node {} deactivated", id);
 
-                // DistributedLock is introduced to guarantee that minority nodes won't try to remove
-                // routes that originated from majority nodes.
-                // Adding 15 seconds retry for the leadership election to be completed.
-                asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
-                    if (result != null && result.isPresent()) {
-                        log.debug("Lock obtained. Put {} into removal queue", id);
-                        queue.addOne(id);
-                        asyncLock.unlock();
-                    } else {
-                        log.debug("Fail to obtain lock. Do not remove routes from {}", id);
-                    }
-                });
-                break;
-            case INSTANCE_ADDED:
-            case INSTANCE_REMOVED:
-            case INSTANCE_ACTIVATED:
-            case INSTANCE_READY:
-            default:
-                break;
-            }
+                        // DistributedLock is introduced to guarantee that minority nodes won't try to remove
+                        // routes that originated from majority nodes.
+                        // Adding 15 seconds retry for the leadership election to be completed.
+                        asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
+                            if (result != null && result.isPresent()) {
+                                log.debug("Lock obtained. Put {} into removal queue", id);
+                                queue.addOne(id);
+                                asyncLock.unlock();
+                            } else {
+                                log.debug("Fail to obtain lock. Do not remove routes from {}", id);
+                            }
+                        });
+                        break;
+                    case INSTANCE_ADDED:
+                    case INSTANCE_REMOVED:
+                    case INSTANCE_ACTIVATED:
+                    case INSTANCE_READY:
+                    default:
+                        break;
+                }
+            });
         }
     }
 }
diff --git a/core/net/src/main/java/org/onosproject/net/group/impl/GroupDriverProvider.java b/core/net/src/main/java/org/onosproject/net/group/impl/GroupDriverProvider.java
index f86e4a1..1092d8d 100644
--- a/core/net/src/main/java/org/onosproject/net/group/impl/GroupDriverProvider.java
+++ b/core/net/src/main/java/org/onosproject/net/group/impl/GroupDriverProvider.java
@@ -117,12 +117,16 @@
     }
 
     private void pollGroups() {
-        deviceService.getAvailableDevices().forEach(device -> {
-            if (mastershipService.isLocalMaster(device.id()) &&
-                    device.is(GroupProgrammable.class)) {
-                pollDeviceGroups(device.id());
-            }
-        });
+        try {
+            deviceService.getAvailableDevices().forEach(device -> {
+                if (mastershipService.isLocalMaster(device.id()) &&
+                        device.is(GroupProgrammable.class)) {
+                    pollDeviceGroups(device.id());
+                }
+            });
+        } catch (Exception e) {
+            log.warn("Exception thrown while polling groups", e);
+        }
     }
 
     private void pollDeviceGroups(DeviceId deviceId) {
diff --git a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
index 39ef3d9..363876c 100644
--- a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
+++ b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
@@ -508,26 +508,29 @@
 
         @Override
         public void event(DeviceEvent event) {
-            switch (event.type()) {
-                case DEVICE_REMOVED:
-                case DEVICE_AVAILABILITY_CHANGED:
-                    DeviceId deviceId = event.subject().id();
-                    if (!deviceService.isAvailable(deviceId)) {
-                        BasicDeviceConfig cfg = netCfgService.getConfig(deviceId, BasicDeviceConfig.class);
-                        //if purgeOnDisconnection is set for the device or it's a global configuration
-                        // lets remove the meters.
-                        boolean purge = cfg != null && cfg.isPurgeOnDisconnectionConfigured() ?
-                                cfg.purgeOnDisconnection() : purgeOnDisconnection;
-                        if (purge) {
-                            log.info("PurgeOnDisconnection is requested for device {}, " +
-                                             "removing meters", deviceId);
-                            store.purgeMeter(deviceId);
+            DeviceId deviceId = event.subject().id();
+            meterInstallers.execute(() -> {
+                switch (event.type()) {
+                    case DEVICE_REMOVED:
+                    case DEVICE_AVAILABILITY_CHANGED:
+                        if (!deviceService.isAvailable(deviceId)) {
+                            BasicDeviceConfig cfg = netCfgService.getConfig(deviceId, BasicDeviceConfig.class);
+                            //if purgeOnDisconnection is set for the device or it's a global configuration
+                            // lets remove the meters.
+                            boolean purge = cfg != null && cfg.isPurgeOnDisconnectionConfigured() ?
+                                    cfg.purgeOnDisconnection() : purgeOnDisconnection;
+                            if (purge) {
+                                log.info("PurgeOnDisconnection is requested for device {}, " +
+                                        "removing meters", deviceId);
+                                store.purgeMeter(deviceId);
+                            }
                         }
-                    }
-                    break;
-                default:
-                    break;
-            }
+                        break;
+                    default:
+                        break;
+                }
+            }, deviceId.hashCode());
+
         }
     }
 
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
index d55ca7f..f36e7a8 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
@@ -52,9 +52,13 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+
 /**
  * Entity that manages gNMI subscription for devices using OpenConfig models and
  * that reports relevant events to the core.
@@ -78,6 +82,8 @@
 
     private final Striped<Lock> deviceLocks = Striped.lock(30);
 
+    private ExecutorService eventExecutor;
+
     GnmiDeviceStateSubscriber(GnmiController gnmiController, DeviceService deviceService,
                               MastershipService mastershipService,
                               DeviceProviderService providerService) {
@@ -88,6 +94,8 @@
     }
 
     public void activate() {
+        eventExecutor = newSingleThreadScheduledExecutor(groupedThreads(
+                "onos/gnmi", "events-%d", log));
         deviceService.addListener(deviceEventListener);
         mastershipService.addListener(mastershipListener);
         gnmiController.addListener(gnmiEventListener);
@@ -100,6 +108,8 @@
         deviceService.removeListener(deviceEventListener);
         mastershipService.removeListener(mastershipListener);
         gnmiController.removeListener(gnmiEventListener);
+        eventExecutor.shutdownNow();
+        eventExecutor = null;
     }
 
     private void checkSubscription(DeviceId deviceId) {
@@ -248,19 +258,21 @@
 
         @Override
         public void event(GnmiEvent event) {
-            if (!deviceSubscribed.containsKey(event.subject().deviceId())) {
-                log.warn("Received gNMI event from {}, but we did'nt expect to " +
-                                 "be subscribed to it! Discarding event...",
-                         event.subject().deviceId());
-                return;
-            }
+            eventExecutor.execute(() -> {
+                if (!deviceSubscribed.containsKey(event.subject().deviceId())) {
+                    log.warn("Received gNMI event from {}, but we did'nt expect to " +
+                                    "be subscribed to it! Discarding event...",
+                            event.subject().deviceId());
+                    return;
+                }
 
-            log.debug("Received gNMI event {}", event.toString());
-            if (event.type() == GnmiEvent.Type.UPDATE) {
-                handleGnmiUpdate((GnmiUpdate) event.subject());
-            } else {
-                log.debug("Unsupported gNMI event type: {}", event.type());
-            }
+                log.debug("Received gNMI event {}", event.toString());
+                if (event.type() == GnmiEvent.Type.UPDATE) {
+                    handleGnmiUpdate((GnmiUpdate) event.subject());
+                } else {
+                    log.debug("Unsupported gNMI event type: {}", event.type());
+                }
+            });
         }
     }
 
@@ -268,7 +280,7 @@
 
         @Override
         public void event(MastershipEvent event) {
-            checkSubscription(event.subject());
+            eventExecutor.execute(() -> checkSubscription(event.subject()));
         }
     }
 
@@ -276,18 +288,20 @@
 
         @Override
         public void event(DeviceEvent event) {
-            switch (event.type()) {
-                case DEVICE_ADDED:
-                case DEVICE_AVAILABILITY_CHANGED:
-                case DEVICE_UPDATED:
-                case DEVICE_REMOVED:
-                case PORT_ADDED:
-                case PORT_REMOVED:
-                    checkSubscription(event.subject().id());
-                    break;
-                default:
-                    break;
-            }
+            eventExecutor.execute(() -> {
+                switch (event.type()) {
+                    case DEVICE_ADDED:
+                    case DEVICE_AVAILABILITY_CHANGED:
+                    case DEVICE_UPDATED:
+                    case DEVICE_REMOVED:
+                    case PORT_ADDED:
+                    case PORT_REMOVED:
+                        checkSubscription(event.subject().id());
+                        break;
+                    default:
+                        break;
+                }
+            });
         }
     }
 }