Never process incoming messages on the Netty event loop thread pool.
Currently, in many places we deserialize incoming messages on this thread pool, which could significantly limit throughput.
Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index 60d54e3..54c35ec 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -158,24 +158,24 @@
@Activate
public void activate() {
- clusterCommunicator.addSubscriber(
- GossipLinkStoreMessageSubjects.LINK_UPDATE,
- new InternalLinkEventListener());
- clusterCommunicator.addSubscriber(
- GossipLinkStoreMessageSubjects.LINK_REMOVED,
- new InternalLinkRemovedEventListener());
- clusterCommunicator.addSubscriber(
- GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
- new InternalLinkAntiEntropyAdvertisementListener());
- clusterCommunicator.addSubscriber(
- GossipLinkStoreMessageSubjects.LINK_INJECTED,
- new LinkInjectedEventListener());
-
executor = Executors.newCachedThreadPool(groupedThreads("onos/link", "fg-%d"));
backgroundExecutors =
newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/link", "bg-%d")));
+ clusterCommunicator.addSubscriber(
+ GossipLinkStoreMessageSubjects.LINK_UPDATE,
+ new InternalLinkEventListener(), executor);
+ clusterCommunicator.addSubscriber(
+ GossipLinkStoreMessageSubjects.LINK_REMOVED,
+ new InternalLinkRemovedEventListener(), executor);
+ clusterCommunicator.addSubscriber(
+ GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
+ new InternalLinkAntiEntropyAdvertisementListener(), backgroundExecutors);
+ clusterCommunicator.addSubscriber(
+ GossipLinkStoreMessageSubjects.LINK_INJECTED,
+ new LinkInjectedEventListener(), executor);
+
long initialDelaySec = 5;
long periodSec = 5;
// start anti-entropy thread
@@ -822,17 +822,11 @@
ProviderId providerId = event.providerId();
Timestamped<LinkDescription> linkDescription = event.linkDescription();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
- } catch (Exception e) {
- log.warn("Exception thrown handling link event", e);
- }
- }
- });
+ try {
+ notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling link event", e);
+ }
}
}
@@ -847,17 +841,11 @@
LinkKey linkKey = event.linkKey();
Timestamp timestamp = event.timestamp();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
- } catch (Exception e) {
- log.warn("Exception thrown handling link removed", e);
- }
- }
- });
+ try {
+ notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling link removed", e);
+ }
}
}
@@ -868,18 +856,12 @@
public void handle(ClusterMessage message) {
log.trace("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
- backgroundExecutors.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- handleAntiEntropyAdvertisement(advertisement);
- } catch (Exception e) {
- log.warn("Exception thrown while handling Link advertisements", e);
- throw e;
- }
- }
- });
+ try {
+ handleAntiEntropyAdvertisement(advertisement);
+ } catch (Exception e) {
+ log.warn("Exception thrown while handling Link advertisements", e);
+ throw e;
+ }
}
}
@@ -894,13 +876,11 @@
ProviderId providerId = linkInjectedEvent.providerId();
LinkDescription linkDescription = linkInjectedEvent.linkDescription();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- createOrUpdateLink(providerId, linkDescription);
- }
- });
+ try {
+ createOrUpdateLink(providerId, linkDescription);
+ } catch (Exception e) {
+ log.warn("Exception thrown while handling link injected event", e);
+ }
}
}
}