Fix ONOS-4683 - Don't process device events on the listener thread
Change-Id: Icc465311c2c047dba11bacc69c745bbda55ea714
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java
index 246cf4c..bd81a99 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java
@@ -15,9 +15,16 @@
*/
package org.onosproject.provider.lldp.impl;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
+import java.util.Dictionary;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -26,7 +33,6 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.Ethernet;
-import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterMetadataService;
@@ -52,30 +58,25 @@
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.link.DefaultLinkDescription;
-import org.onosproject.net.link.ProbedLinkProvider;
import org.onosproject.net.link.LinkProviderRegistry;
import org.onosproject.net.link.LinkProviderService;
import org.onosproject.net.link.LinkService;
+import org.onosproject.net.link.ProbedLinkProvider;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketPriority;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
-import org.onosproject.provider.lldpcommon.LinkDiscoveryContext;
import org.onosproject.provider.lldpcommon.LinkDiscovery;
+import org.onosproject.provider.lldpcommon.LinkDiscoveryContext;
import org.onosproject.store.service.ConsistentMapException;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
-import java.util.Dictionary;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -143,6 +144,7 @@
private LinkProviderService providerService;
private ScheduledExecutorService executor;
+ protected ExecutorService eventExecutor;
private boolean shuttingDown = false;
@@ -242,6 +244,7 @@
@Activate
public void activate(ComponentContext context) {
+ eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/linkevents", "events-%d", log));
shuttingDown = false;
cfgService.registerProperties(getClass());
appId = coreService.registerApplication(PROVIDER_NAME);
@@ -280,6 +283,8 @@
cfgService.unregisterProperties(getClass(), false);
disable();
+ eventExecutor.shutdownNow();
+ eventExecutor = null;
log.info("Stopped");
}
@@ -548,27 +553,30 @@
return;
}
- DeviceId deviceId = event.subject();
- Device device = deviceService.getDevice(deviceId);
- if (device == null) {
- log.debug("Device {} doesn't exist, or isn't there yet", deviceId);
- return;
- }
- if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) {
- updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id()));
- }
+ eventExecutor.execute(() -> {
+ DeviceId deviceId = event.subject();
+ Device device = deviceService.getDevice(deviceId);
+ if (device == null) {
+ log.debug("Device {} doesn't exist, or isn't there yet", deviceId);
+ return;
+ }
+ if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) {
+ updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id()));
+ }
+ });
}
}
- /**
- * Processes device events.
- */
- private class InternalDeviceListener implements DeviceListener {
+ private class DeviceEventProcessor implements Runnable {
+
+ DeviceEvent event;
+
+ DeviceEventProcessor(DeviceEvent event) {
+ this.event = event;
+ }
+
@Override
- public void event(DeviceEvent event) {
- if (event.type() == Type.PORT_STATS_UPDATED) {
- return;
- }
+ public void run() {
Device device = event.subject();
Port port = event.port();
if (device == null) {
@@ -624,6 +632,22 @@
}
/**
+ * Processes device events.
+ */
+ private class InternalDeviceListener implements DeviceListener {
+ @Override
+ public void event(DeviceEvent event) {
+ if (event.type() == Type.PORT_STATS_UPDATED) {
+ return;
+ }
+
+ Runnable deviceEventProcessor = new DeviceEventProcessor(event);
+
+ eventExecutor.execute(deviceEventProcessor);
+ }
+ }
+
+ /**
* Processes incoming packets.
*/
private class InternalPacketProcessor implements PacketProcessor {
@@ -780,7 +804,7 @@
@Override
public void event(NetworkConfigEvent event) {
- SharedExecutors.getPoolThreadExecutor().execute(() -> {
+ eventExecutor.execute(() -> {
if (event.configClass() == LinkDiscoveryFromDevice.class &&
CONFIG_CHANGED.contains(event.type())) {
diff --git a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java
index b911879..bf60024 100644
--- a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java
+++ b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java
@@ -75,6 +75,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
@@ -147,6 +148,8 @@
provider.activate(null);
+ provider.eventExecutor = MoreExecutors.newDirectExecutorService();
+
providerService = linkRegistry.registeredProvider();
}
diff --git a/providers/lldpcommon/src/main/java/org/onosproject/provider/lldpcommon/LinkDiscovery.java b/providers/lldpcommon/src/main/java/org/onosproject/provider/lldpcommon/LinkDiscovery.java
index 9d021e4..08d20bb 100644
--- a/providers/lldpcommon/src/main/java/org/onosproject/provider/lldpcommon/LinkDiscovery.java
+++ b/providers/lldpcommon/src/main/java/org/onosproject/provider/lldpcommon/LinkDiscovery.java
@@ -255,6 +255,9 @@
}
private void sendProbes(Long portNumber) {
+ if (context.packetService() == null) {
+ return;
+ }
log.trace("Sending probes out to {}@{}", portNumber, device.id());
OutboundPacket pkt = createOutBoundLldp(portNumber);
context.packetService().emit(pkt);