Never process incoming messages on the Netty event loop thread pool.
Currently, in many places we deserialize incoming messages on this thread pool, which could significantly limit throughput.
Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 602fd9f..f16bd06 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -176,29 +176,36 @@
@Activate
public void activate() {
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
- clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ, new InternalRemoveRequestListener());
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener());
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener());
executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
backgroundExecutor =
newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
+ new InternalDeviceOfflineEventListener(),
+ executor);
+ clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
+ new InternalRemoveRequestListener(),
+ executor);
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
+ new InternalDeviceAdvertisementListener(),
+ backgroundExecutor);
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
+
// start anti-entropy thread
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec, TimeUnit.SECONDS);
@@ -1325,17 +1332,11 @@
DeviceId deviceId = event.deviceId();
Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
- } catch (Exception e) {
- log.warn("Exception thrown handling device update", e);
- }
- }
- });
+ try {
+ notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device update", e);
+ }
}
}
@@ -1350,17 +1351,11 @@
DeviceId deviceId = event.deviceId();
Timestamp timestamp = event.timestamp();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
- } catch (Exception e) {
- log.warn("Exception thrown handling device offline", e);
- }
- }
- });
+ try {
+ notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device offline", e);
+ }
}
}
@@ -1371,17 +1366,11 @@
log.debug("Received device remove request from peer: {}", message.sender());
DeviceId did = SERIALIZER.decode(message.payload());
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- removeDevice(did);
- } catch (Exception e) {
- log.warn("Exception thrown handling device remove", e);
- }
- }
- });
+ try {
+ removeDevice(did);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device remove", e);
+ }
}
}
@@ -1396,17 +1385,11 @@
DeviceId deviceId = event.deviceId();
Timestamp timestamp = event.timestamp();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
- } catch (Exception e) {
- log.warn("Exception thrown handling device removed", e);
- }
- }
- });
+ try {
+ notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device removed", e);
+ }
}
}
@@ -1428,17 +1411,11 @@
return;
}
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
- } catch (Exception e) {
- log.warn("Exception thrown handling port update", e);
- }
- }
- });
+ try {
+ notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling port update", e);
+ }
}
}
@@ -1460,17 +1437,11 @@
return;
}
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
- } catch (Exception e) {
- log.warn("Exception thrown handling port update", e);
- }
- }
- });
+ try {
+ notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling port update", e);
+ }
}
}
@@ -1481,17 +1452,11 @@
public void handle(ClusterMessage message) {
log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
- backgroundExecutor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- handleAdvertisement(advertisement);
- } catch (Exception e) {
- log.warn("Exception thrown handling Device advertisements.", e);
- }
- }
- });
+ try {
+ handleAdvertisement(advertisement);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling Device advertisements.", e);
+ }
}
}
@@ -1507,13 +1472,11 @@
DeviceId deviceId = event.deviceId();
DeviceDescription deviceDescription = event.deviceDescription();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- createOrUpdateDevice(providerId, deviceId, deviceDescription);
- }
- });
+ try {
+ createOrUpdateDevice(providerId, deviceId, deviceDescription);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device injected event.", e);
+ }
}
}
@@ -1529,13 +1492,11 @@
DeviceId deviceId = event.deviceId();
List<PortDescription> portDescriptions = event.portDescriptions();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- updatePorts(providerId, deviceId, portDescriptions);
- }
- });
+ try {
+ updatePorts(providerId, deviceId, portDescriptions);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling port injected event.", e);
+ }
}
}
}