cherry picking the fix for ONOS-4754 to onos-1.6 and master

Change-Id: I2c7da62479566f16034b598029df5f98a37cc99e
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index 43b91fd..3af1a89 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -120,6 +120,14 @@
     private final ExecutorService executorBarrier =
         Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d", log));
 
+    //Separate executor thread for handling error messages and barrier replies for same failed
+    // transactions to avoid context switching of thread
+    protected ExecutorService executorErrorMsgs =
+            Executors.newSingleThreadExecutor(groupedThreads("onos/of", "event-error-msg-%d", log));
+
+    //concurrent hashmap to track failed transactions
+    protected ConcurrentMap<Long, Boolean> errorMsgs =
+            new ConcurrentHashMap<>();
     protected ConcurrentMap<Dpid, OpenFlowSwitch> connectedSwitches =
             new ConcurrentHashMap<>();
     protected ConcurrentMap<Dpid, OpenFlowSwitch> activeMasterSwitches =
@@ -294,9 +302,13 @@
         // TODO: Consider using separate threadpool for sensitive messages.
         //    ie. Back to back error could cause us to starve.
         case FLOW_REMOVED:
-        case ERROR:
             executorMsgs.execute(new OFMessageHandler(dpid, msg));
             break;
+        case ERROR:
+            log.debug("Received error message from {}: {}", dpid, msg);
+            errorMsgs.putIfAbsent(msg.getXid(), true);
+            executorErrorMsgs.execute(new OFMessageHandler(dpid, msg));
+            break;
         case STATS_REPLY:
             OFStatsReply reply = (OFStatsReply) msg;
             switch (reply.getStatsType()) {
@@ -403,7 +415,14 @@
             }
             break;
         case BARRIER_REPLY:
-            executorBarrier.execute(new OFMessageHandler(dpid, msg));
+            if (errorMsgs.containsKey(msg.getXid())) {
+                //To make oferror msg handling and corresponding barrier reply serialized,
+                // executorErrorMsgs is used for both transaction
+                errorMsgs.remove(msg.getXid());
+                executorErrorMsgs.execute(new OFMessageHandler(dpid, msg));
+            } else {
+                executorBarrier.execute(new OFMessageHandler(dpid, msg));
+            }
             break;
         case EXPERIMENTER:
             long experimenter = ((OFExperimenter) msg).getExperimenter();
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplPacketsTest.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplPacketsTest.java
index 54fae4e..c4bdc7a 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplPacketsTest.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplPacketsTest.java
@@ -52,7 +52,7 @@
     OpenFlowSwitchListenerAdapter switchListener;
     TestPacketListener packetListener;
     TestExecutorService statsExecutorService;
-
+    TestExecutorService errorMsgExecutorService;
     /**
      * Mock packet listener that accumulates packets.
      */
@@ -109,8 +109,10 @@
         controller.addPacketListener(100, packetListener);
 
         statsExecutorService = new TestExecutorService();
+        errorMsgExecutorService = new TestExecutorService();
 
         controller.executorMsgs = statsExecutorService;
+        controller.executorErrorMsgs = errorMsgExecutorService;
     }
 
     /**
@@ -147,7 +149,7 @@
         agent.addConnectedSwitch(dpid1, switch1);
         OfMessageAdapter errorPacket = new OfMessageAdapter(OFType.ERROR);
         controller.processPacket(dpid1, errorPacket);
-        assertThat(statsExecutorService.submittedMessages(), hasSize(1));
-        assertThat(statsExecutorService.submittedMessages().get(0), is(errorPacket));
+        assertThat(errorMsgExecutorService.submittedMessages(), hasSize(1));
+        assertThat(errorMsgExecutorService.submittedMessages().get(0), is(errorPacket));
     }
 }