Never process incoming messages on the Netty event loop thread pool.
Currently, in a lot of places we deserialize incoming messages on this thread pool, which could be significantly limiting throughput.
Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 19e77bb..fe8f1a8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -162,13 +162,13 @@
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
clusterCommunicator.addSubscriber(updateMessageSubject,
- new InternalPutEventListener());
+ new InternalPutEventListener(), executor);
removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
clusterCommunicator.addSubscriber(removeMessageSubject,
- new InternalRemoveEventListener());
+ new InternalRemoveEventListener(), executor);
antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
- new InternalAntiEntropyListener());
+ new InternalAntiEntropyListener(), backgroundExecutor);
}
private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
@@ -728,13 +728,11 @@
log.trace("Received anti-entropy advertisement from peer: {}",
message.sender());
AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
- backgroundExecutor.submit(() -> {
- try {
- handleAntiEntropyAdvertisement(advertisement);
- } catch (Exception e) {
- log.warn("Exception thrown handling advertisements", e);
- }
- });
+ try {
+ handleAntiEntropyAdvertisement(advertisement);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling advertisements", e);
+ }
}
}
@@ -745,25 +743,23 @@
log.debug("Received put event from peer: {}", message.sender());
InternalPutEvent<K, V> event = serializer.decode(message.payload());
- executor.submit(() -> {
- try {
- for (PutEntry<K, V> entry : event.entries()) {
- K key = entry.key();
- V value = entry.value();
- Timestamp timestamp = entry.timestamp();
+ try {
+ for (PutEntry<K, V> entry : event.entries()) {
+ K key = entry.key();
+ V value = entry.value();
+ Timestamp timestamp = entry.timestamp();
- if (putInternal(key, value, timestamp)) {
- EventuallyConsistentMapEvent<K, V> externalEvent =
- new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.PUT, key,
- value);
- notifyListeners(externalEvent);
- }
+ if (putInternal(key, value, timestamp)) {
+ EventuallyConsistentMapEvent<K, V> externalEvent =
+ new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.PUT, key,
+ value);
+ notifyListeners(externalEvent);
}
- } catch (Exception e) {
- log.warn("Exception thrown handling put", e);
}
- });
+ } catch (Exception e) {
+ log.warn("Exception thrown handling put", e);
+ }
}
}
@@ -773,25 +769,22 @@
public void handle(ClusterMessage message) {
log.debug("Received remove event from peer: {}", message.sender());
InternalRemoveEvent<K> event = serializer.decode(message.payload());
+ try {
+ for (RemoveEntry<K> entry : event.entries()) {
+ K key = entry.key();
+ Timestamp timestamp = entry.timestamp();
- executor.submit(() -> {
- try {
- for (RemoveEntry<K> entry : event.entries()) {
- K key = entry.key();
- Timestamp timestamp = entry.timestamp();
-
- if (removeInternal(key, timestamp)) {
- EventuallyConsistentMapEvent<K, V> externalEvent
- = new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE,
- key, null);
- notifyListeners(externalEvent);
- }
+ if (removeInternal(key, timestamp)) {
+ EventuallyConsistentMapEvent<K, V> externalEvent
+ = new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.REMOVE,
+ key, null);
+ notifyListeners(externalEvent);
}
- } catch (Exception e) {
- log.warn("Exception thrown handling remove", e);
}
- });
+ } catch (Exception e) {
+ log.warn("Exception thrown handling remove", e);
+ }
}
}