Moving the device event handling to separate thread
Change-Id: Idfffee1ef46fda4248839854fa2d87f3db6008bb
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 45e2e95..2368f25 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -70,6 +70,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.AnnotationKeys.DRIVER;
import static org.onosproject.net.OsgiPropertyConstants.FOM_NUM_THREADS;
@@ -150,12 +151,15 @@
private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
ExecutorService executorService;
+ protected ExecutorService devEventExecutor;
@Activate
protected void activate() {
cfgService.registerProperties(FlowObjectiveManager.class);
executorService = newFixedThreadPool(numThreads,
groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+ devEventExecutor = newSingleThreadScheduledExecutor(
+ groupedThreads("onos/flowobj-dev-events", "events-%d", log));
flowObjectiveStore.setDelegate(delegate);
deviceService.addListener(deviceListener);
driverService.addListener(driverListener);
@@ -169,6 +173,8 @@
deviceService.removeListener(deviceListener);
driverService.removeListener(driverListener);
executorService.shutdown();
+ devEventExecutor.shutdownNow();
+ devEventExecutor = null;
pipeliners.clear();
driverHandlers.clear();
nextToDevice.clear();
@@ -435,22 +441,25 @@
private class InnerDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
+ if (devEventExecutor != null) {
switch (event.type()) {
case DEVICE_ADDED:
case DEVICE_AVAILABILITY_CHANGED:
log.debug("Device either added or availability changed {}",
event.subject().id());
- if (deviceService.isAvailable(event.subject().id())) {
+ devEventExecutor.execute(() -> {
+ if (deviceService.isAvailable(event.subject().id())) {
log.debug("Device is now available {}", event.subject().id());
getAndInitDevicePipeliner(event.subject().id());
- } else {
+ } else {
log.debug("Device is no longer available {}", event.subject().id());
- }
+ }
+ });
break;
case DEVICE_UPDATED:
// Invalidate pipeliner and handler caches if the driver name
// device annotation changed.
- invalidatePipelinerIfNecessary(event.subject());
+ devEventExecutor.execute(() -> invalidatePipelinerIfNecessary(event.subject()));
break;
case DEVICE_REMOVED:
// evict Pipeliner and Handler cache, when
@@ -459,8 +468,10 @@
// System expect the user to clear all existing flows,
// before removing device, especially if they intend to
// replace driver/pipeliner assigned to the device.
- driverHandlers.remove(event.subject().id());
- pipeliners.remove(event.subject().id());
+ devEventExecutor.execute(() -> {
+ driverHandlers.remove(event.subject().id());
+ pipeliners.remove(event.subject().id());
+ });
break;
case DEVICE_SUSPENDED:
break;
@@ -473,6 +484,7 @@
default:
break;
}
+ }
}
}