GossipStores: remove potentially blocking method out of netty thread
Change-Id: I2da9ba745c3a63bf9709fb77c1f260ea8f4529a8
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
index c465866..2a60120 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
@@ -71,6 +71,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -141,6 +143,8 @@
}
};
+ private ExecutorService executor;
+
private ScheduledExecutorService backgroundExecutors;
@Activate
@@ -156,6 +160,8 @@
GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
new InternalLinkAntiEntropyAdvertisementListener());
+ executor = Executors.newCachedThreadPool(namedThreads("link-fg-%d"));
+
backgroundExecutors =
newSingleThreadScheduledExecutor(minPriority(namedThreads("link-bg-%d")));
@@ -172,6 +178,8 @@
@Deactivate
public void deactivate() {
+ executor.shutdownNow();
+
backgroundExecutors.shutdownNow();
try {
if (!backgroundExecutors.awaitTermination(5, TimeUnit.SECONDS)) {
@@ -762,7 +770,8 @@
}
}
- private class InternalLinkEventListener implements ClusterMessageHandler {
+ private final class InternalLinkEventListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
@@ -772,11 +781,22 @@
ProviderId providerId = event.providerId();
Timestamped<LinkDescription> linkDescription = event.linkDescription();
- notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling link event", e);
+ }
+ }
+ });
}
}
- private class InternalLinkRemovedEventListener implements ClusterMessageHandler {
+ private final class InternalLinkRemovedEventListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
@@ -786,11 +806,22 @@
LinkKey linkKey = event.linkKey();
Timestamp timestamp = event.timestamp();
- notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling link removed", e);
+ }
+ }
+ });
}
}
- private final class InternalLinkAntiEntropyAdvertisementListener implements ClusterMessageHandler {
+ private final class InternalLinkAntiEntropyAdvertisementListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {