GossipStores: remove potentially blocking method out of netty thread
Change-Id: I2da9ba745c3a63bf9709fb77c1f260ea8f4529a8
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index d11fa11..78838d0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -78,6 +78,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -160,8 +162,11 @@
}
};
+ private ExecutorService executor;
+
private ScheduledExecutorService backgroundExecutor;
+
@Activate
public void activate() {
clusterCommunicator.addSubscriber(
@@ -178,6 +183,8 @@
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
+ executor = Executors.newCachedThreadPool(namedThreads("device-fg-%d"));
+
backgroundExecutor =
newSingleThreadScheduledExecutor(minPriority(namedThreads("device-bg-%d")));
@@ -194,6 +201,8 @@
@Deactivate
public void deactivate() {
+ executor.shutdownNow();
+
backgroundExecutor.shutdownNow();
try {
boolean timedout = backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS);
@@ -1258,32 +1267,54 @@
}
}
- private class InternalDeviceEventListener implements ClusterMessageHandler {
+ private final class InternalDeviceEventListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received device update event from peer: {}", message.sender());
- InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
+ InternalDeviceEvent event = SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
- notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device update", e);
+ }
+ }
+ });
}
}
- private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
+ private final class InternalDeviceOfflineEventListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received device offline event from peer: {}", message.sender());
- InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
+ InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
DeviceId deviceId = event.deviceId();
Timestamp timestamp = event.timestamp();
- notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device offline", e);
+ }
+ }
+ });
}
}
@@ -1293,30 +1324,53 @@
public void handle(ClusterMessage message) {
log.debug("Received device remove request from peer: {}", message.sender());
DeviceId did = SERIALIZER.decode(message.payload());
- removeDevice(did);
+
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ removeDevice(did);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device remove", e);
+ }
+ }
+ });
}
}
- private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
+ private final class InternalDeviceRemovedEventListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received device removed event from peer: {}", message.sender());
- InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
+ InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
DeviceId deviceId = event.deviceId();
Timestamp timestamp = event.timestamp();
- notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device removed", e);
+ }
+ }
+ });
}
}
- private class InternalPortEventListener implements ClusterMessageHandler {
+ private final class InternalPortEventListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received port update event from peer: {}", message.sender());
- InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
+ InternalPortEvent event = SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
@@ -1328,16 +1382,27 @@
return;
}
- notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling port update", e);
+ }
+ }
+ });
}
}
- private class InternalPortStatusEventListener implements ClusterMessageHandler {
+ private final class InternalPortStatusEventListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received port status update event from peer: {}", message.sender());
- InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
+ InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
@@ -1349,7 +1414,17 @@
return;
}
- notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling port update", e);
+ }
+ }
+ });
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
index 41d72c3..5e7048a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
@@ -34,6 +34,8 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -137,6 +139,8 @@
}
};
+ private ExecutorService executor;
+
private ScheduledExecutorService backgroundExecutor;
@Activate
@@ -151,6 +155,8 @@
GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
new InternalHostAntiEntropyAdvertisementListener());
+ executor = Executors.newCachedThreadPool(namedThreads("host-fg-%d"));
+
backgroundExecutor =
newSingleThreadScheduledExecutor(minPriority(namedThreads("host-bg-%d")));
@@ -166,6 +172,7 @@
@Deactivate
public void deactivate() {
+ executor.shutdownNow();
backgroundExecutor.shutdownNow();
try {
if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
@@ -459,33 +466,58 @@
}
}
- private class InternalHostEventListener implements ClusterMessageHandler {
+ private final class InternalHostEventListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received host update event from peer: {}", message.sender());
- InternalHostEvent event = (InternalHostEvent) SERIALIZER.decode(message.payload());
+ InternalHostEvent event = SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
HostId hostId = event.hostId();
HostDescription hostDescription = event.hostDescription();
Timestamp timestamp = event.timestamp();
- notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp));
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId,
+ hostId,
+ hostDescription,
+ timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling host removed", e);
+ }
+ }
+ });
}
}
- private class InternalHostRemovedEventListener implements ClusterMessageHandler {
+ private final class InternalHostRemovedEventListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received host removed event from peer: {}", message.sender());
- InternalHostRemovedEvent event = (InternalHostRemovedEvent) SERIALIZER.decode(message.payload());
+ InternalHostRemovedEvent event = SERIALIZER.decode(message.payload());
HostId hostId = event.hostId();
Timestamp timestamp = event.timestamp();
- notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling host removed", e);
+ }
+ }
+ });
}
}
@@ -636,8 +668,8 @@
}
}
- private final class InternalHostAntiEntropyAdvertisementListener implements
- ClusterMessageHandler {
+ private final class InternalHostAntiEntropyAdvertisementListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
index c465866..2a60120 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
@@ -71,6 +71,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -141,6 +143,8 @@
}
};
+ private ExecutorService executor;
+
private ScheduledExecutorService backgroundExecutors;
@Activate
@@ -156,6 +160,8 @@
GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
new InternalLinkAntiEntropyAdvertisementListener());
+ executor = Executors.newCachedThreadPool(namedThreads("link-fg-%d"));
+
backgroundExecutors =
newSingleThreadScheduledExecutor(minPriority(namedThreads("link-bg-%d")));
@@ -172,6 +178,8 @@
@Deactivate
public void deactivate() {
+ executor.shutdownNow();
+
backgroundExecutors.shutdownNow();
try {
if (!backgroundExecutors.awaitTermination(5, TimeUnit.SECONDS)) {
@@ -762,7 +770,8 @@
}
}
- private class InternalLinkEventListener implements ClusterMessageHandler {
+ private final class InternalLinkEventListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
@@ -772,11 +781,22 @@
ProviderId providerId = event.providerId();
Timestamped<LinkDescription> linkDescription = event.linkDescription();
- notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling link event", e);
+ }
+ }
+ });
}
}
- private class InternalLinkRemovedEventListener implements ClusterMessageHandler {
+ private final class InternalLinkRemovedEventListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
@@ -786,11 +806,22 @@
LinkKey linkKey = event.linkKey();
Timestamp timestamp = event.timestamp();
- notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling link removed", e);
+ }
+ }
+ });
}
}
- private final class InternalLinkAntiEntropyAdvertisementListener implements ClusterMessageHandler {
+ private final class InternalLinkAntiEntropyAdvertisementListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {