Avoid blocking event thread in ResourceDeviceListener
Change-Id: Ia17721ce3a435440f9383e55757ffb90ec555e9a
diff --git a/core/net/src/main/java/org/onosproject/net/resource/impl/ResourceDeviceListener.java b/core/net/src/main/java/org/onosproject/net/resource/impl/ResourceDeviceListener.java
index f4f4f1f..c8b5ba5 100644
--- a/core/net/src/main/java/org/onosproject/net/resource/impl/ResourceDeviceListener.java
+++ b/core/net/src/main/java/org/onosproject/net/resource/impl/ResourceDeviceListener.java
@@ -104,128 +104,122 @@
@Override
public void event(DeviceEvent event) {
- Device device = event.subject();
- // registration happens only when the caller is the master of the device
- if (!mastershipService.isLocalMaster(device.id())) {
- return;
- }
-
- switch (event.type()) {
- case DEVICE_ADDED:
- registerDeviceResource(device);
- break;
- case DEVICE_REMOVED:
- unregisterDeviceResource(device);
- break;
- case DEVICE_AVAILABILITY_CHANGED:
- if (deviceService.isAvailable(device.id())) {
- registerDeviceResource(device);
- // TODO: do we need to walk the ports?
- } else {
- unregisterDeviceResource(device);
- }
- break;
- case PORT_ADDED:
- case PORT_UPDATED:
- if (event.port().isEnabled()) {
- registerPortResource(device, event.port());
- } else {
- unregisterPortResource(device, event.port());
- }
- break;
- case PORT_REMOVED:
- unregisterPortResource(device, event.port());
- break;
- default:
- break;
- }
- }
-
- private void registerDeviceResource(Device device) {
executor.execute(() -> {
- boolean success = adminService.register(Resources.discrete(device.id()).resource());
- if (!success) {
- log.error("Failed to register Device: {}", device.id());
+ Device device = event.subject();
+ // registration happens only when the caller is the master of the device
+ if (!mastershipService.isLocalMaster(device.id())) {
+ return;
+ }
+
+ switch (event.type()) {
+ case DEVICE_ADDED:
+ registerDeviceResource(device);
+ break;
+ case DEVICE_REMOVED:
+ unregisterDeviceResource(device);
+ break;
+ case DEVICE_AVAILABILITY_CHANGED:
+ if (deviceService.isAvailable(device.id())) {
+ registerDeviceResource(device);
+ // TODO: do we need to walk the ports?
+ } else {
+ unregisterDeviceResource(device);
+ }
+ break;
+ case PORT_ADDED:
+ case PORT_UPDATED:
+ if (event.port().isEnabled()) {
+ registerPortResource(device, event.port());
+ } else {
+ unregisterPortResource(device, event.port());
+ }
+ break;
+ case PORT_REMOVED:
+ unregisterPortResource(device, event.port());
+ break;
+ default:
+ break;
}
});
}
+ private void registerDeviceResource(Device device) {
+ boolean success = adminService.register(Resources.discrete(device.id()).resource());
+ if (!success) {
+ log.error("Failed to register Device: {}", device.id());
+ }
+ }
+
private void unregisterDeviceResource(Device device) {
- executor.execute(() -> {
- DiscreteResource devResource = Resources.discrete(device.id()).resource();
- List<Resource> allResources = getDescendantResources(devResource);
- adminService.unregister(Lists.transform(allResources, Resource::id));
- });
+ DiscreteResource devResource = Resources.discrete(device.id()).resource();
+ List<Resource> allResources = getDescendantResources(devResource);
+ adminService.unregister(Lists.transform(allResources, Resource::id));
}
private void registerPortResource(Device device, Port port) {
Resource portPath = Resources.discrete(device.id(), port.number()).resource();
- executor.execute(() -> {
- if (!adminService.register(portPath)) {
- log.error("Failed to register Port: {}", portPath.id());
- }
+ if (!adminService.register(portPath)) {
+ log.error("Failed to register Port: {}", portPath.id());
+ }
- queryBandwidth(device.id(), port.number())
- .map(bw -> portPath.child(Bandwidth.class, bw.bps()))
- .map(adminService::register)
- .ifPresent(success -> {
- if (!success) {
- log.error("Failed to register Bandwidth for {}", portPath.id());
- }
- });
-
- // for VLAN IDs
- Set<VlanId> vlans = queryVlanIds(device.id(), port.number());
- if (!vlans.isEmpty()) {
- boolean success = adminService.register(vlans.stream()
- .map(portPath::child)
- .collect(Collectors.toList()));
+ queryBandwidth(device.id(), port.number())
+ .map(bw -> portPath.child(Bandwidth.class, bw.bps()))
+ .map(adminService::register)
+ .ifPresent(success -> {
if (!success) {
- log.error("Failed to register VLAN IDs for {}", portPath.id());
+ log.error("Failed to register Bandwidth for {}", portPath.id());
}
- }
+ });
- // for MPLS labels
- Set<MplsLabel> mplsLabels = queryMplsLabels(device.id(), port.number());
- if (!mplsLabels.isEmpty()) {
- boolean success = adminService.register(mplsLabels.stream()
- .map(portPath::child)
- .collect(Collectors.toList()));
- if (!success) {
- log.error("Failed to register MPLS Labels for {}", portPath.id());
- }
+ // for VLAN IDs
+ Set<VlanId> vlans = queryVlanIds(device.id(), port.number());
+ if (!vlans.isEmpty()) {
+ boolean success = adminService.register(vlans.stream()
+ .map(portPath::child)
+ .collect(Collectors.toList()));
+ if (!success) {
+ log.error("Failed to register VLAN IDs for {}", portPath.id());
}
+ }
- // for Lambdas
- Set<OchSignal> lambdas = queryLambdas(device.id(), port.number());
- if (!lambdas.isEmpty()) {
- boolean success = adminService.register(lambdas.stream()
- .map(portPath::child)
- .collect(Collectors.toList()));
- if (!success) {
- log.error("Failed to register lambdas for {}", portPath.id());
- }
+ // for MPLS labels
+ Set<MplsLabel> mplsLabels = queryMplsLabels(device.id(), port.number());
+ if (!mplsLabels.isEmpty()) {
+ boolean success = adminService.register(mplsLabels.stream()
+ .map(portPath::child)
+ .collect(Collectors.toList()));
+ if (!success) {
+ log.error("Failed to register MPLS Labels for {}", portPath.id());
}
+ }
- // for Tributary slots
- Set<TributarySlot> tSlots = queryTributarySlots(device.id(), port.number());
- if (!tSlots.isEmpty()) {
- boolean success = adminService.register(tSlots.stream()
- .map(portPath::child)
- .collect(Collectors.toList()));
- if (!success) {
- log.error("Failed to register tributary slots for {}", portPath.id());
- }
+ // for Lambdas
+ Set<OchSignal> lambdas = queryLambdas(device.id(), port.number());
+ if (!lambdas.isEmpty()) {
+ boolean success = adminService.register(lambdas.stream()
+ .map(portPath::child)
+ .collect(Collectors.toList()));
+ if (!success) {
+ log.error("Failed to register lambdas for {}", portPath.id());
}
- });
+ }
+
+ // for Tributary slots
+ Set<TributarySlot> tSlots = queryTributarySlots(device.id(), port.number());
+ if (!tSlots.isEmpty()) {
+ boolean success = adminService.register(tSlots.stream()
+ .map(portPath::child)
+ .collect(Collectors.toList()));
+ if (!success) {
+ log.error("Failed to register tributary slots for {}", portPath.id());
+ }
+ }
}
private void unregisterPortResource(Device device, Port port) {
- executor.execute(() -> {
- DiscreteResource portResource = Resources.discrete(device.id(), port.number()).resource();
- List<Resource> allResources = getDescendantResources(portResource);
- adminService.unregister(Lists.transform(allResources, Resource::id));
- });
+ DiscreteResource portResource = Resources.discrete(device.id(), port.number()).resource();
+ List<Resource> allResources = getDescendantResources(portResource);
+ adminService.unregister(Lists.transform(allResources, Resource::id));
}
// Returns list of all descendant resources of given resource, including itself.