ONOS-2043: Move device event handling in PacketManager off of event loop thread
Change-Id: Ia8b12e6ec3e732f0311adc7b3e7e63d07ad117e0
diff --git a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
index 26c4102..28f1df0 100644
--- a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
+++ b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
@@ -58,9 +58,12 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.security.AppGuard.checkPermission;
@@ -92,6 +95,8 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private FlowObjectiveService objectiveService;
+ private ExecutorService eventHandlingExecutor;
+
private final DeviceListener deviceListener = new InternalDeviceListener();
private final Map<Integer, PacketProcessor> processors = new ConcurrentHashMap<>();
@@ -100,6 +105,8 @@
@Activate
public void activate() {
+ eventHandlingExecutor = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/net/packet", "event-handler"));
appId = coreService.getAppId(CoreService.CORE_APP_NAME);
store.setDelegate(delegate);
deviceService.addListener(deviceListener);
@@ -111,6 +118,7 @@
public void deactivate() {
store.unsetDelegate(delegate);
deviceService.removeListener(deviceListener);
+ eventHandlingExecutor.shutdown();
log.info("Stopped");
}
@@ -277,19 +285,25 @@
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
- Device device = event.subject();
- switch (event.type()) {
- case DEVICE_ADDED:
- case DEVICE_AVAILABILITY_CHANGED:
- if (deviceService.isAvailable(event.subject().id())) {
- for (PacketRequest request : store.existingRequests()) {
- pushRule(device, request);
+ eventHandlingExecutor.execute(() -> {
+ try {
+ Device device = event.subject();
+ switch (event.type()) {
+ case DEVICE_ADDED:
+ case DEVICE_AVAILABILITY_CHANGED:
+ if (deviceService.isAvailable(event.subject().id())) {
+ for (PacketRequest request : store.existingRequests()) {
+ pushRule(device, request);
+ }
}
+ break;
+ default:
+ break;
}
- break;
- default:
- break;
- }
+ } catch (Exception e) {
+ log.warn("Failed to process {}", event, e);
+ }
+ });
}
}