[ONOS-3724] Fix the Cbench regression issue

Control message monitoring brings some overhead to controller.
In an extreme stressing environment (e.g., running Cbench),
it leads potential performance degradation.

This commit tries to mitigate the Cbench regression with two steps:
1. improve the monitoring performance by assigning more # of
threads in each thread group.
2. make the control message listening feature optional.

Change-Id: I4f7361b7c598c6de71d390eab78a20ada381d4dd
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index 5bca54e..eb90c26 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -115,6 +115,12 @@
     protected ExecutorService executorMsgs =
         Executors.newFixedThreadPool(32, groupedThreads("onos/of", "event-stats-%d"));
 
+    protected ExecutorService executorPacketIn =
+        Executors.newCachedThreadPool(groupedThreads("onos/of", "event-pkt-in-stats-%d"));
+
+    protected ExecutorService executorFlowRemoved =
+        Executors.newCachedThreadPool(groupedThreads("onos/of", "event-flow-removed-stats-%d"));
+
     private final ExecutorService executorBarrier =
         Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d"));
 
@@ -133,6 +139,8 @@
 
     protected Set<OpenFlowEventListener> ofEventListener = new CopyOnWriteArraySet<>();
 
+    protected boolean monitorAllEvents = false;
+
     protected Multimap<Dpid, OFFlowStatsEntry> fullFlowStats =
             ArrayListMultimap.create();
 
@@ -210,6 +218,11 @@
     }
 
     @Override
+    public void monitorAllEvents(boolean monitor) {
+        this.monitorAllEvents = monitor;
+    }
+
+    @Override
     public void addListener(OpenFlowSwitchListener listener) {
         if (!ofSwitchListener.contains(listener)) {
             this.ofSwitchListener.add(listener);
@@ -272,13 +285,17 @@
             for (PacketListener p : ofPacketListener.values()) {
                 p.handlePacket(pktCtx);
             }
-            executorMsgs.submit(new OFMessageHandler(dpid, msg));
+            if (monitorAllEvents) {
+                executorPacketIn.submit(new OFMessageHandler(dpid, msg));
+            }
             break;
         // TODO: Consider using separate threadpool for sensitive messages.
         //    ie. Back to back error could cause us to starve.
         case FLOW_REMOVED:
-            executorMsgs.submit(new OFMessageHandler(dpid, msg));
-            break;
+            if (monitorAllEvents) {
+                executorFlowRemoved.submit(new OFMessageHandler(dpid, msg));
+                break;
+            }
         case ERROR:
             log.debug("Received error message from {}: {}", dpid, msg);
             executorMsgs.submit(new OFMessageHandler(dpid, msg));