Moving the device event handling to separate thread
Change-Id: Idfffee1ef46fda4248839854fa2d87f3db6008bb
diff --git a/apps/dhcprelay/app/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java b/apps/dhcprelay/app/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java
index 2b404d9..90c58b7 100644
--- a/apps/dhcprelay/app/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java
+++ b/apps/dhcprelay/app/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java
@@ -27,6 +27,7 @@
import java.util.stream.Stream;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Streams;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -94,13 +95,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.onlab.util.Tools.groupedThreads;
-
-
-import com.google.common.collect.ImmutableSet;
-
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
/**
@@ -207,6 +206,7 @@
protected boolean dhcpFpmEnabled = false;
private ScheduledExecutorService timerExecutor;
+ protected ExecutorService devEventExecutor;
protected DeviceListener deviceListener = new InternalDeviceListener();
private DhcpRelayPacketProcessor dhcpRelayPacketProcessor = new DhcpRelayPacketProcessor();
@@ -243,6 +243,9 @@
dhcpPollInterval,
TimeUnit.SECONDS);
+ devEventExecutor = newSingleThreadScheduledExecutor(
+ groupedThreads("onos/dhcprelay-dev-events", "events-%d", log));
+
modified(context);
// Enable distribute route store
@@ -266,6 +269,8 @@
compCfgService.unregisterProperties(getClass(), false);
deviceService.removeListener(deviceListener);
timerExecutor.shutdown();
+ devEventExecutor.shutdownNow();
+ devEventExecutor = null;
log.info("DHCP-RELAY Stopped");
}
@@ -619,16 +624,19 @@
@Override
public void event(DeviceEvent event) {
+ if (devEventExecutor != null) {
Device device = event.subject();
switch (event.type()) {
case DEVICE_ADDED:
- updateIgnoreVlanConfigs();
+ devEventExecutor.execute(this::updateIgnoreVlanConfigs);
break;
case DEVICE_AVAILABILITY_CHANGED:
- deviceAvailabilityChanged(device);
+ devEventExecutor.execute(() -> deviceAvailabilityChanged(device));
+ break;
default:
break;
}
+ }
}
private void deviceAvailabilityChanged(Device device) {
diff --git a/apps/dhcprelay/app/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java b/apps/dhcprelay/app/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java
index 9d86606..ba943f9 100644
--- a/apps/dhcprelay/app/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java
+++ b/apps/dhcprelay/app/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java
@@ -124,9 +124,11 @@
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
+import static org.onlab.junit.TestTools.assertAfter;
import static org.onosproject.dhcprelay.DhcpRelayManager.DHCP_RELAY_APP;
public class DhcpRelayManagerTest {
+ private static final int EVENT_PROCESSING_MS = 1000;
private static final short VLAN_LEN = 2;
private static final short SEPARATOR_LEN = 1;
private static final String CONFIG_FILE_PATH = "dhcp-relay.json";
@@ -634,10 +636,16 @@
flowObjectiveService.apply(eq(DEV_1_ID), capture(capturedFromDev1));
expectLastCall().times(DHCP_SELECTORS.size());
replay(manager.cfgService, flowObjectiveService, manager.deviceService, device);
+
manager.deviceListener.event(event);
+
+ // Wait until all flow objective events are captured before triggering onSuccess
+ int expectFlowObjCount = Dhcp4HandlerImpl.DHCP_SELECTORS.size() + Dhcp6HandlerImpl.DHCP_SELECTORS.size();
+ assertAfter(EVENT_PROCESSING_MS, () -> assertEquals(expectFlowObjCount, capturedFromDev1.getValues().size()));
capturedFromDev1.getValues().forEach(obj -> obj.context().ifPresent(ctx -> ctx.onSuccess(obj)));
- assertEquals(1, v4Handler.ignoredVlans.size());
- assertEquals(1, v6Handler.ignoredVlans.size());
+
+ assertAfter(EVENT_PROCESSING_MS, () -> assertEquals(1, v4Handler.ignoredVlans.size()));
+ assertAfter(EVENT_PROCESSING_MS, () -> assertEquals(1, v6Handler.ignoredVlans.size()));
}
/**
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 4567415..462762a 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -72,6 +72,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.AnnotationKeys.DRIVER;
import static org.onosproject.security.AppGuard.checkPermission;
@@ -149,12 +150,15 @@
private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
ExecutorService executorService;
+ protected ExecutorService devEventExecutor;
@Activate
protected void activate() {
cfgService.registerProperties(getClass());
executorService = newFixedThreadPool(numThreads,
groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+ devEventExecutor = newSingleThreadScheduledExecutor(
+ groupedThreads("onos/flowobj-dev-events", "events-%d", log));
flowObjectiveStore.setDelegate(delegate);
deviceService.addListener(deviceListener);
driverService.addListener(driverListener);
@@ -168,6 +172,8 @@
deviceService.removeListener(deviceListener);
driverService.removeListener(driverListener);
executorService.shutdown();
+ devEventExecutor.shutdownNow();
+ devEventExecutor = null;
pipeliners.clear();
driverHandlers.clear();
nextToDevice.clear();
@@ -434,22 +440,25 @@
private class InnerDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
+ if (devEventExecutor != null) {
switch (event.type()) {
case DEVICE_ADDED:
case DEVICE_AVAILABILITY_CHANGED:
log.debug("Device either added or availability changed {}",
event.subject().id());
- if (deviceService.isAvailable(event.subject().id())) {
+ devEventExecutor.execute(() -> {
+ if (deviceService.isAvailable(event.subject().id())) {
log.debug("Device is now available {}", event.subject().id());
getAndInitDevicePipeliner(event.subject().id());
- } else {
+ } else {
log.debug("Device is no longer available {}", event.subject().id());
- }
+ }
+ });
break;
case DEVICE_UPDATED:
// Invalidate pipeliner and handler caches if the driver name
// device annotation changed.
- invalidatePipelinerIfNecessary(event.subject());
+ devEventExecutor.execute(() -> invalidatePipelinerIfNecessary(event.subject()));
break;
case DEVICE_REMOVED:
// evict Pipeliner and Handler cache, when
@@ -458,8 +467,10 @@
// System expect the user to clear all existing flows,
// before removing device, especially if they intend to
// replace driver/pipeliner assigned to the device.
- driverHandlers.remove(event.subject().id());
- pipeliners.remove(event.subject().id());
+ devEventExecutor.execute(() -> {
+ driverHandlers.remove(event.subject().id());
+ pipeliners.remove(event.subject().id());
+ });
break;
case DEVICE_SUSPENDED:
break;
@@ -472,6 +483,7 @@
default:
break;
}
+ }
}
}