Use typed queues for OF message processing
Process OF messages through 8 queues. Output queue for messages
controlled per OF Agent with help of message classifiers.
Queues can be configured through component configuration mechanism
for "org.onosproject.openflow.controller.impl.OpenFlowControllerImpl"
component.
Classifiers can be configured through NetworkConfig API in the following
form:
{
"devices": {
"of:0000000000000001": {
"classifiers": [{
"ethernet-type":"LLDP",
"target-queue":0
},{
"ethernet-type":"BDDP",
"target-queue":0
},{
"ethernet-type":"0x1234",
"target-queue":1
}]
}
}
}
Where "target_queue" is queue number from 0 to 7 (7 is default queue),
"ethernet_type" is a type of a packet either in "0xFFFF" from or enum
name as defined in the "org.onlab.packet.EthType.EtherType" enum.
Change-Id: I0512ef653d90c36f00289014872170c1a8aa5204
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index ee9422f..4696778 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -30,12 +30,16 @@
import org.onosproject.openflow.config.OpenFlowDeviceConfig;
import org.onosproject.openflow.controller.DefaultOpenFlowPacketContext;
import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowClassifierListener;
import org.onosproject.openflow.controller.OpenFlowController;
import org.onosproject.openflow.controller.OpenFlowEventListener;
import org.onosproject.openflow.controller.OpenFlowMessageListener;
import org.onosproject.openflow.controller.OpenFlowPacketContext;
import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.onosproject.openflow.controller.OpenFlowSwitchListener;
+import org.onosproject.openflow.controller.OpenFlowListener;
+import org.onosproject.openflow.controller.OpenFlowService;
+import org.onosproject.openflow.controller.OpenFlowEvent;
import org.onosproject.openflow.controller.PacketListener;
import org.onosproject.openflow.controller.RoleState;
import org.onosproject.openflow.controller.driver.OpenFlowAgent;
@@ -104,8 +108,25 @@
KEY_STORE_PASSWORD + "=" + KEY_STORE_PASSWORD_DEFAULT,
TRUST_STORE + "=" + TRUST_STORE_DEFAULT,
TRUST_STORE_PASSWORD + "=" + TRUST_STORE_PASSWORD_DEFAULT,
+ DEFAULT_QUEUE_SIZE + ":Integer=" + DEFAULT_QUEUE_SIZE_DEFAULT,
+ DEBAULT_BULK_SIZE + ":Integer=" + BULK_SIZE_DEFAULT,
+ QUEUE_SIZE_N0 + ":Integer=" + QUEUE_SIZE_N0_DEFAULT,
+ BULK_SIZE_N0 + ":Integer=" + BULK_SIZE_DEFAULT,
+ QUEUE_SIZE_N1 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+ BULK_SIZE_N1 + ":Integer=" + BULK_SIZE_DEFAULT,
+ QUEUE_SIZE_N2 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+ BULK_SIZE_N2 + ":Integer=" + BULK_SIZE_DEFAULT,
+ QUEUE_SIZE_N3 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+ BULK_SIZE_N3 + ":Integer=" + BULK_SIZE_DEFAULT,
+ QUEUE_SIZE_N4 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+ BULK_SIZE_N4 + ":Integer=" + BULK_SIZE_DEFAULT,
+ QUEUE_SIZE_N5 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+ BULK_SIZE_N5 + ":Integer=" + BULK_SIZE_DEFAULT,
+ QUEUE_SIZE_N6 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+ BULK_SIZE_N6 + ":Integer=" + BULK_SIZE_DEFAULT,
}
)
+
public class OpenFlowControllerImpl implements OpenFlowController {
private static final String APP_ID = "org.onosproject.openflow-base";
protected static final String SCHEME = "of";
@@ -131,12 +152,17 @@
/** Number of controller worker threads. */
private int workerThreads = WORKER_THREADS_DEFAULT;
- /** TLS mode for OpenFlow channel; options are: disabled [default], enabled, strict. */
+ /** TLS mode for OpenFlow channel; options are: disabled [default], enabled, strict. */
private String tlsMode;
/** File path to key store for TLS connections. */
private String keyStore;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected OpenFlowService openFlowManager;
+
+ private final OpenFlowListener openFlowListener = new InternalOpenFlowListener();
+
/** Key store password. */
private String keyStorePassword;
@@ -146,6 +172,54 @@
/** Trust store password. */
private String trustStorePassword;
+ /** Size of deafult queue. */
+ private int defaultQueueSize = DEFAULT_QUEUE_SIZE_DEFAULT;
+
+ /** Size of deafult bulk. */
+ private int defaultBulkSize = BULK_SIZE_DEFAULT;
+
+ /** Size of queue N0. */
+ private int queueSizeN0 = QUEUE_SIZE_N0_DEFAULT;
+
+ /** Size of bulk N0. */
+ private int bulkSizeN0 = BULK_SIZE_DEFAULT;
+
+ /** Size of queue N1. */
+ private int queueSizeN1 = QUEUE_SIZE_DEFAULT;
+
+ /** Size of bulk N1. */
+ private int bulkSizeN1 = BULK_SIZE_DEFAULT;
+
+ /** Size of queue N2. */
+ private int queueSizeN2 = QUEUE_SIZE_DEFAULT;
+
+ /** Size of bulk N2. */
+ private int bulkSizeN2 = BULK_SIZE_DEFAULT;
+
+ /** Size of queue N3. */
+ private int queueSizeN3 = QUEUE_SIZE_DEFAULT;
+
+ /** Size of bulk N3. */
+ private int bulkSizeN3 = BULK_SIZE_DEFAULT;
+
+ /** Size of queue N4. */
+ private int queueSizeN4 = QUEUE_SIZE_DEFAULT;
+
+ /** Size of bulk N4. */
+ private int bulkSizeN4 = BULK_SIZE_DEFAULT;
+
+ /** Size of queue N5. */
+ private int queueSizeN5 = QUEUE_SIZE_DEFAULT;
+
+ /** Size of bulk N5. */
+ private int bulkSizeN5 = BULK_SIZE_DEFAULT;
+
+ /** Size of queue N6. */
+ private int queueSizeN6 = QUEUE_SIZE_DEFAULT;
+
+ /** Size of bulk N6. */
+ private int bulkSizeN6 = BULK_SIZE_DEFAULT;
+
protected ExecutorService executorMsgs =
Executors.newFixedThreadPool(32, groupedThreads("onos/of", "event-stats-%d", log));
@@ -179,6 +253,8 @@
protected Set<OpenFlowEventListener> ofEventListener = new CopyOnWriteArraySet<>();
+ protected Set<OpenFlowClassifierListener> ofClassifierListener = new CopyOnWriteArraySet<>();
+
protected Set<OpenFlowMessageListener> ofMessageListener = new CopyOnWriteArraySet<>();
protected Multimap<Dpid, OFFlowStatsEntry> fullFlowStats =
@@ -266,6 +342,7 @@
netCfgService.addListener(netCfgListener);
ctrl.setConfigParams(context.getProperties());
ctrl.start(agent, driverService, netCfgService);
+ openFlowManager.addListener(openFlowListener);
}
private void cleanup() {
@@ -276,6 +353,7 @@
connectedSwitches.clear();
activeMasterSwitches.clear();
activeEqualSwitches.clear();
+ openFlowManager.removeListener(openFlowListener);
}
@Deactivate
@@ -334,6 +412,16 @@
}
@Override
+ public void addClassifierListener(OpenFlowClassifierListener listener) {
+ this.ofClassifierListener.add(listener);
+ }
+
+ @Override
+ public void removeClassifierListener(OpenFlowClassifierListener listener) {
+ this.ofClassifierListener.remove(listener);
+ }
+
+ @Override
public void addMessageListener(OpenFlowMessageListener listener) {
ofMessageListener.add(listener);
}
@@ -840,6 +928,16 @@
l.receivedRoleReply(dpid, requested, response);
}
}
+
+ @Override
+ public void addClassifierListener(OpenFlowClassifierListener listener) {
+ ofClassifierListener.add(listener);
+ }
+
+ @Override
+ public void removeClassifierListener(OpenFlowClassifierListener listener) {
+ ofClassifierListener.remove(listener);
+ }
}
/**
@@ -862,4 +960,28 @@
}
}
}
+
+ private class InternalOpenFlowListener implements OpenFlowListener {
+ public void event(OpenFlowEvent event) {
+ try {
+ switch (event.type()) {
+ case INSERT:
+ for (OpenFlowClassifierListener listener : ofClassifierListener) {
+ listener.handleClassifiersAdd(event.subject());
+ }
+ break;
+ case REMOVE:
+ for (OpenFlowClassifierListener listener : ofClassifierListener) {
+ listener.handleClassifiersRemove(event.subject());
+ }
+ break;
+ default:
+ log.warn("Unknown OpenFlow classifier event type: {}", event.type());
+ break;
+ }
+ } catch (Exception e) {
+ log.error("Internal OpenFlowListener exception: {}", e.getMessage());
+ }
+ }
+ }
}