[ONOS-3724] Fix the Cbench regression issue

Control message monitoring brings some overhead to controller.
In an extreme stressing environment (e.g., running Cbench),
it leads potential performance degradation.

This commit tries to mitigate the Cbench regression with two steps:
1. improve the monitoring performance by assigning more # of
threads in each thread group.
2. make the control message listening feature optional.

Change-Id: I4f7361b7c598c6de71d390eab78a20ada381d4dd
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java
index 2c68fa0..b3c2091 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java
@@ -65,6 +65,13 @@
     OpenFlowSwitch getEqualSwitch(Dpid dpid);
 
     /**
+     * If this set to be true, all incoming events are monitored.
+     * Other wise, only stats related incoming events are monitored
+     * @param monitor monitoring flag
+     */
+    void monitorAllEvents(boolean monitor);
+
+    /**
      * Register a listener for meta events that occur to OF
      * devices.
      * @param listener the listener to notify
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
index a877a66..182935a 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
@@ -92,10 +92,10 @@
     protected OFFeaturesReply features;
     protected OFDescStatsReply desc;
 
-    protected Set<OpenFlowEventListener> ofEventListener = new CopyOnWriteArraySet<>();
+    protected Set<OpenFlowEventListener> ofOutgoingMsgListener = new CopyOnWriteArraySet<>();
 
     protected ExecutorService executorMsgs =
-            Executors.newFixedThreadPool(2, groupedThreads("onos/of", "ctrl-msg-stats-%d"));
+            Executors.newCachedThreadPool(groupedThreads("onos/of", "event-outgoing-msg-stats-%d"));
 
     // messagesPendingMastership is used as synchronization variable for
     // all mastership related changes. In this block, mastership (including
@@ -167,14 +167,16 @@
             }
         }
 
-        // listen to outgoing control messages
-        msgs.forEach(m -> {
-            if (m.getType() == OFType.PACKET_OUT ||
-                m.getType() == OFType.FLOW_MOD ||
-                m.getType() == OFType.STATS_REQUEST) {
-                executorMsgs.submit(new OFMessageHandler(dpid, m));
-            }
-        });
+        // listen to outgoing control messages only if listeners are registered
+        if (ofOutgoingMsgListener.size() != 0) {
+            msgs.forEach(m -> {
+                if (m.getType() == OFType.PACKET_OUT ||
+                        m.getType() == OFType.FLOW_MOD ||
+                        m.getType() == OFType.STATS_REQUEST) {
+                    executorMsgs.submit(new OFMessageHandler(dpid, m));
+                }
+            });
+        }
     }
 
     private void sendMsgsOnChannel(List<OFMessage> msgs) {
@@ -332,12 +334,12 @@
 
     @Override
     public void addEventListener(OpenFlowEventListener listener) {
-        ofEventListener.add(listener);
+        ofOutgoingMsgListener.add(listener);
     }
 
     @Override
     public void removeEventListener(OpenFlowEventListener listener) {
-        ofEventListener.remove(listener);
+        ofOutgoingMsgListener.remove(listener);
     }
 
     @Override
@@ -547,7 +549,7 @@
 
         @Override
         public void run() {
-            for (OpenFlowEventListener listener : ofEventListener) {
+            for (OpenFlowEventListener listener : ofOutgoingMsgListener) {
                 listener.handleMessage(dpid, msg);
             }
         }
diff --git a/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/OpenflowControllerAdapter.java b/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/OpenflowControllerAdapter.java
index f4fe490..d431fd4 100644
--- a/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/OpenflowControllerAdapter.java
+++ b/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/OpenflowControllerAdapter.java
@@ -52,6 +52,10 @@
     }
 
     @Override
+    public void monitorAllEvents(boolean monitor) {
+    }
+
+    @Override
     public void addListener(OpenFlowSwitchListener listener) {
     }
 
diff --git a/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitchTest.java b/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitchTest.java
index 16989da..288f141 100644
--- a/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitchTest.java
+++ b/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitchTest.java
@@ -17,6 +17,8 @@
 
 import org.junit.Before;
 import org.junit.Test;
+import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowEventListener;
 import org.projectfloodlight.openflow.protocol.OFMessage;
 import org.jboss.netty.channel.Channel;
 import java.util.ArrayList;
@@ -65,6 +67,7 @@
         ofSwitch.executorMsgs = executorService;
         Channel channel = new ChannelAdapter();
         ofSwitch.setChannel(channel);
+        ofSwitch.addEventListener(new OpenFlowEventListenerAdapter());
     }
 
     /**
@@ -120,4 +123,11 @@
         public void processDriverHandshakeMessage(OFMessage m) {
         }
     }
+
+    private class OpenFlowEventListenerAdapter implements OpenFlowEventListener {
+
+        @Override
+        public void handleMessage(Dpid dpid, OFMessage msg) {
+        }
+    }
 }
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index 5bca54e..eb90c26 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -115,6 +115,12 @@
     protected ExecutorService executorMsgs =
         Executors.newFixedThreadPool(32, groupedThreads("onos/of", "event-stats-%d"));
 
+    protected ExecutorService executorPacketIn =
+        Executors.newCachedThreadPool(groupedThreads("onos/of", "event-pkt-in-stats-%d"));
+
+    protected ExecutorService executorFlowRemoved =
+        Executors.newCachedThreadPool(groupedThreads("onos/of", "event-flow-removed-stats-%d"));
+
     private final ExecutorService executorBarrier =
         Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d"));
 
@@ -133,6 +139,8 @@
 
     protected Set<OpenFlowEventListener> ofEventListener = new CopyOnWriteArraySet<>();
 
+    protected boolean monitorAllEvents = false;
+
     protected Multimap<Dpid, OFFlowStatsEntry> fullFlowStats =
             ArrayListMultimap.create();
 
@@ -210,6 +218,11 @@
     }
 
     @Override
+    public void monitorAllEvents(boolean monitor) {
+        this.monitorAllEvents = monitor;
+    }
+
+    @Override
     public void addListener(OpenFlowSwitchListener listener) {
         if (!ofSwitchListener.contains(listener)) {
             this.ofSwitchListener.add(listener);
@@ -272,13 +285,17 @@
             for (PacketListener p : ofPacketListener.values()) {
                 p.handlePacket(pktCtx);
             }
-            executorMsgs.submit(new OFMessageHandler(dpid, msg));
+            if (monitorAllEvents) {
+                executorPacketIn.submit(new OFMessageHandler(dpid, msg));
+            }
             break;
         // TODO: Consider using separate threadpool for sensitive messages.
         //    ie. Back to back error could cause us to starve.
         case FLOW_REMOVED:
-            executorMsgs.submit(new OFMessageHandler(dpid, msg));
-            break;
+            if (monitorAllEvents) {
+                executorFlowRemoved.submit(new OFMessageHandler(dpid, msg));
+                break;
+            }
         case ERROR:
             log.debug("Received error message from {}: {}", dpid, msg);
             executorMsgs.submit(new OFMessageHandler(dpid, msg));
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplPacketsTest.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplPacketsTest.java
index bd417bb..3fef8dd 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplPacketsTest.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplPacketsTest.java
@@ -53,7 +53,9 @@
     OpenFlowSwitch switch1;
     OpenFlowSwitchListenerAdapter switchListener;
     TestPacketListener packetListener;
-    TestExecutorService executorService;
+    TestExecutorService statsExecutorService;
+    TestExecutorService pktInExecutorService;
+    TestExecutorService flowRmvExecutorService;
 
     /**
      * Mock packet listener that accumulates packets.
@@ -108,12 +110,19 @@
         agent = controller.agent;
         switchListener = new OpenFlowSwitchListenerAdapter();
         controller.addListener(switchListener);
+        controller.monitorAllEvents(true);
 
         packetListener = new TestPacketListener();
         controller.addPacketListener(100, packetListener);
 
-        executorService = new TestExecutorService();
-        controller.executorMsgs = executorService;
+        statsExecutorService = new TestExecutorService();
+        pktInExecutorService = new TestExecutorService();
+        flowRmvExecutorService = new TestExecutorService();
+
+        controller.executorMsgs = statsExecutorService;
+        controller.executorPacketIn = pktInExecutorService;
+        controller.executorFlowRemoved = flowRmvExecutorService;
+
     }
 
     /**
@@ -151,8 +160,8 @@
         OFMessage packetInPacket = new MockOfPacketIn();
         controller.processPacket(dpid1, packetInPacket);
         assertThat(packetListener.contexts(), hasSize(1));
-        assertThat(executorService.submittedMessages(), hasSize(1));
-        assertThat(executorService.submittedMessages().get(0), is(packetInPacket));
+        assertThat(pktInExecutorService.submittedMessages(), hasSize(1));
+        assertThat(pktInExecutorService.submittedMessages().get(0), is(packetInPacket));
     }
 
     /**
@@ -163,8 +172,8 @@
         agent.addConnectedSwitch(dpid1, switch1);
         OfMessageAdapter errorPacket = new OfMessageAdapter(OFType.ERROR);
         controller.processPacket(dpid1, errorPacket);
-        assertThat(executorService.submittedMessages(), hasSize(1));
-        assertThat(executorService.submittedMessages().get(0), is(errorPacket));
+        assertThat(statsExecutorService.submittedMessages(), hasSize(1));
+        assertThat(statsExecutorService.submittedMessages().get(0), is(errorPacket));
     }
 
     /**
@@ -175,7 +184,7 @@
         agent.addConnectedSwitch(dpid1, switch1);
         OFMessage flowRemovedPacket = new MockOfFlowRemoved();
         controller.processPacket(dpid1, flowRemovedPacket);
-        assertThat(executorService.submittedMessages(), hasSize(1));
-        assertThat(executorService.submittedMessages().get(0), is(flowRemovedPacket));
+        assertThat(flowRmvExecutorService.submittedMessages(), hasSize(1));
+        assertThat(flowRmvExecutorService.submittedMessages().get(0), is(flowRemovedPacket));
     }
 }