Use two thread pools for handling the OpenFlow STATS and BARRIER messages.
This fixes a problem where a large number of incoming STATS messages
is practically using all available threds from the pool (16), and
there are no available threads to handle the BARRIER messages.

Change-Id: I1130eb8f3b5a17d5d3a7825f32da68eacb99569a

fixing other threadpool issues, ie. not using cachedThreadPool

Change-Id: I40ef10e1f704aef779b2a23c0497dfb7992520eb
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index 9578944..0f543d0 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -103,7 +103,8 @@
 
     @Activate
     public void activate() {
-        futureService = Executors.newCachedThreadPool(namedThreads("provider-future-listeners-%d"));
+        futureService =
+                Executors.newFixedThreadPool(32, namedThreads("provider-future-listeners-%d"));
         store.setDelegate(delegate);
         eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
         log.info("Started");
diff --git a/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index edd2f3a..51c746f 100644
--- a/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -67,8 +67,13 @@
     private static final Logger log =
             LoggerFactory.getLogger(OpenFlowControllerImpl.class);
 
-    private final ExecutorService executor = Executors.newFixedThreadPool(16,
-            namedThreads("of-event-%d"));
+    private final ExecutorService executorMsgs =
+        Executors.newFixedThreadPool(32,
+                                     namedThreads("of-event-stats-%d"));
+
+    private final ExecutorService executorBarrier =
+        Executors.newFixedThreadPool(4,
+                                     namedThreads("of-event-barrier-%d"));
 
     protected ConcurrentHashMap<Dpid, OpenFlowSwitch> connectedSwitches =
             new ConcurrentHashMap<Dpid, OpenFlowSwitch>();
@@ -189,6 +194,12 @@
                 p.handlePacket(pktCtx);
             }
             break;
+        // TODO: Consider using separate threadpool for sensitive messages.
+        //    ie. Back to back error could cause us to starve.
+        case FLOW_REMOVED:
+        case ERROR:
+            executorMsgs.submit(new OFMessageHandler(dpid, msg));
+            break;
         case STATS_REPLY:
             OFStatsReply reply = (OFStatsReply) msg;
             if (reply.getStatsType().equals(OFStatsType.PORT_DESC)) {
@@ -201,14 +212,11 @@
                 OFFlowStatsReply.Builder rep =
                         OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
                 rep.setEntries(Lists.newLinkedList(stats));
-                executor.submit(new OFMessageHandler(dpid, rep.build()));
+                executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
             }
             break;
-
-        case FLOW_REMOVED:
-        case ERROR:
         case BARRIER_REPLY:
-            executor.submit(new OFMessageHandler(dpid, msg));
+            executorBarrier.submit(new OFMessageHandler(dpid, msg));
             break;
         case EXPERIMENTER:
             // Handle optical port stats
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index 0744b1b..d7bf539 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -329,7 +329,7 @@
             // Do nothing here for now.
         }
 
-        private synchronized void pushFlowMetrics(Dpid dpid, OFStatsReply stats) {
+        private void pushFlowMetrics(Dpid dpid, OFStatsReply stats) {
 
             DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
             final OFFlowStatsReply replies = (OFFlowStatsReply) stats;