GossipStores: move potentially blocking method calls off the netty thread
Change-Id: I2da9ba745c3a63bf9709fb77c1f260ea8f4529a8
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index d11fa11..78838d0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -78,6 +78,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -160,8 +162,11 @@
}
};
+ private ExecutorService executor;
+
private ScheduledExecutorService backgroundExecutor;
+
@Activate
public void activate() {
clusterCommunicator.addSubscriber(
@@ -178,6 +183,8 @@
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
+ executor = Executors.newCachedThreadPool(namedThreads("device-fg-%d"));
+
backgroundExecutor =
newSingleThreadScheduledExecutor(minPriority(namedThreads("device-bg-%d")));
@@ -194,6 +201,8 @@
@Deactivate
public void deactivate() {
+ executor.shutdownNow();
+
backgroundExecutor.shutdownNow();
try {
boolean timedout = backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS);
@@ -1258,32 +1267,54 @@
}
}
- private class InternalDeviceEventListener implements ClusterMessageHandler {
+ private final class InternalDeviceEventListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received device update event from peer: {}", message.sender());
- InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
+ InternalDeviceEvent event = SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
- notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device update", e);
+ }
+ }
+ });
}
}
- private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
+ private final class InternalDeviceOfflineEventListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received device offline event from peer: {}", message.sender());
- InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
+ InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
DeviceId deviceId = event.deviceId();
Timestamp timestamp = event.timestamp();
- notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device offline", e);
+ }
+ }
+ });
}
}
@@ -1293,30 +1324,53 @@
public void handle(ClusterMessage message) {
log.debug("Received device remove request from peer: {}", message.sender());
DeviceId did = SERIALIZER.decode(message.payload());
- removeDevice(did);
+
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ removeDevice(did);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device remove", e);
+ }
+ }
+ });
}
}
- private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
+ private final class InternalDeviceRemovedEventListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received device removed event from peer: {}", message.sender());
- InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
+ InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
DeviceId deviceId = event.deviceId();
Timestamp timestamp = event.timestamp();
- notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device removed", e);
+ }
+ }
+ });
}
}
- private class InternalPortEventListener implements ClusterMessageHandler {
+ private final class InternalPortEventListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received port update event from peer: {}", message.sender());
- InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
+ InternalPortEvent event = SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
@@ -1328,16 +1382,27 @@
return;
}
- notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling port update", e);
+ }
+ }
+ });
}
}
- private class InternalPortStatusEventListener implements ClusterMessageHandler {
+ private final class InternalPortStatusEventListener
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received port status update event from peer: {}", message.sender());
- InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
+ InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
@@ -1349,7 +1414,17 @@
return;
}
- notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling port status update", e);
+ }
+ }
+ });
}
}