Never process incoming messages on the Netty event loop thread pool.
Currently, in many places we deserialize incoming messages on this thread pool, which could significantly limit throughput.
Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 86bc173..e28a753 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -107,6 +107,9 @@
private final Logger log = getLogger(getClass());
+ // TODO: Make configurable.
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
+
private InternalFlowTable flowTable = new InternalFlowTable();
/*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
@@ -132,6 +135,7 @@
// Cache of SMaps used for backup data. each SMap contain device flow table
private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
+ private ExecutorService messageHandlingExecutor;
private final ExecutorService backupExecutors =
Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
@@ -172,7 +176,11 @@
final NodeId local = clusterService.getLocalNode().id();
- clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
+ messageHandlingExecutor = Executors.newFixedThreadPool(
+ MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ groupedThreads("onos/flow", "message-handlers"));
+
+ clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), messageHandlingExecutor);
clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
@Override
@@ -181,7 +189,7 @@
log.trace("received completed notification for {}", event);
notifyDelegate(event);
}
- });
+ }, messageHandlingExecutor);
clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
@@ -196,7 +204,7 @@
log.error("Failed to respond back", e);
}
}
- });
+ }, messageHandlingExecutor);
clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
@@ -211,7 +219,7 @@
log.error("Failed to respond to peer's getFlowEntries request", e);
}
}
- });
+ }, messageHandlingExecutor);
clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
@@ -226,7 +234,7 @@
log.error("Failed to respond back", e);
}
}
- });
+ }, messageHandlingExecutor);
replicaInfoEventListener = new InternalReplicaInfoEventListener();
@@ -242,6 +250,7 @@
clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
+ messageHandlingExecutor.shutdown();
replicaInfoManager.removeListener(replicaInfoEventListener);
log.info("Stopped");
}
@@ -421,7 +430,7 @@
switch (op.operator()) {
case ADD:
entry = new DefaultFlowEntry(op.target());
- // always add requested FlowRule

+ // always add requested FlowRule
// Note: 2 equal FlowEntry may have different treatment
flowTable.remove(entry.deviceId(), entry);
flowTable.add(entry);