Fix ONOS-4683 - Don't process device events on the listener thread

Change-Id: Icc465311c2c047dba11bacc69c745bbda55ea714
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java
index 69449b3..d905e13 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java
@@ -15,9 +15,16 @@
  */
 package org.onosproject.provider.lldp.impl;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
+import java.util.Dictionary;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -26,7 +33,6 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.packet.Ethernet;
-import org.onlab.util.SharedExecutors;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterMetadataService;
 import org.onosproject.cluster.ClusterService;
@@ -51,29 +57,24 @@
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.link.DefaultLinkDescription;
-import org.onosproject.net.link.ProbedLinkProvider;
 import org.onosproject.net.link.LinkProviderRegistry;
 import org.onosproject.net.link.LinkProviderService;
 import org.onosproject.net.link.LinkService;
+import org.onosproject.net.link.ProbedLinkProvider;
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketPriority;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketService;
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
-import org.onosproject.provider.lldpcommon.LinkDiscoveryContext;
 import org.onosproject.provider.lldpcommon.LinkDiscovery;
+import org.onosproject.provider.lldpcommon.LinkDiscoveryContext;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
-import java.util.Dictionary;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -141,6 +142,7 @@
     private LinkProviderService providerService;
 
     private ScheduledExecutorService executor;
+    protected ExecutorService eventExecutor;
 
     private boolean shuttingDown = false;
 
@@ -240,6 +242,7 @@
 
     @Activate
     public void activate(ComponentContext context) {
+        eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/linkevents", "events-%d", log));
         shuttingDown = false;
         cfgService.registerProperties(getClass());
         appId = coreService.registerApplication(PROVIDER_NAME);
@@ -274,6 +277,8 @@
 
         cfgService.unregisterProperties(getClass(), false);
         disable();
+        eventExecutor.shutdownNow();
+        eventExecutor = null;
         log.info("Stopped");
     }
 
@@ -542,27 +547,30 @@
                 return;
             }
 
-            DeviceId deviceId = event.subject();
-            Device device = deviceService.getDevice(deviceId);
-            if (device == null) {
-                log.debug("Device {} doesn't exist, or isn't there yet", deviceId);
-                return;
-            }
-            if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) {
-                updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id()));
-            }
+            eventExecutor.execute(() -> {
+                DeviceId deviceId = event.subject();
+                Device device = deviceService.getDevice(deviceId);
+                if (device == null) {
+                    log.debug("Device {} doesn't exist, or isn't there yet", deviceId);
+                    return;
+                }
+                if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) {
+                    updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id()));
+                }
+            });
         }
     }
 
-    /**
-     * Processes device events.
-     */
-    private class InternalDeviceListener implements DeviceListener {
+    private class DeviceEventProcessor implements Runnable {
+
+        DeviceEvent event;
+
+        DeviceEventProcessor(DeviceEvent event) {
+            this.event = event;
+        }
+
         @Override
-        public void event(DeviceEvent event) {
-            if (event.type() == Type.PORT_STATS_UPDATED) {
-                return;
-            }
+        public void run() {
             Device device = event.subject();
             Port port = event.port();
             if (device == null) {
@@ -618,6 +626,22 @@
     }
 
     /**
+     * Processes device events.
+     */
+    private class InternalDeviceListener implements DeviceListener {
+        @Override
+        public void event(DeviceEvent event) {
+            if (event.type() == Type.PORT_STATS_UPDATED) {
+                return;
+            }
+
+            Runnable deviceEventProcessor = new DeviceEventProcessor(event);
+
+            eventExecutor.execute(deviceEventProcessor);
+        }
+    }
+
+    /**
      * Processes incoming packets.
      */
     private class InternalPacketProcessor implements PacketProcessor {
@@ -774,7 +798,7 @@
 
         @Override
         public void event(NetworkConfigEvent event) {
-            SharedExecutors.getPoolThreadExecutor().execute(() -> {
+            eventExecutor.execute(() -> {
                 if (event.configClass() == LinkDiscoveryFromDevice.class &&
                         CONFIG_CHANGED.contains(event.type())) {
 
diff --git a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java
index fb1bf67..811baac 100644
--- a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java
+++ b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java
@@ -15,11 +15,15 @@
  */
 package org.onosproject.provider.lldp.impl;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -66,14 +70,12 @@
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.provider.lldpcommon.LinkDiscovery;
 
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
 
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
@@ -146,6 +148,8 @@
 
         provider.activate(null);
 
+        provider.eventExecutor = MoreExecutors.newDirectExecutorService();
+
         providerService = linkRegistry.registeredProvider();
     }
 
diff --git a/providers/lldpcommon/src/main/java/org/onosproject/provider/lldpcommon/LinkDiscovery.java b/providers/lldpcommon/src/main/java/org/onosproject/provider/lldpcommon/LinkDiscovery.java
index f108cd4..1937e07 100644
--- a/providers/lldpcommon/src/main/java/org/onosproject/provider/lldpcommon/LinkDiscovery.java
+++ b/providers/lldpcommon/src/main/java/org/onosproject/provider/lldpcommon/LinkDiscovery.java
@@ -255,6 +255,9 @@
     }
 
     private void sendProbes(Long portNumber) {
+        if (context.packetService() == null) {
+            return;
+        }
         log.trace("Sending probes out to {}@{}", portNumber, device.id());
         OutboundPacket pkt = createOutBoundLldp(portNumber);
         context.packetService().emit(pkt);