[SDFAB-20] Prevent listener ejection and the stopping of group polling

Offload listener processing to external executors to prevent
listener ejection due to time-consuming processing.

In the future, we may want to extend the same fix to the
HostManager and NetworkConfigHostProvider.

Additionally, avoid propagating exceptions in GroupDriverProvider,
which leads to the cancellation of the periodic poll task.

Change-Id: I8ea4ec9fda1ccc48bbd3855fd443ee8760cbbb60
diff --git a/apps/inbandtelemetry/impl/src/main/java/org/onosproject/inbandtelemetry/impl/SimpleIntManager.java b/apps/inbandtelemetry/impl/src/main/java/org/onosproject/inbandtelemetry/impl/SimpleIntManager.java
index ffb6b4f..bf7dc18 100644
--- a/apps/inbandtelemetry/impl/src/main/java/org/onosproject/inbandtelemetry/impl/SimpleIntManager.java
+++ b/apps/inbandtelemetry/impl/src/main/java/org/onosproject/inbandtelemetry/impl/SimpleIntManager.java
@@ -72,6 +72,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -79,6 +80,8 @@
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -152,6 +155,8 @@
                 }
             };
 
+    protected ExecutorService eventExecutor;
+
     @Activate
     public void activate() {
 
@@ -206,6 +211,10 @@
         // Bootstrap config for already existing devices.
         triggerAllDeviceConfigure();
 
+        // Bootstrap core event executor before adding listener
+        eventExecutor = newSingleThreadScheduledExecutor(groupedThreads(
+                "onos/int", "events-%d", log));
+
         hostService.addListener(hostListener);
         deviceService.addListener(deviceListener);
 
@@ -256,6 +265,8 @@
         deviceService.getDevices().forEach(d -> cleanupDevice(d.id()));
         netcfgService.removeListener(appConfigListener);
         netcfgRegistry.unregisterConfigFactory(intAppConfigFactory);
+        eventExecutor.shutdownNow();
+        eventExecutor = null;
         log.info("Deactivated");
     }
 
@@ -503,30 +514,34 @@
     private class InternalHostListener implements HostListener {
         @Override
         public void event(HostEvent event) {
-            final DeviceId deviceId = event.subject().location().deviceId();
-            triggerDeviceConfigure(deviceId);
+            eventExecutor.execute(() -> {
+                final DeviceId deviceId = event.subject().location().deviceId();
+                triggerDeviceConfigure(deviceId);
+            });
         }
     }
 
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
-            switch (event.type()) {
-                case DEVICE_ADDED:
-                case DEVICE_UPDATED:
-                case DEVICE_REMOVED:
-                case DEVICE_SUSPENDED:
-                case DEVICE_AVAILABILITY_CHANGED:
-                case PORT_ADDED:
-                case PORT_UPDATED:
-                case PORT_REMOVED:
-                    triggerDeviceConfigure(event.subject().id());
-                    return;
-                case PORT_STATS_UPDATED:
-                    return;
-                default:
-                    log.warn("Unknown device event type {}", event.type());
-            }
+            eventExecutor.execute(() -> {
+                switch (event.type()) {
+                    case DEVICE_ADDED:
+                    case DEVICE_UPDATED:
+                    case DEVICE_REMOVED:
+                    case DEVICE_SUSPENDED:
+                    case DEVICE_AVAILABILITY_CHANGED:
+                    case PORT_ADDED:
+                    case PORT_UPDATED:
+                    case PORT_REMOVED:
+                        triggerDeviceConfigure(event.subject().id());
+                        return;
+                    case PORT_STATS_UPDATED:
+                        return;
+                    default:
+                        log.warn("Unknown device event type {}", event.type());
+                }
+            });
         }
     }
 
@@ -580,56 +595,56 @@
 
         @Override
         public void event(NetworkConfigEvent event) {
-            switch (event.type()) {
-                case CONFIG_ADDED:
-                case CONFIG_UPDATED:
-                    event.config()
-                            .map(config -> (IntReportConfig) config)
-                            .ifPresent(config -> {
-                                IntDeviceConfig intDeviceConfig = IntDeviceConfig.builder()
-                                        .withMinFlowHopLatencyChangeNs(config.minFlowHopLatencyChangeNs())
-                                        .withCollectorPort(config.collectorPort())
-                                        .withCollectorIp(config.collectorIp())
-                                        .enabled(true)
-                                        .build();
-                                setConfig(intDeviceConfig);
+            eventExecutor.execute(() -> {
+                if (event.configClass() == IntReportConfig.class) {
+                    switch (event.type()) {
+                        case CONFIG_ADDED:
+                        case CONFIG_UPDATED:
+                            event.config()
+                                    .map(config -> (IntReportConfig) config)
+                                    .ifPresent(config -> {
+                                        IntDeviceConfig intDeviceConfig = IntDeviceConfig.builder()
+                                                .withMinFlowHopLatencyChangeNs(config.minFlowHopLatencyChangeNs())
+                                                .withCollectorPort(config.collectorPort())
+                                                .withCollectorIp(config.collectorIp())
+                                                .enabled(true)
+                                                .build();
+                                        setConfig(intDeviceConfig);
 
-                                // For each watched subnet, we install two INT rules.
-                                // One match on the source, another match on the destination.
-                                intentMap.clear();
-                                config.watchSubnets().forEach(subnet -> {
-                                    IntIntent.Builder intIntentBuilder = IntIntent.builder()
-                                            .withReportType(IntIntent.IntReportType.TRACKED_FLOW)
-                                            .withReportType(IntIntent.IntReportType.DROPPED_PACKET)
-                                            .withReportType(IntIntent.IntReportType.CONGESTED_QUEUE)
-                                            .withTelemetryMode(IntIntent.TelemetryMode.POSTCARD);
-                                    if (subnet.prefixLength() == 0) {
-                                        // Special case, match any packet
-                                        installIntIntent(intIntentBuilder
-                                                .withSelector(DefaultTrafficSelector.emptySelector())
-                                                .build());
-                                    } else {
-                                        TrafficSelector selector = DefaultTrafficSelector.builder()
-                                                .matchIPSrc(subnet)
-                                                .build();
-                                        installIntIntent(intIntentBuilder.withSelector(selector).build());
-                                        selector = DefaultTrafficSelector.builder()
-                                                .matchIPDst(subnet)
-                                                .build();
-                                        installIntIntent(intIntentBuilder.withSelector(selector).build());
-                                    }
-                                });
-                            });
-                    break;
-                // TODO: Support removing INT config.
-                default:
-                    break;
-            }
+                                        // For each watched subnet, we install two INT rules.
+                                        // One match on the source, another match on the destination.
+                                        intentMap.clear();
+                                        config.watchSubnets().forEach(subnet -> {
+                                            IntIntent.Builder intIntentBuilder = IntIntent.builder()
+                                                    .withReportType(IntIntent.IntReportType.TRACKED_FLOW)
+                                                    .withReportType(IntIntent.IntReportType.DROPPED_PACKET)
+                                                    .withReportType(IntIntent.IntReportType.CONGESTED_QUEUE)
+                                                    .withTelemetryMode(IntIntent.TelemetryMode.POSTCARD);
+                                            if (subnet.prefixLength() == 0) {
+                                                // Special case, match any packet
+                                                installIntIntent(intIntentBuilder
+                                                        .withSelector(DefaultTrafficSelector.emptySelector())
+                                                        .build());
+                                            } else {
+                                                TrafficSelector selector = DefaultTrafficSelector.builder()
+                                                        .matchIPSrc(subnet)
+                                                        .build();
+                                                installIntIntent(intIntentBuilder.withSelector(selector).build());
+                                                selector = DefaultTrafficSelector.builder()
+                                                        .matchIPDst(subnet)
+                                                        .build();
+                                                installIntIntent(intIntentBuilder.withSelector(selector).build());
+                                            }
+                                        });
+                                    });
+                            break;
+                        // TODO: Support removing INT config.
+                        default:
+                            break;
+                    }
+                }
+            });
         }
 
-        @Override
-        public boolean isRelevant(NetworkConfigEvent event) {
-            return event.configClass() == IntReportConfig.class;
-        }
     }
 }
diff --git a/apps/inbandtelemetry/impl/src/test/java/org/onosproject/inbandtelemetry/impl/SimpleIntManagerTest.java b/apps/inbandtelemetry/impl/src/test/java/org/onosproject/inbandtelemetry/impl/SimpleIntManagerTest.java
index 8519ed5..7fddf3c 100644
--- a/apps/inbandtelemetry/impl/src/test/java/org/onosproject/inbandtelemetry/impl/SimpleIntManagerTest.java
+++ b/apps/inbandtelemetry/impl/src/test/java/org/onosproject/inbandtelemetry/impl/SimpleIntManagerTest.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.After;
@@ -81,6 +82,7 @@
 import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.onlab.junit.TestTools.assertAfter;
 import static org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable.IntFunctionality.POSTCARD;
 import static org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable.IntFunctionality.SINK;
 import static org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable.IntFunctionality.SOURCE;
@@ -151,6 +153,7 @@
         manager.hostService = hostService;
         manager.netcfgService = networkConfigService;
         manager.netcfgRegistry = networkConfigRegistry;
+        manager.eventExecutor = MoreExecutors.newDirectExecutorService();
 
         expect(coreService.registerApplication(APP_NAME))
                 .andReturn(APP_ID).anyTimes();
@@ -226,7 +229,7 @@
 
         // The INT intent installation order can be random, so we need to collect
         // all expected INT intents and check if actual intent exists.
-        assertEquals(5, intentMap.size());
+        assertAfter(50, 100, () -> assertEquals(5, intentMap.size()));
         intentMap.entrySet().forEach(entry -> {
             IntIntent actualIntIntent = entry.getValue().value();
             assertTrue(expectedIntIntents.contains(actualIntIntent));
diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java
index ede60db..5ae2628 100644
--- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java
+++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java
@@ -35,6 +35,7 @@
 
 import java.time.Duration;
 import java.util.Collection;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
@@ -65,6 +66,9 @@
     private final ScheduledExecutorService reaperExecutor =
             newSingleThreadScheduledExecutor(groupedThreads("route/reaper", "", log));
 
+    private final ExecutorService eventExecutor = newSingleThreadScheduledExecutor(groupedThreads(
+            "onos/routemonitor", "events-%d", log));
+
     /**
      * Creates a new route monitor.
      *
@@ -94,6 +98,8 @@
     public void shutdown() {
         stopProcessing();
         clusterService.removeListener(clusterListener);
+        eventExecutor.shutdownNow();
+        reaperExecutor.shutdownNow();
         asyncLock.unlock();
     }
 
@@ -145,31 +151,33 @@
 
         @Override
         public void event(ClusterEvent event) {
-            switch (event.type()) {
-            case INSTANCE_DEACTIVATED:
-                NodeId id = event.subject().id();
-                log.info("Node {} deactivated", id);
+            eventExecutor.execute(() -> {
+                switch (event.type()) {
+                    case INSTANCE_DEACTIVATED:
+                        NodeId id = event.subject().id();
+                        log.info("Node {} deactivated", id);
 
-                // DistributedLock is introduced to guarantee that minority nodes won't try to remove
-                // routes that originated from majority nodes.
-                // Adding 15 seconds retry for the leadership election to be completed.
-                asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
-                    if (result != null && result.isPresent()) {
-                        log.debug("Lock obtained. Put {} into removal queue", id);
-                        queue.addOne(id);
-                        asyncLock.unlock();
-                    } else {
-                        log.debug("Fail to obtain lock. Do not remove routes from {}", id);
-                    }
-                });
-                break;
-            case INSTANCE_ADDED:
-            case INSTANCE_REMOVED:
-            case INSTANCE_ACTIVATED:
-            case INSTANCE_READY:
-            default:
-                break;
-            }
+                        // DistributedLock is introduced to guarantee that minority nodes won't try to remove
+                        // routes that originated from majority nodes.
+                        // Adding 15 seconds retry for the leadership election to be completed.
+                        asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
+                            if (result != null && result.isPresent()) {
+                                log.debug("Lock obtained. Put {} into removal queue", id);
+                                queue.addOne(id);
+                                asyncLock.unlock();
+                            } else {
+                                log.debug("Fail to obtain lock. Do not remove routes from {}", id);
+                            }
+                        });
+                        break;
+                    case INSTANCE_ADDED:
+                    case INSTANCE_REMOVED:
+                    case INSTANCE_ACTIVATED:
+                    case INSTANCE_READY:
+                    default:
+                        break;
+                }
+            });
         }
     }
 }