Offload OF Port Status message processing to a single thread executor.
This avoid processing Port Status messages in the same thread of the channel read
avoiding blocking the processing if too many messages are received at once.
Change-Id: I397723d621f69fbac9478f4f3d91a277d17cfa28
(cherry picked from commit dad65ac5a058242326fe91fbe47e65fa9c5a3583)
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
index 9f78a11..84d9deb 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
@@ -16,6 +16,7 @@
package org.onosproject.openflow.controller.impl;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.packet.Ethernet.TYPE_BSN;
import static org.onlab.packet.Ethernet.TYPE_LLDP;
import static org.onlab.util.Tools.groupedThreads;
@@ -235,6 +236,12 @@
private final Deque<OFMessage> dispatchBacklog;
/**
+ * Port Status executor to offload from the main thread the processing of port
+ * status OF messages.
+ */
+ protected ExecutorService portStatusExecutor;
+
+ /**
* Create a new unconnected OFChannelHandler.
* @param controller parent controller
*/
@@ -245,6 +252,8 @@
this.pendingPortStatusMsg = new CopyOnWriteArrayList<>();
this.portDescReplies = new ArrayList<>();
duplicateDpidFound = Boolean.FALSE;
+ portStatusExecutor = newSingleThreadExecutor(
+ groupedThreads("onos/of-channel-handler", "port-status-%d", log));
//Initialize queues and classifiers
dispatchBacklog = new LinkedBlockingDeque<>(BACKLOG_READ_BUFFER_DEFAULT);
for (int i = 0; i < NUM_OF_QUEUES; i++) {
@@ -907,7 +916,28 @@
@Override
void processOFPortStatus(OFChannelHandler h, OFPortStatus m)
throws SwitchStateException {
- handlePortStatusMessage(h, m, true);
+ // Handing over processing of port status messages to a thread to avoid
+ // getting blocked on the main thread and resulting other OF
+ // message being delayed.
+ // Ordering of the port status messages is guaranteed by portStatsExecutor
+ // being a single threaded executor.
+ // This executor will execute concurrently to the netty thread;
+ // meaning that the order is no more guaranteed like it was in the
+ // past between port status handling and the other events handled
+ // inline to the netty thread.
+ // This also remove guarantees of ordered processing of ROLE_CHANGED
+ // during active state, this should have no effect given that mastership
+ // is ignored here: https://github.com/opennetworkinglab/onos/blob/master/
+ // protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/
+ // driver/AbstractOpenFlowSwitch.java#L279
+ h.portStatusExecutor.submit(() -> {
+ try {
+ handlePortStatusMessage(h, m, true);
+ } catch (SwitchStateException e) {
+ log.error("SwitchStateException while processing " +
+ "port status message {}", m, e);
+ }
+ });
//h.dispatchMessage(m);
}