Use typed queues for OF message processing

Process OF messages through 8 queues. Output queue for messages
controlled per OF Agent with help of message classifiers.

Queues can be configured through component configuration mechanism
for "org.onosproject.openflow.controller.impl.OpenFlowControllerImpl"
component.

Classifiers can be configured through NetworkConfig API in the following
form:

      {
        "devices": {
           "of:0000000000000001": {
               "classifiers": [{
                   "ethernet-type":"LLDP",
                   "target-queue":0
               },{
                   "ethernet-type":"BDDP",
                   "target-queue":0
               },{
                   "ethernet-type":"0x1234",
                   "target-queue":1
               }]
           }
        }
      }

Where "target_queue" is queue number from 0 to 7 (7 is default queue),
"ethernet_type" is a type of a packet either in "0xFFFF" from or enum
name as defined in the "org.onlab.packet.EthType.EtherType" enum.

Change-Id: I0512ef653d90c36f00289014872170c1a8aa5204
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
index 6cf1ddc..c71de08 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
@@ -13,7 +13,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.onosproject.openflow.controller.impl;
 
 import com.google.common.base.MoreObjects;
@@ -96,6 +95,12 @@
 
     private static final short MIN_KS_LENGTH = 6;
 
+    //Default queues settings
+    private static final short DEFAULT_QUEUE_SIZE = 5000;
+    private static final short FIRST_QUEUE_SIZE = 1000;
+    private static final short DEFAULT_BULK_SIZE = 100;
+    private static final short DEFAULT_QUEUE_ID = 7;
+
     protected HashMap<String, String> controllerNodeIPsCache;
 
     private ChannelGroup cg;
@@ -103,6 +108,8 @@
     // Configuration options
     protected List<Integer> openFlowPorts = ImmutableList.of(6633, 6653);
     protected int workerThreads = 0;
+    protected int[] cfgQueueSizes = {FIRST_QUEUE_SIZE, 0, 0, 0, 0, 0, 0, DEFAULT_QUEUE_SIZE};
+    protected int[] cfgBulkSizes = new int[8];
 
     // Start time of the controller
     protected long systemStartTime;
@@ -129,7 +136,17 @@
     private DriverService driverService;
     private NetworkConfigRegistry netCfgService;
 
+    public Controller() {
+        Arrays.fill(cfgBulkSizes, DEFAULT_BULK_SIZE);
+    }
 
+    public int getQueueSize(int queueId) {
+        return cfgQueueSizes[queueId];
+    }
+
+    public int getBulkSize(int queueId) {
+        return cfgBulkSizes[queueId];
+    }
 
     // **************
     // Initialization
@@ -221,6 +238,29 @@
                     .channel(NioServerSocketChannel.class);
     }
 
+    public void setQueueParams(Dictionary<?, ?> properties, String sizeParamName, String bulkParamName, int queueId) {
+        String queueSize = get(properties, sizeParamName);
+        if (!Strings.isNullOrEmpty(queueSize)) {
+            int size = Integer.parseInt(queueSize);
+            if (size > 0) {
+                this.cfgQueueSizes[queueId] = size;
+            } else {
+                throw new IllegalArgumentException(
+                    String.format("%s value must be either a positive integer value", sizeParamName));
+            }
+        }
+        String bulkSize = get(properties, bulkParamName);
+        if (!Strings.isNullOrEmpty(bulkSize)) {
+            int bulk = Integer.parseInt(bulkSize);
+            if (bulk > 0) {
+                this.cfgBulkSizes[queueId] = bulk;
+            } else {
+                throw new IllegalArgumentException(
+                    String.format("%s value must be either a positive integer value", bulkParamName));
+            }
+        }
+    }
+
     public void setConfigParams(Dictionary<?, ?> properties) {
         boolean restartRequired = setOpenFlowPorts(properties);
         restartRequired |= setWorkerThreads(properties);
@@ -273,6 +313,16 @@
             this.workerThreads = Integer.parseInt(threads);
         }
         log.debug("Number of worker threads set to {}", this.workerThreads);
+
+        setQueueParams(properties, "defaultQueueSize", "defaultBulkSize", DEFAULT_QUEUE_ID);
+        setQueueParams(properties, "queueSizeN0", "bulkSizeN0", 0);
+        setQueueParams(properties, "queueSizeN1", "bulkSizeN1", 1);
+        setQueueParams(properties, "queueSizeN2", "bulkSizeN2", 2);
+        setQueueParams(properties, "queueSizeN3", "bulkSizeN3", 3);
+        setQueueParams(properties, "queueSizeN4", "bulkSizeN4", 4);
+        setQueueParams(properties, "queueSizeN5", "bulkSizeN5", 5);
+        setQueueParams(properties, "queueSizeN6", "bulkSizeN6", 6);
+
         return oldValue != this.workerThreads; // restart if number of threads has changed
     }