Fix for ticket no. ONOS-4754
Change-Id: Ic878096dcf8957e9903e22e6f4bf2e8ecda6897c
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index 3062596..c88a207 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -124,6 +124,14 @@
private final ExecutorService executorBarrier =
Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d", log));
+ //Separate executor thread for handling error messages and barrier replies for same failed
+ // transactions to avoid context switching of thread
+ protected ExecutorService executorErrorMsgs =
+ Executors.newSingleThreadExecutor(groupedThreads("onos/of", "event-error-msg-%d", log));
+
+ //concurrent hashmap to track failed transactions
+ protected ConcurrentMap<Long, Boolean> errorMsgs =
+ new ConcurrentHashMap<>();
protected ConcurrentMap<Dpid, OpenFlowSwitch> connectedSwitches =
new ConcurrentHashMap<>();
protected ConcurrentMap<Dpid, OpenFlowSwitch> activeMasterSwitches =
@@ -299,7 +307,8 @@
}
case ERROR:
log.debug("Received error message from {}: {}", dpid, msg);
- executorMsgs.execute(new OFMessageHandler(dpid, msg));
+ errorMsgs.putIfAbsent(msg.getXid(), true);
+ executorErrorMsgs.execute(new OFMessageHandler(dpid, msg));
break;
case STATS_REPLY:
OFStatsReply reply = (OFStatsReply) msg;
@@ -407,7 +416,14 @@
}
break;
case BARRIER_REPLY:
- executorBarrier.execute(new OFMessageHandler(dpid, msg));
+ if (errorMsgs.containsKey(msg.getXid())) {
+ //To make oferror msg handling and corresponding barrier reply serialized,
+ // executorErrorMsgs is used for both transaction
+ errorMsgs.remove(msg.getXid());
+ executorErrorMsgs.execute(new OFMessageHandler(dpid, msg));
+ } else {
+ executorBarrier.execute(new OFMessageHandler(dpid, msg));
+ }
break;
case EXPERIMENTER:
long experimenter = ((OFExperimenter) msg).getExperimenter();
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplPacketsTest.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplPacketsTest.java
index 9f713ee..0a28a8f 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplPacketsTest.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplPacketsTest.java
@@ -53,6 +53,7 @@
TestExecutorService statsExecutorService;
TestExecutorService pktInExecutorService;
TestExecutorService flowRmvExecutorService;
+ TestExecutorService errorMsgExecutorService;
/**
* Mock packet listener that accumulates packets.
@@ -114,10 +115,12 @@
statsExecutorService = new TestExecutorService();
pktInExecutorService = new TestExecutorService();
flowRmvExecutorService = new TestExecutorService();
+ errorMsgExecutorService = new TestExecutorService();
controller.executorMsgs = statsExecutorService;
controller.executorPacketIn = pktInExecutorService;
controller.executorFlowRemoved = flowRmvExecutorService;
+ controller.executorErrorMsgs = errorMsgExecutorService;
}
@@ -168,8 +171,8 @@
agent.addConnectedSwitch(dpid1, switch1);
OfMessageAdapter errorPacket = new OfMessageAdapter(OFType.ERROR);
controller.processPacket(dpid1, errorPacket);
- assertThat(statsExecutorService.submittedMessages(), hasSize(1));
- assertThat(statsExecutorService.submittedMessages().get(0), is(errorPacket));
+ assertThat(errorMsgExecutorService.submittedMessages(), hasSize(1));
+ assertThat(errorMsgExecutorService.submittedMessages().get(0), is(errorPacket));
}
/**