Use typed queues for OF message processing
Process OF messages through 8 queues. Output queue for messages
controlled per OF Agent with help of message classifiers.
Queues can be configured through component configuration mechanism
for "org.onosproject.openflow.controller.impl.OpenFlowControllerImpl"
component.
Classifiers can be configured through NetworkConfig API in the following
form:
{
"devices": {
"of:0000000000000001": {
"classifiers": [{
"ethernet-type":"LLDP",
"target-queue":0
},{
"ethernet-type":"BDDP",
"target-queue":0
},{
"ethernet-type":"0x1234",
"target-queue":1
}]
}
}
}
Where "target_queue" is queue number from 0 to 7 (7 is default queue),
"ethernet_type" is a type of a packet either in "0xFFFF" from or enum
name as defined in the "org.onlab.packet.EthType.EtherType" enum.
Change-Id: I0512ef653d90c36f00289014872170c1a8aa5204
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
index 6cf1ddc..c71de08 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.onosproject.openflow.controller.impl;
import com.google.common.base.MoreObjects;
@@ -96,6 +95,12 @@
private static final short MIN_KS_LENGTH = 6;
+ //Default queues settings
+ private static final short DEFAULT_QUEUE_SIZE = 5000;
+ private static final short FIRST_QUEUE_SIZE = 1000;
+ private static final short DEFAULT_BULK_SIZE = 100;
+ private static final short DEFAULT_QUEUE_ID = 7;
+
protected HashMap<String, String> controllerNodeIPsCache;
private ChannelGroup cg;
@@ -103,6 +108,8 @@
// Configuration options
protected List<Integer> openFlowPorts = ImmutableList.of(6633, 6653);
protected int workerThreads = 0;
+ protected int[] cfgQueueSizes = {FIRST_QUEUE_SIZE, 0, 0, 0, 0, 0, 0, DEFAULT_QUEUE_SIZE};
+ protected int[] cfgBulkSizes = new int[8];
// Start time of the controller
protected long systemStartTime;
@@ -129,7 +136,17 @@
private DriverService driverService;
private NetworkConfigRegistry netCfgService;
+ public Controller() {
+ Arrays.fill(cfgBulkSizes, DEFAULT_BULK_SIZE);
+ }
+ public int getQueueSize(int queueId) {
+ return cfgQueueSizes[queueId];
+ }
+
+ public int getBulkSize(int queueId) {
+ return cfgBulkSizes[queueId];
+ }
// **************
// Initialization
@@ -221,6 +238,29 @@
.channel(NioServerSocketChannel.class);
}
+ public void setQueueParams(Dictionary<?, ?> properties, String sizeParamName, String bulkParamName, int queueId) {
+ String queueSize = get(properties, sizeParamName);
+ if (!Strings.isNullOrEmpty(queueSize)) {
+ int size = Integer.parseInt(queueSize);
+ if (size > 0) {
+ this.cfgQueueSizes[queueId] = size;
+ } else {
+ throw new IllegalArgumentException(
+ String.format("%s value must be either a positive integer value", sizeParamName));
+ }
+ }
+ String bulkSize = get(properties, bulkParamName);
+ if (!Strings.isNullOrEmpty(bulkSize)) {
+ int bulk = Integer.parseInt(bulkSize);
+ if (bulk > 0) {
+ this.cfgBulkSizes[queueId] = bulk;
+ } else {
+ throw new IllegalArgumentException(
+ String.format("%s value must be either a positive integer value", bulkParamName));
+ }
+ }
+ }
+
public void setConfigParams(Dictionary<?, ?> properties) {
boolean restartRequired = setOpenFlowPorts(properties);
restartRequired |= setWorkerThreads(properties);
@@ -273,6 +313,16 @@
this.workerThreads = Integer.parseInt(threads);
}
log.debug("Number of worker threads set to {}", this.workerThreads);
+
+ setQueueParams(properties, "defaultQueueSize", "defaultBulkSize", DEFAULT_QUEUE_ID);
+ setQueueParams(properties, "queueSizeN0", "bulkSizeN0", 0);
+ setQueueParams(properties, "queueSizeN1", "bulkSizeN1", 1);
+ setQueueParams(properties, "queueSizeN2", "bulkSizeN2", 2);
+ setQueueParams(properties, "queueSizeN3", "bulkSizeN3", 3);
+ setQueueParams(properties, "queueSizeN4", "bulkSizeN4", 4);
+ setQueueParams(properties, "queueSizeN5", "bulkSizeN5", 5);
+ setQueueParams(properties, "queueSizeN6", "bulkSizeN6", 6);
+
return oldValue != this.workerThreads; // restart if number of threads has changed
}