Use typed queues for OF message processing
Process OF messages through 8 queues. The output queue for each message is
selected per OF Agent with the help of message classifiers.
Queues can be configured through component configuration mechanism
for "org.onosproject.openflow.controller.impl.OpenFlowControllerImpl"
component.
Classifiers can be configured through NetworkConfig API in the following
form:
{
"devices": {
"of:0000000000000001": {
"classifiers": [{
"ethernet-type":"LLDP",
"target-queue":0
},{
"ethernet-type":"BDDP",
"target-queue":0
},{
"ethernet-type":"0x1234",
"target-queue":1
}]
}
}
}
Where "target-queue" is the queue number from 0 to 7 (7 is the default queue) and
"ethernet-type" is the type of a packet, given either in "0xFFFF" form or as an enum
name as defined in the "org.onlab.packet.EthType.EtherType" enum.
Change-Id: I0512ef653d90c36f00289014872170c1a8aa5204
diff --git a/protocols/openflow/api/BUILD b/protocols/openflow/api/BUILD
index bc5f55a..5e113b2 100644
--- a/protocols/openflow/api/BUILD
+++ b/protocols/openflow/api/BUILD
@@ -1,4 +1,4 @@
-COMPILE_DEPS = CORE_DEPS + NETTY + [
+COMPILE_DEPS = CORE_DEPS + NETTY + JACKSON + [
"//core/common:onos-core-common",
"@openflowj//jar",
"@io_netty_netty_transport//jar",
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifier.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifier.java
new file mode 100644
index 0000000..6b746a0
--- /dev/null
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifier.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller;
+
+import org.onosproject.net.DeviceId;
+import java.util.Objects;
+
+/**
+ * Represents an OpenFlow message classifier.
+ */
+public final class OpenFlowClassifier {
+
+ private final short ethernetType;
+ private final int idQueue;
+ private final DeviceId deviceId;
+
+ /**
+ * Builder of the OpenFlow classifier.
+ */
+ public static class Builder {
+ private Short ethernetType = 0;
+ private int idQueue;
+ private DeviceId deviceId;
+
+ /**
+ * Builder constructor for OpenFlow classifier.
+ *
+ * @param deviceId the device id
+ * @param idQueue the queue id
+ */
+ public Builder(DeviceId deviceId, int idQueue) {
+ this.deviceId = deviceId;
+ this.idQueue = idQueue;
+ }
+
+ /**
+ * Sets the ethernet type for the OpenFlow classifier that will be built.
+ *
+ * @param ethernetType the ethernet type
+ * @return this builder
+ */
+ public Builder ethernetType(short ethernetType) {
+ this.ethernetType = ethernetType;
+ return this;
+ }
+
+ /**
+ * Builds the OpenFlow classifier from the accumulated parameters.
+ *
+ * @return OpenFlow classifier instance
+ */
+ public OpenFlowClassifier build() {
+ return new OpenFlowClassifier(this);
+ }
+ }
+
+ private OpenFlowClassifier(Builder builder) {
+ this.idQueue = builder.idQueue;
+ this.ethernetType = builder.ethernetType;
+ this.deviceId = builder.deviceId;
+ }
+
+ /**
+ * Gets the ethernet type matched by the classifier.
+ *
+ * @return matched packet ethernet type
+ */
+ public short ethernetType() {
+ return this.ethernetType;
+ }
+
+ /**
+ * Gets the id of source OpenFlow device matched by the classifier.
+ *
+ * @return connected device id
+ */
+ public DeviceId deviceId() {
+ return this.deviceId;
+ }
+
+ /**
+ * Gets the queue id targeted by the classifier.
+ *
+ * @return target queue id
+ */
+ public int idQueue() {
+ return this.idQueue;
+ }
+
+ /**
+ * Compares OpenFlow classifiers.
+ *
+ * @param o object that we want to compare to
+ * @return equality check result
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof OpenFlowClassifier)) {
+ return false;
+ }
+ OpenFlowClassifier classifier = (OpenFlowClassifier) o;
+ return this.ethernetType == classifier.ethernetType()
+ && this.idQueue == classifier.idQueue()
+ && this.deviceId.equals(classifier.deviceId());
+ }
+
+ /**
+ * Calculates hashCode of the OpenFlow Classifier object.
+ *
+ * @return hash of the OpenFlow Classifier
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(deviceId, idQueue, ethernetType);
+ }
+}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifierConfig.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifierConfig.java
new file mode 100644
index 0000000..8a44589
--- /dev/null
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifierConfig.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.openflow.controller;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Sets;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.config.Config;
+import org.onlab.packet.EthType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Configuration for classifiers.
+ */
+public class OpenFlowClassifierConfig extends Config<DeviceId> {
+ private static Logger log = LoggerFactory.getLogger(OpenFlowClassifierConfig.class);
+
+ public static final String TARGET_QUEUE = "target-queue";
+ public static final String ETHER_TYPE = "ethernet-type";
+
+ private static final String CONFIG_VALUE_ERROR = "Error parsing config value";
+ private static final String CLASSF_NULL_ERROR = "Classifier cannot be null";
+
+ private short etherValue(String etherType) throws IllegalArgumentException {
+ short etherTypeValue;
+ try {
+ if (etherType.startsWith("0x")) {
+ Integer e = Integer.valueOf(etherType.substring(2), 16);
+ if (e < 0 || e > 0xFFFF) {
+ throw new IllegalArgumentException("EtherType value out of range");
+ }
+ etherTypeValue = e.shortValue();
+ } else {
+ etherTypeValue = EthType.EtherType.valueOf(etherType).ethType().toShort();
+ }
+ } catch (IllegalArgumentException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Failed to parse ethernet type string");
+ }
+ return etherTypeValue;
+ }
+
+ @Override
+ public boolean isValid() {
+ for (JsonNode node : array) {
+ if (!hasOnlyFields((ObjectNode) node, TARGET_QUEUE, ETHER_TYPE)) {
+ return false;
+ }
+
+ ObjectNode obj = (ObjectNode) node;
+
+ if (!(isString(obj, ETHER_TYPE, FieldPresence.MANDATORY) &&
+ isIntegralNumber(obj, TARGET_QUEUE, FieldPresence.MANDATORY, 0, 7))) {
+ return false;
+ }
+
+ try {
+ etherValue(node.path(ETHER_TYPE).asText());
+ } catch (Exception e) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Retrieves all classifiers configured on this device.
+ *
+ * @return set of classifiers
+ */
+ public Set<OpenFlowClassifier> getClassifiers() {
+ Set<OpenFlowClassifier> classifiers = Sets.newHashSet();
+
+ for (JsonNode classfNode : array) {
+ DeviceId deviceId = this.subject();
+ short ethernetType = etherValue(classfNode.path(ETHER_TYPE).asText());
+ int idQueue = Integer.valueOf(classfNode.path(TARGET_QUEUE).asText());
+
+ OpenFlowClassifier classf =
+ new OpenFlowClassifier.Builder(deviceId, idQueue).ethernetType(ethernetType).build();
+ classifiers.add(classf);
+ }
+
+ return classifiers;
+ }
+
+ /**
+ * Adds a classifier to the config.
+ *
+ * @param classf classifier to add
+ */
+ public void addClassifier(OpenFlowClassifier classf) {
+ checkNotNull(classf, CLASSF_NULL_ERROR);
+ checkArgument(classf.deviceId().equals(this.subject()));
+
+ ObjectNode classfNode = array.addObject();
+
+ EthType.EtherType e = EthType.EtherType.lookup(classf.ethernetType());
+ if (e.equals(EthType.EtherType.UNKNOWN)) {
+ classfNode.put(ETHER_TYPE, String.format("0x%04x", classf.ethernetType()));
+ } else {
+ classfNode.put(ETHER_TYPE, e.name());
+ }
+ classfNode.put(TARGET_QUEUE, classf.idQueue());
+ }
+
+ /**
+ * Removes a classifier from the config.
+ *
+ * @param classf classifier to remove
+ */
+ public void removeClassifier(OpenFlowClassifier classf) {
+ checkNotNull(classf, CLASSF_NULL_ERROR);
+ checkArgument(classf.deviceId().equals(this.subject()));
+
+ Iterator<JsonNode> it = array.iterator();
+ while (it.hasNext()) {
+ JsonNode node = it.next();
+ if (etherValue(node.path(ETHER_TYPE).asText()) == classf.ethernetType()
+ && Integer.valueOf(node.path(TARGET_QUEUE).asText()) == classf.idQueue()) {
+ it.remove();
+ break;
+ }
+ }
+ }
+}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifierListener.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifierListener.java
new file mode 100644
index 0000000..895ddd5
--- /dev/null
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifierListener.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller;
+
+/**
+ * Notifies providers about OpenFlow Classifiers modifications.
+ */
+public interface OpenFlowClassifierListener {
+
+ /**
+ * Handle classifier add action.
+ *
+ * @param classifier the OpenFlow classifier
+ */
+ void handleClassifiersAdd(OpenFlowClassifier classifier);
+
+ /**
+ * Handle classifier remove action.
+ *
+ * @param classifier the OpenFlow classifier
+ */
+ void handleClassifiersRemove(OpenFlowClassifier classifier);
+}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java
index e5d31596..3e153d7 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java
@@ -152,4 +152,18 @@
* @param dpid the switch to set the role for.
*/
void setRole(Dpid dpid, RoleState role);
+
+ /**
+ * Removes an OpenFlow classifier listener from the runtime store of classifier listeners.
+ *
+ * @param listener the OpenFlow classifier listener to remove
+ */
+ void removeClassifierListener(OpenFlowClassifierListener listener);
+
+ /**
+ * Adds an OpenFlow classifier listener to the runtime store of classifier listeners.
+ *
+ * @param listener the OpenFlow classifier listener
+ */
+ void addClassifierListener(OpenFlowClassifierListener listener);
}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowEvent.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowEvent.java
new file mode 100644
index 0000000..2aeaec1
--- /dev/null
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowEvent.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller;
+
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Represent OpenFlow Classifiers events.
+ */
+public class OpenFlowEvent extends AbstractEvent<OpenFlowEvent.Type, OpenFlowClassifier> {
+
+ /**
+ * Enum of OpenFlow event type.
+ */
+ public enum Type {
+ /**
+ * Signifies that a new packet classifier has been added.
+ */
+ INSERT,
+ /**
+ * Signifies that a packet classifier has been removed.
+ */
+ REMOVE
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param type the OpenFlow event type
+ * @param subject the OpenFlow update data
+ */
+ public OpenFlowEvent(Type type, OpenFlowClassifier subject) {
+ super(type, subject);
+ }
+}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowListener.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowListener.java
new file mode 100644
index 0000000..5f1a271
--- /dev/null
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowListener.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller;
+
+import org.onosproject.event.EventListener;
+
+/**
+ * Notifies providers about OpenFlow Classifiers events.
+ */
+public interface OpenFlowListener extends EventListener<OpenFlowEvent> {
+
+}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowService.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowService.java
new file mode 100644
index 0000000..b702c23
--- /dev/null
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowService.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller;
+
+import java.util.Set;
+import org.onosproject.event.ListenerService;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Abstraction of an OpenFlow classifiers management.
+ */
+public interface OpenFlowService extends ListenerService<OpenFlowEvent, OpenFlowListener> {
+
+ /**
+ * Adds the OpenFlow classifier to the information base.
+ *
+ * @param classifier the OpenFlow classifier
+ */
+ void add(OpenFlowClassifier classifier);
+
+ /**
+ * Removes the OpenFlow classifier from the information base.
+ *
+ * @param classifier classifier to remove
+ */
+ void remove(OpenFlowClassifier classifier);
+
+ /**
+ * Gets all OpenFlow classifiers in the system.
+ *
+ * @return set of OpenFlow classifiers
+ */
+ Set<OpenFlowClassifier> getClassifiers();
+
+ /**
+ * Obtains OpenFlow classifiers in the system by device id.
+ *
+ * @param deviceId the device id
+ * @return set of OpenFlow classifiers
+ */
+ Set<OpenFlowClassifier> getClassifiersByDeviceId(DeviceId deviceId);
+
+ /**
+ * Obtains OpenFlow classifiers in the system by device id and number of queue.
+ *
+ * @param deviceId the device id
+ * @param idQueue the queue id
+ * @return set of OpenFlow classifiers
+ */
+ Set<OpenFlowClassifier> getClassifiersByDeviceIdAndQueue(DeviceId deviceId, int idQueue);
+}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSession.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSession.java
index c380322..1677e5e 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSession.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSession.java
@@ -60,5 +60,17 @@
*/
CharSequence sessionInfo();
+ /**
+ * Add classifier to runtime store of classifiers.
+ *
+ * @param classifier the OpenFlow classifier to add
+ */
+ void addClassifier(OpenFlowClassifier classifier);
+ /**
+ * Remove classifier from runtime store of classifiers.
+ *
+ * @param classifier the OpenFlow classifier to remove
+ */
+ void removeClassifier(OpenFlowClassifier classifier);
}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
index 0ba3bd9..8431068 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.onosproject.openflow.controller.driver;
import com.google.common.base.MoreObjects;
@@ -23,6 +22,8 @@
import org.onosproject.net.Device;
import org.onosproject.net.driver.AbstractHandlerBehaviour;
import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowClassifier;
+import org.onosproject.openflow.controller.OpenFlowClassifierListener;
import org.onosproject.openflow.controller.OpenFlowSession;
import org.onosproject.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFDescStatsReply;
@@ -98,6 +99,8 @@
private OFMeterFeaturesStatsReply meterfeatures;
+ protected OpenFlowClassifierListener classifierListener = new InternalClassifierListener();
+
// messagesPendingMastership is used as synchronization variable for
// all mastership related changes. In this block, mastership (including
// role update) will have either occurred or not.
@@ -301,7 +304,11 @@
@Override
public final boolean connectSwitch() {
- return this.agent.addConnectedSwitch(dpid, this);
+ boolean status = this.agent.addConnectedSwitch(dpid, this);
+ if (status) {
+ this.agent.addClassifierListener(classifierListener);
+ }
+ return status;
}
@Override
@@ -339,6 +346,7 @@
@Override
public final void removeConnectedSwitch() {
this.agent.removeConnectedSwitch(dpid);
+ this.agent.removeClassifierListener(classifierListener);
}
@Override
@@ -566,4 +574,17 @@
.add("dpid", dpid)
.toString();
}
+
+ private class InternalClassifierListener implements OpenFlowClassifierListener {
+
+ @Override
+ public void handleClassifiersAdd(OpenFlowClassifier classifier) {
+ channel.addClassifier(classifier);
+ }
+
+ @Override
+ public void handleClassifiersRemove(OpenFlowClassifier classifier) {
+ channel.removeClassifier(classifier);
+ }
+ }
}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowAgent.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowAgent.java
index 37c43f3..4451b19 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowAgent.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowAgent.java
@@ -16,6 +16,7 @@
package org.onosproject.openflow.controller.driver;
import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowClassifierListener;
import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.onosproject.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFMessage;
@@ -109,4 +110,18 @@
* @param response role reply from the switch
*/
void returnRoleReply(Dpid dpid, RoleState requested, RoleState response);
+
+ /**
+ * Add OpenFlow classifier listener.
+ *
+ * @param listener the OpenFlow classifier listener
+ */
+ void addClassifierListener(OpenFlowClassifierListener listener);
+
+ /**
+ * Remove OpenFlow classifier listener.
+ *
+ * @param listener the OpenFlow classifier listener
+ */
+ void removeClassifierListener(OpenFlowClassifierListener listener);
}
diff --git a/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/OpenflowControllerAdapter.java b/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/OpenflowControllerAdapter.java
index a3b8e5c..5056840 100644
--- a/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/OpenflowControllerAdapter.java
+++ b/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/OpenflowControllerAdapter.java
@@ -103,4 +103,12 @@
@Override
public void removeEventListener(OpenFlowEventListener listener) {
}
+
+ @Override
+ public void removeClassifierListener(OpenFlowClassifierListener listener) {
+ }
+
+ @Override
+ public void addClassifierListener(OpenFlowClassifierListener listener) {
+ }
}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
index 6cf1ddc..c71de08 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.onosproject.openflow.controller.impl;
import com.google.common.base.MoreObjects;
@@ -96,6 +95,12 @@
private static final short MIN_KS_LENGTH = 6;
+ //Default queues settings
+ private static final short DEFAULT_QUEUE_SIZE = 5000;
+ private static final short FIRST_QUEUE_SIZE = 1000;
+ private static final short DEFAULT_BULK_SIZE = 100;
+ private static final short DEFAULT_QUEUE_ID = 7;
+
protected HashMap<String, String> controllerNodeIPsCache;
private ChannelGroup cg;
@@ -103,6 +108,8 @@
// Configuration options
protected List<Integer> openFlowPorts = ImmutableList.of(6633, 6653);
protected int workerThreads = 0;
+ protected int[] cfgQueueSizes = {FIRST_QUEUE_SIZE, 0, 0, 0, 0, 0, 0, DEFAULT_QUEUE_SIZE};
+ protected int[] cfgBulkSizes = new int[8];
// Start time of the controller
protected long systemStartTime;
@@ -129,7 +136,17 @@
private DriverService driverService;
private NetworkConfigRegistry netCfgService;
+ public Controller() {
+ Arrays.fill(cfgBulkSizes, DEFAULT_BULK_SIZE);
+ }
+ public int getQueueSize(int queueId) {
+ return cfgQueueSizes[queueId];
+ }
+
+ public int getBulkSize(int queueId) {
+ return cfgBulkSizes[queueId];
+ }
// **************
// Initialization
@@ -221,6 +238,29 @@
.channel(NioServerSocketChannel.class);
}
+ public void setQueueParams(Dictionary<?, ?> properties, String sizeParamName, String bulkParamName, int queueId) {
+ String queueSize = get(properties, sizeParamName);
+ if (!Strings.isNullOrEmpty(queueSize)) {
+ int size = Integer.parseInt(queueSize);
+ if (size > 0) {
+ this.cfgQueueSizes[queueId] = size;
+ } else {
+ throw new IllegalArgumentException(
+ String.format("%s value must be either a positive integer value", sizeParamName));
+ }
+ }
+ String bulkSize = get(properties, bulkParamName);
+ if (!Strings.isNullOrEmpty(bulkSize)) {
+ int bulk = Integer.parseInt(bulkSize);
+ if (bulk > 0) {
+ this.cfgBulkSizes[queueId] = bulk;
+ } else {
+ throw new IllegalArgumentException(
+ String.format("%s value must be either a positive integer value", bulkParamName));
+ }
+ }
+ }
+
public void setConfigParams(Dictionary<?, ?> properties) {
boolean restartRequired = setOpenFlowPorts(properties);
restartRequired |= setWorkerThreads(properties);
@@ -273,6 +313,16 @@
this.workerThreads = Integer.parseInt(threads);
}
log.debug("Number of worker threads set to {}", this.workerThreads);
+
+ setQueueParams(properties, "defaultQueueSize", "defaultBulkSize", DEFAULT_QUEUE_ID);
+ setQueueParams(properties, "queueSizeN0", "bulkSizeN0", 0);
+ setQueueParams(properties, "queueSizeN1", "bulkSizeN1", 1);
+ setQueueParams(properties, "queueSizeN2", "bulkSizeN2", 2);
+ setQueueParams(properties, "queueSizeN3", "bulkSizeN3", 3);
+ setQueueParams(properties, "queueSizeN4", "bulkSizeN4", 4);
+ setQueueParams(properties, "queueSizeN5", "bulkSizeN5", 5);
+ setQueueParams(properties, "queueSizeN6", "bulkSizeN6", 6);
+
return oldValue != this.workerThreads; // restart if number of threads has changed
}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/LinkedBlockingMessagesQueue.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/LinkedBlockingMessagesQueue.java
new file mode 100644
index 0000000..518cfcc
--- /dev/null
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/LinkedBlockingMessagesQueue.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller.impl;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class wraps the existing LinkedBlockingQueue class to solve problems
+ * with handling and processing of received messages in ONOS.
+ *
+ * @see java.util.concurrent.LinkedBlockingQueue
+ */
+public class LinkedBlockingMessagesQueue<T> {
+
+ /**
+ * Identifier of queue.
+ */
+ private int idQueue;
+
+ /**
+ * Size of queue.
+ */
+ private int sizeOfQueue;
+
+ /**
+ * Maximal bulk of messages that will be processed.
+ */
+ private int bulk;
+
+ /**
+ * Queue of messages.
+ */
+ private BlockingQueue<T> queue;
+
+ /**
+ * Constructor.
+ *
+ * @param idQueue Identifier of queue
+ * @param sizeOfQueue Size of queue
+ * @param bulk Maximal bulk of messages that will be processed
+ */
+ public LinkedBlockingMessagesQueue(int idQueue, int sizeOfQueue, int bulk) {
+ this.idQueue = idQueue;
+ this.sizeOfQueue = sizeOfQueue;
+ this.queue = new LinkedBlockingQueue<>(this.sizeOfQueue);
+ this.bulk = bulk;
+ }
+
+ /**
+ * Returns the identifier of this queue.
+ *
+ * @return the id of this queue
+ */
+ public int idQueue() {
+ return idQueue;
+ }
+
+ /**
+ * Return the size of this queue.
+ *
+ * @return the size of this queue
+ */
+ public int sizeOfQueue() {
+ return sizeOfQueue;
+ }
+
+ /**
+ * Set size for this queue.
+ *
+ * @param sizeOfQueue Size of queue
+ */
+ public void setSizeOfQueue(int sizeOfQueue) {
+ this.sizeOfQueue = sizeOfQueue;
+ this.queue = new LinkedBlockingQueue<>(this.sizeOfQueue);
+ }
+
+ /**
+ * Offer new message to this queue.
+ *
+ * @param message element to add
+ * @return <code>true</code> if the element was added to this queue, else <code>false</code>
+ */
+ public boolean offer(T message) {
+ return this.queue.offer(message);
+ }
+
+ /**
+ * Transfer bulk of elements from this queue to the <code>messages</code> collection.
+ *
+ * @param messages the collection that receives the bulk of elements from this queue
+ * @return the number of elements transferred
+ */
+ public int drainTo(Collection<? super T> messages) {
+ return this.queue.drainTo(messages, this.bulk);
+ }
+
+ /**
+ * Return the elements count in this queue.
+ *
+ * @return the elements count
+ */
+ public int size() {
+ return this.queue.size();
+ }
+
+ /**
+ * Return the maximal bulk of messages for this queue.
+ *
+ * @return maximal bulk of messages that will be processed
+ */
+ public int bulk() {
+ return bulk;
+ }
+
+ /**
+ * Set the maximal bulk of messages for this queue.
+ *
+ * @param bulk Maximal bulk of messages that will be processed
+ */
+ public void setBulk(int bulk) {
+ this.bulk = bulk;
+ }
+
+}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
index 264629e..5a95188 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
@@ -16,30 +16,51 @@
package org.onosproject.openflow.controller.impl;
+import static org.onlab.packet.Ethernet.TYPE_BSN;
+import static org.onlab.packet.Ethernet.TYPE_LLDP;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.openflow.controller.Dpid.uri;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.security.cert.Certificate;
-import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.onlab.osgi.DefaultServiceDirectory;
+import org.onlab.packet.Ethernet;
import org.onlab.packet.IpAddress;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.packet.DefaultInboundPacket;
+import org.onosproject.openflow.controller.DefaultOpenFlowPacketContext;
import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowClassifier;
+import org.onosproject.openflow.controller.OpenFlowPacketContext;
+import org.onosproject.openflow.controller.OpenFlowService;
import org.onosproject.openflow.controller.OpenFlowSession;
import org.onosproject.openflow.controller.driver.OpenFlowSwitchDriver;
import org.onosproject.openflow.controller.driver.SwitchStateException;
@@ -106,11 +127,12 @@
private static final String RESET_BY_PEER = "Connection reset by peer";
private static final String BROKEN_PIPE = "Broken pipe";
+ private static final int NUM_OF_QUEUES = 8;
private final Controller controller;
private OpenFlowSwitchDriver sw;
private long thisdpid; // channelHandler cached value of connected switch id
-
+ private DeviceId deviceId;
private Channel channel;
private String channelId;
@@ -152,15 +174,38 @@
*/
private int handshakeTransactionIds = -1;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private OpenFlowService openFlowManager;
-
- private static final int MSG_READ_BUFFER = 5000;
+ private static final int BACKLOG_READ_BUFFER_DEFAULT = 1000;
/**
- * OFMessage dispatch queue.
+ * Map from queue id to the LinkedBlockingMessagesQueue that holds the OFMessages of that queue.
*/
- private final BlockingQueue<OFMessage> dispatchQueue =
- new LinkedBlockingQueue<>(MSG_READ_BUFFER);
+ private Map<Integer, LinkedBlockingMessagesQueue<OFMessage>> dispatchQueuesMapProducer = new ConcurrentHashMap<>();
+
+ /**
+ * OFMessage classifiers, stored as one set per queue id (the list index is the queue id).
+ */
+ private List<Set<OpenFlowClassifier>> messageClassifiersMapProducer =
+ new CopyOnWriteArrayList<Set<OpenFlowClassifier>>();
+
+
+ /**
+ * Lock held by take, poll, etc.
+ */
+ private final ReentrantLock takeLock = new ReentrantLock();
+
+ /**
+ * Wait queue for waiting takes.
+ */
+ private final Condition notEmpty = takeLock.newCondition();
+
+ /**
+ * Current number of elements in enabled sub-queues.
+ */
+ private final AtomicInteger totalCount = new AtomicInteger();
+
/**
* Single thread executor for OFMessage dispatching.
@@ -181,7 +226,7 @@
* <p>
* Should only be touched from the Channel I/O thread
*/
- private final Deque<OFMessage> dispatchBacklog = new ArrayDeque<>();
+ private final Deque<OFMessage> dispatchBacklog;
/**
* Create a new unconnected OFChannelHandler.
@@ -194,6 +239,17 @@
this.pendingPortStatusMsg = new CopyOnWriteArrayList<>();
this.portDescReplies = new ArrayList<>();
duplicateDpidFound = Boolean.FALSE;
+        // Initialize queues and classifiers
+        dispatchBacklog = new LinkedBlockingDeque<>(BACKLOG_READ_BUFFER_DEFAULT);
+        for (int i = 0; i < NUM_OF_QUEUES; i++) {
+            // A dispatch queue is only created when a positive size is configured for it.
+            int queueSize = controller.getQueueSize(i);
+            if (queueSize > 0) {
+                dispatchQueuesMapProducer.put(i,
+                        new LinkedBlockingMessagesQueue<>(i, queueSize, controller.getBulkSize(i)));
+            }
+            // Every queue id gets its own classifier set. The former guard
+            // "i != NUM_OF_QUEUES" could never be false inside this loop, so it
+            // was dropped; the list still ends up with NUM_OF_QUEUES entries.
+            messageClassifiersMapProducer.add(i, new CopyOnWriteArraySet<>());
+        }
}
@@ -324,6 +380,7 @@
h.channel.disconnect();
return;
}
+ h.deviceId = DeviceId.deviceId(uri(h.thisdpid));
log.debug("Received features reply for switch at {} with dpid {}",
h.getSwitchInfoString(), h.thisdpid);
@@ -461,6 +518,7 @@
throws IOException, SwitchStateException {
illegalMessageReceived(h, m);
}
+
@Override
void processOFStatisticsReply(OFChannelHandler h,
OFStatsReply m)
@@ -564,6 +622,8 @@
if (h.sw.isDriverHandshakeComplete()) {
if (!h.sw.connectSwitch()) {
disconnectDuplicate(h);
+ } else {
+ h.initClassifiers();
}
handlePendingPortStatusMessages(h);
h.setState(ACTIVE);
@@ -1421,63 +1481,152 @@
/**
* Is this a state in which the handshake has completed?
+ *
* @return true if the handshake is complete
*/
public boolean isHandshakeComplete() {
return this.state.isHandshakeComplete();
}
- private void dispatchMessage(OFMessage m) {
-
- if (dispatchBacklog.isEmpty()) {
- if (!dispatchQueue.offer(m)) {
- // queue full
- channel.config().setAutoRead(false);
- // put it on the head of backlog
- dispatchBacklog.addFirst(m);
- return;
+    /**
+     * Increments {@code totalCount} and signals the {@code notEmpty} condition
+     * so that a dispatcher waiting for messages wakes up.
+     * <p>
+     * The lock is taken uninterruptibly: a pending interrupt on the calling
+     * thread must not cause the signal to be skipped, otherwise a message would
+     * be counted but the waiting dispatcher would not be notified. The
+     * interrupt status of the caller is left untouched.
+     */
+    private void incrementAndSignal() {
+        totalCount.incrementAndGet();
+        takeLock.lock();
+        try {
+            notEmpty.signal();
+        } finally {
+            takeLock.unlock();
         }
+    }
+
+    /**
+     * Tries to push an OpenFlow message to the queue with the given id.
+     * <p>
+     * Queues are only present in {@code dispatchQueuesMapProducer} when a
+     * positive size was configured for them, so a missing queue is treated the
+     * same way as a full one instead of failing with a NullPointerException.
+     *
+     * @param message OpenFlow message
+     * @param idQueue id of the target queue
+     * @return true if the message was successfully added to the queue; false if
+     *         the queue rejected it or no queue exists for {@code idQueue}
+     */
+    private boolean pushMessageToQueue(OFMessage message, int idQueue) {
+        LinkedBlockingMessagesQueue<OFMessage> queue = dispatchQueuesMapProducer.get(idQueue);
+        if (queue == null || !queue.offer(message)) {
+            return false;
         } else {
-            dispatchBacklog.addLast(m);
+            // Account for the new message and wake up the dispatcher.
+            incrementAndSignal();
+            return true;
         }
+    }
+ /**
+ * Process backlog - move messages from backlog to default queue.
+ * <p>
+ * Messages are taken one by one from the head of the backlog and pushed to
+ * the queue with id {@code NUM_OF_QUEUES - 1}. The first message that cannot
+ * be pushed is put back at the head of the backlog and processing stops.
+ *
+ * @return true if whole backlog was processed, otherwise false
+ */
+ private boolean processDispatchBacklogQueue() {
while (!dispatchBacklog.isEmpty()) {
- OFMessage msg = dispatchBacklog.pop();
-
- if (!dispatchQueue.offer(msg)) {
- // queue full
- channel.config().setAutoRead(false);
- // put it back to the head of backlog
- dispatchBacklog.addFirst(msg);
- return;
+ OFMessage msgFromBacklog = dispatchBacklog.removeFirst();
+ if (!pushMessageToQueue(msgFromBacklog, NUM_OF_QUEUES - 1)) {
+ // push failed: restore the message at the head so the backlog order is kept
+ dispatchBacklog.addFirst(msgFromBacklog);
+ return false;
}
}
+ return true;
+ }
+
+    /**
+     * Extracts the Ethernet frame carried by a packet-in message.
+     * <p>
+     * A packet context is created for the message, wrapped into an inbound
+     * packet located at the connect point the packet arrived on, and the
+     * parsed frame of that inbound packet is returned.
+     *
+     * @param message OpenFlow message; must be an {@code OFPacketIn}
+     * @return parsed Ethernet packet
+     */
+    private Ethernet parsePacketInMessage(OFMessage message) {
+        OFPacketIn packetIn = (OFPacketIn) message;
+        OpenFlowPacketContext context =
+                DefaultOpenFlowPacketContext.packetContextFromPacketIn(sw, packetIn);
+
+        // Location the packet was received from: device of the context plus its ingress port.
+        DeviceId origin = DeviceId.deviceId(Dpid.uri(context.dpid().value()));
+        ConnectPoint ingress = new ConnectPoint(origin, PortNumber.portNumber(context.inPort()));
+
+        DefaultInboundPacket inbound = new DefaultInboundPacket(
+                ingress, context.parsed(), ByteBuffer.wrap(context.unparsed()), context.cookie());
+        return inbound.parsed();
+    }
+
+    /**
+     * Classifies an Ethernet packet, i.e. selects the queue it belongs to.
+     * <p>
+     * The classifier sets are scanned in list order and the first classifier
+     * whose Ethernet type equals the type of the packet decides the queue.
+     * Packets that match no classifier go to the queue with id
+     * {@code NUM_OF_QUEUES - 1}.
+     *
+     * @param packet ethernet packet; may be null
+     * @return id of the destination queue
+     */
+    private int classifyEthernetPacket(Ethernet packet) {
+        // NOTE(review): the packet context presumably yields null for a payload
+        // it cannot deserialize - confirm. Such packets cannot be classified and
+        // fall through to the default queue instead of raising a NullPointerException.
+        if (packet != null) {
+            for (Set<OpenFlowClassifier> classifiers : this.messageClassifiersMapProducer) {
+                for (OpenFlowClassifier classifier : classifiers) {
+                    if (classifier.ethernetType() == packet.getEtherType()) {
+                        return classifier.idQueue();
+                    }
+                }
+            }
+        }
+        return NUM_OF_QUEUES - 1;
+    }
+
+    /**
+     * Drains the dispatch queues and hands the drained messages to the switch.
+     * <p>
+     * Each round keeps draining all queues until at least the expected number
+     * of messages was collected, lets the switch handle them and subtracts the
+     * handled amount from {@code totalCount}. Rounds repeat for as long as the
+     * counter stays positive.
+     *
+     * @param queuesSize count of messages in all queues
+     */
+    private void processMessages(int queuesSize) {
+        final List<OFMessage> batch = new ArrayList<>();
+        int expected = queuesSize;
+        int drained;
+        do {
+            drained = 0;
+            while (drained < expected) {
+                for (LinkedBlockingMessagesQueue<OFMessage> source : dispatchQueuesMapProducer.values()) {
+                    drained += source.drainTo(batch);
+                }
+            }
+
+            batch.forEach(sw::handleMessage);
+            batch.clear();
+            // Take the handled messages off the shared counter; what is left
+            // is the amount to collect in the next round.
+            expected = totalCount.addAndGet(-drained);
+        } while (expected > 0);
+    }
+
+ private void dispatchMessage(OFMessage m) {
+ log.debug("Begin dispatch OpenFlow Message");
+ boolean backlogEmpty = processDispatchBacklogQueue();
+ if (m.getType() == OFType.PACKET_IN) {
+ Ethernet pkt = parsePacketInMessage(m);
+ pushMessageToQueue(m, classifyEthernetPacket(pkt));
+ } else {
+ if (!backlogEmpty || !pushMessageToQueue(m, NUM_OF_QUEUES - 1)) {
+ dispatchBacklog.offer(m);
+ }
+ }
if (dispatcherHandle.isDone()) {
// dispatcher terminated for some reason, restart
-
dispatcherHandle = dispatcher.submit((Runnable) () -> {
try {
- List<OFMessage> msgs = new ArrayList<>();
for (;;) {
- // wait for new message
- OFMessage msg = dispatchQueue.take();
- sw.handleMessage(msg);
-
- while (dispatchQueue.drainTo(msgs, MSG_READ_BUFFER) > 0) {
- if (!channel.config().isAutoRead()) {
- channel.config().setAutoRead(true);
+ int tc = 0;
+ takeLock.lockInterruptibly();
+ try {
+ while ((tc = totalCount.get()) == 0) {
+ notEmpty.await();
}
- msgs.forEach(sw::handleMessage);
- msgs.clear();
+ } finally {
+ takeLock.unlock();
}
- if (!channel.config().isAutoRead()) {
- channel.config().setAutoRead(true);
- }
+ processMessages(tc);
}
} catch (InterruptedException e) {
+ log.error("executor thread InterruptedException: {}", e);
Thread.currentThread().interrupt();
// interrupted. gracefully shutting down
return;
@@ -1697,4 +1846,50 @@
return channelId;
}
+    /**
+     * Registers a classifier with this handler if it targets the connected switch.
+     * <p>
+     * Classifiers for other devices are ignored, as are classifiers that
+     * arrive before this handler has learned its device id or that refer to a
+     * queue id for which no classifier set exists.
+     *
+     * @param classifier classifier to add
+     */
+    @Override
+    public void addClassifier(OpenFlowClassifier classifier) {
+        // deviceId stays null until it is assigned during features reply handling.
+        if (this.deviceId == null || !this.deviceId.equals(classifier.deviceId())) {
+            return;
+        }
+        int idQueue = classifier.idQueue();
+        if (idQueue < 0 || idQueue >= this.messageClassifiersMapProducer.size()) {
+            log.warn("Ignoring OpenFlow Classifier for switch {} with unsupported queue {}",
+                     classifier.deviceId(), idQueue);
+            return;
+        }
+        log.debug("Add OpenFlow Classifier for switch {} to queue {} with type {}",
+                  classifier.deviceId().toString(), idQueue, classifier.ethernetType());
+        this.messageClassifiersMapProducer.get(idQueue).add(classifier);
+    }
+
+    /**
+     * Unregisters a classifier from this handler if it targets the connected switch.
+     * <p>
+     * Classifiers for other devices are ignored, as are classifiers that
+     * arrive before this handler has learned its device id or that refer to a
+     * queue id for which no classifier set exists.
+     *
+     * @param classifier classifier to remove
+     */
+    @Override
+    public void removeClassifier(OpenFlowClassifier classifier) {
+        // deviceId stays null until it is assigned during features reply handling.
+        if (this.deviceId == null || !this.deviceId.equals(classifier.deviceId())) {
+            return;
+        }
+        int idQueue = classifier.idQueue();
+        if (idQueue < 0 || idQueue >= this.messageClassifiersMapProducer.size()) {
+            // Nothing can be registered under a queue id without a classifier set.
+            return;
+        }
+        log.debug("Remove OpenFlow Classifier for switch {} from queue {} with type {}",
+                  classifier.deviceId().toString(), idQueue, classifier.ethernetType());
+        this.messageClassifiersMapProducer.get(idQueue).remove(classifier);
+    }
+
+    /**
+     * Init classifier configuration for the switch. Use stored configuration if exist.
+     * Otherwise add LLDP and BDDP classifiers for Queue N0.
+     * <p>
+     * Failures are logged and do not propagate to the caller.
+     */
+    private void initClassifiers() {
+        try {
+            openFlowManager = DefaultServiceDirectory.getService(OpenFlowService.class);
+            DeviceId did = DeviceId.deviceId(uri(thisdpid));
+            Set<OpenFlowClassifier> classifiers = openFlowManager.getClassifiersByDeviceId(did);
+            if (classifiers == null) {
+                // No classifiers known for this device: install the defaults on queue 0.
+                openFlowManager.add(
+                        new OpenFlowClassifier.Builder(did, 0).ethernetType(TYPE_LLDP).build());
+                openFlowManager.add(
+                        new OpenFlowClassifier.Builder(did, 0).ethernetType(TYPE_BSN).build());
+            } else {
+                // Replace whatever is registered locally with the known classifiers.
+                this.messageClassifiersMapProducer.forEach(Set::clear);
+                classifiers.forEach(c -> messageClassifiersMapProducer.get(c.idQueue()).add(c));
+            }
+        } catch (Exception e) {
+            // Hand the throwable to the logger so the stack trace ends up in
+            // the log instead of on stderr.
+            log.error("Initialize default classifier failed: {}", e.toString(), e);
+        }
+    }
}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java
index b03f007..a78e6a6 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java
@@ -23,6 +23,7 @@
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.concurrent.EventExecutorGroup;
@@ -76,6 +77,8 @@
pipeline.addLast("ofmessageencoder", OFMessageEncoder.getInstance());
pipeline.addLast("ofmessagedecoder", OFMessageDecoder.getInstance());
+ pipeline.addLast("consolidateflush", new FlushConsolidationHandler(
+ FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true));
pipeline.addLast("idle", new IdleStateHandler(20, 25, 0));
pipeline.addLast("timeout", new ReadTimeoutHandler(30));
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowClassifierManager.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowClassifierManager.java
new file mode 100644
index 0000000..c4cd4b2
--- /dev/null
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowClassifierManager.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.ImmutableSet;
+
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.onosproject.event.ListenerRegistry;
+import org.onosproject.net.DeviceId;
+import org.onosproject.openflow.controller.OpenFlowClassifier;
+import org.onosproject.openflow.controller.OpenFlowClassifierConfig;
+import org.onosproject.openflow.controller.OpenFlowEvent;
+import org.onosproject.openflow.controller.OpenFlowListener;
+import org.onosproject.openflow.controller.OpenFlowService;
+import org.slf4j.Logger;
+
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigService;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigEvent;
+import static org.onosproject.net.config.basics.SubjectFactories.DEVICE_SUBJECT_FACTORY;
+
+/**
+ * Manages the inventory of OpenFlow Classifiers in the system.
+ * <p>
+ * Classifiers are kept per device in network configuration; this component
+ * mirrors them in memory and publishes INSERT/REMOVE events when they change.
+ */
+@Component(immediate = true, service = OpenFlowService.class)
+public class OpenFlowClassifierManager extends ListenerRegistry<OpenFlowEvent, OpenFlowListener>
+        implements OpenFlowService {
+
+    private final Logger log = getLogger(getClass());
+
+    private final InternalConfigListener listener = new InternalConfigListener();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected NetworkConfigService cfgService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected NetworkConfigRegistry cfgRegistry;
+
+    private static final String FEATURE_NAME = "classifiers";
+    private final Set<ConfigFactory<?, ?>> factories = ImmutableSet.of(
+            new ConfigFactory<DeviceId, OpenFlowClassifierConfig>(DEVICE_SUBJECT_FACTORY,
+                                                                  OpenFlowClassifierConfig.class, FEATURE_NAME,
+                                                                  true) {
+                @Override
+                public OpenFlowClassifierConfig createConfig() {
+                    return new OpenFlowClassifierConfig();
+                }
+            }
+    );
+
+    /** Classifiers currently known, grouped by the device they belong to. */
+    private final Map<DeviceId, Set<OpenFlowClassifier>> classifiersMap = Maps.newConcurrentMap();
+
+    /**
+     * Registers the config factories and the config listener, then loads the
+     * classifier configuration that already exists.
+     */
+    @Activate
+    private void activate() {
+        factories.forEach(cfgRegistry::registerConfigFactory);
+        cfgService.addListener(listener);
+
+        for (DeviceId subject : cfgService.getSubjects(DeviceId.class, OpenFlowClassifierConfig.class)) {
+            OpenFlowClassifierConfig config = cfgService.getConfig(subject, OpenFlowClassifierConfig.class);
+
+            if (config != null) {
+                updateClassifiers(config);
+            }
+        }
+
+        log.info("Started Openflow Manager");
+    }
+
+    /**
+     * Unregisters the config listener and the config factories.
+     */
+    @Deactivate
+    private void deactivate() {
+        cfgService.removeListener(listener);
+
+        factories.forEach(cfgRegistry::unregisterConfigFactory);
+        log.info("Stopped Openflow manager");
+    }
+
+    @Override
+    public void add(OpenFlowClassifier classifier) {
+        checkNotNull(classifier, "Classifier cannot be null");
+
+        OpenFlowClassifierConfig config =
+                cfgService.addConfig(classifier.deviceId(), OpenFlowClassifierConfig.class);
+        config.addClassifier(classifier);
+        cfgService.applyConfig(classifier.deviceId(), OpenFlowClassifierConfig.class, config.node());
+    }
+
+    @Override
+    public void remove(OpenFlowClassifier classifier) {
+        checkNotNull(classifier, "Classifier cannot be null");
+
+        OpenFlowClassifierConfig config = cfgService.getConfig(classifier.deviceId(), OpenFlowClassifierConfig.class);
+
+        if (config == null) {
+            // No classifier configuration for the device, so there is nothing to remove.
+            return;
+        }
+
+        config.removeClassifier(classifier);
+        cfgService.applyConfig(classifier.deviceId(), OpenFlowClassifierConfig.class, config.node());
+    }
+
+    @Override
+    public Set<OpenFlowClassifier> getClassifiers() {
+        Set<OpenFlowClassifier> classifiers = Sets.newHashSet();
+
+        classifiersMap.values().forEach(classifiers::addAll);
+
+        return classifiers;
+    }
+
+    @Override
+    public Set<OpenFlowClassifier> getClassifiersByDeviceId(DeviceId deviceId) {
+        // Returns null when nothing is recorded for the device.
+        return classifiersMap.get(deviceId);
+    }
+
+    @Override
+    public Set<OpenFlowClassifier> getClassifiersByDeviceIdAndQueue(DeviceId deviceId, int idQueue) {
+        Set<OpenFlowClassifier> classifiers = classifiersMap.get(deviceId);
+        if (classifiers == null) {
+            return null;
+        } else {
+            return classifiers.stream()
+                    .filter(p -> p.idQueue() == idQueue)
+                    .collect(Collectors.toSet());
+        }
+    }
+
+    /**
+     * Replaces the classifiers recorded for the subject of the given config and
+     * publishes an INSERT event for every classifier that is new and a REMOVE
+     * event for every classifier that is no longer present.
+     *
+     * @param classfConfig classifier configuration of one device
+     */
+    private void updateClassifiers(OpenFlowClassifierConfig classfConfig) {
+        Set<OpenFlowClassifier> old = classifiersMap.put(classfConfig.subject(),
+                                                         Sets.newHashSet(classfConfig.getClassifiers()));
+
+        if (old == null) {
+            old = Collections.emptySet();
+        }
+
+        for (OpenFlowClassifier classf : classfConfig.getClassifiers()) {
+            if (old.contains(classf)) {
+                // Still configured: take it out so that only stale entries remain in old.
+                old.remove(classf);
+            } else {
+                process(new OpenFlowEvent(OpenFlowEvent.Type.INSERT, classf));
+            }
+        }
+
+        for (OpenFlowClassifier classf : old) {
+            process(new OpenFlowEvent(OpenFlowEvent.Type.REMOVE, classf));
+        }
+    }
+
+    /**
+     * Drops all classifiers recorded for the device and publishes a REMOVE
+     * event for each of them. Does nothing when no classifiers are recorded.
+     *
+     * @param deviceId device whose classifier configuration was removed
+     */
+    private void removeClassifiers(DeviceId deviceId) {
+        Set<OpenFlowClassifier> old = classifiersMap.remove(deviceId);
+        if (old == null) {
+            // Nothing was recorded for this device; iterating would raise a NullPointerException.
+            return;
+        }
+
+        for (OpenFlowClassifier classf : old) {
+            process(new OpenFlowEvent(OpenFlowEvent.Type.REMOVE, classf));
+        }
+    }
+
+    /**
+     * Listener for network config events.
+     */
+    private class InternalConfigListener implements NetworkConfigListener {
+
+        @Override
+        public void event(NetworkConfigEvent event) {
+            if (event.configClass() == OpenFlowClassifierConfig.class) {
+                switch (event.type()) {
+                    case CONFIG_ADDED:
+                    case CONFIG_UPDATED:
+                        event.config().ifPresent(config -> updateClassifiers((OpenFlowClassifierConfig) config));
+                        break;
+                    case CONFIG_REMOVED:
+                        removeClassifiers((DeviceId) event.subject());
+                        break;
+                    case CONFIG_REGISTERED:
+                    case CONFIG_UNREGISTERED:
+                    default:
+                        break;
+                }
+            }
+        }
+    }
+}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index ee9422f..4696778 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -30,12 +30,16 @@
import org.onosproject.openflow.config.OpenFlowDeviceConfig;
import org.onosproject.openflow.controller.DefaultOpenFlowPacketContext;
import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowClassifierListener;
import org.onosproject.openflow.controller.OpenFlowController;
import org.onosproject.openflow.controller.OpenFlowEventListener;
import org.onosproject.openflow.controller.OpenFlowMessageListener;
import org.onosproject.openflow.controller.OpenFlowPacketContext;
import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.onosproject.openflow.controller.OpenFlowSwitchListener;
+import org.onosproject.openflow.controller.OpenFlowListener;
+import org.onosproject.openflow.controller.OpenFlowService;
+import org.onosproject.openflow.controller.OpenFlowEvent;
import org.onosproject.openflow.controller.PacketListener;
import org.onosproject.openflow.controller.RoleState;
import org.onosproject.openflow.controller.driver.OpenFlowAgent;
@@ -104,8 +108,25 @@
KEY_STORE_PASSWORD + "=" + KEY_STORE_PASSWORD_DEFAULT,
TRUST_STORE + "=" + TRUST_STORE_DEFAULT,
TRUST_STORE_PASSWORD + "=" + TRUST_STORE_PASSWORD_DEFAULT,
+ DEFAULT_QUEUE_SIZE + ":Integer=" + DEFAULT_QUEUE_SIZE_DEFAULT,
+ DEBAULT_BULK_SIZE + ":Integer=" + BULK_SIZE_DEFAULT,
+ QUEUE_SIZE_N0 + ":Integer=" + QUEUE_SIZE_N0_DEFAULT,
+ BULK_SIZE_N0 + ":Integer=" + BULK_SIZE_DEFAULT,
+ QUEUE_SIZE_N1 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+ BULK_SIZE_N1 + ":Integer=" + BULK_SIZE_DEFAULT,
+ QUEUE_SIZE_N2 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+ BULK_SIZE_N2 + ":Integer=" + BULK_SIZE_DEFAULT,
+ QUEUE_SIZE_N3 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+ BULK_SIZE_N3 + ":Integer=" + BULK_SIZE_DEFAULT,
+ QUEUE_SIZE_N4 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+ BULK_SIZE_N4 + ":Integer=" + BULK_SIZE_DEFAULT,
+ QUEUE_SIZE_N5 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+ BULK_SIZE_N5 + ":Integer=" + BULK_SIZE_DEFAULT,
+ QUEUE_SIZE_N6 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+ BULK_SIZE_N6 + ":Integer=" + BULK_SIZE_DEFAULT,
}
)
+
public class OpenFlowControllerImpl implements OpenFlowController {
private static final String APP_ID = "org.onosproject.openflow-base";
protected static final String SCHEME = "of";
@@ -131,12 +152,17 @@
/** Number of controller worker threads. */
private int workerThreads = WORKER_THREADS_DEFAULT;
- /** TLS mode for OpenFlow channel; options are: disabled [default], enabled, strict. */
+ /** TLS mode for OpenFlow channel; options are: disabled [default], enabled, strict. */
private String tlsMode;
/** File path to key store for TLS connections. */
private String keyStore;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected OpenFlowService openFlowManager;
+
+ private final OpenFlowListener openFlowListener = new InternalOpenFlowListener();
+
/** Key store password. */
private String keyStorePassword;
@@ -146,6 +172,54 @@
/** Trust store password. */
private String trustStorePassword;
+ /** Size of default queue. */
+ private int defaultQueueSize = DEFAULT_QUEUE_SIZE_DEFAULT;
+
+ /** Size of default bulk. */
+ private int defaultBulkSize = BULK_SIZE_DEFAULT;
+
+ /** Size of queue N0. */
+ private int queueSizeN0 = QUEUE_SIZE_N0_DEFAULT;
+
+ /** Size of bulk N0. */
+ private int bulkSizeN0 = BULK_SIZE_DEFAULT;
+
+ /** Size of queue N1. */
+ private int queueSizeN1 = QUEUE_SIZE_DEFAULT;
+
+ /** Size of bulk N1. */
+ private int bulkSizeN1 = BULK_SIZE_DEFAULT;
+
+ /** Size of queue N2. */
+ private int queueSizeN2 = QUEUE_SIZE_DEFAULT;
+
+ /** Size of bulk N2. */
+ private int bulkSizeN2 = BULK_SIZE_DEFAULT;
+
+ /** Size of queue N3. */
+ private int queueSizeN3 = QUEUE_SIZE_DEFAULT;
+
+ /** Size of bulk N3. */
+ private int bulkSizeN3 = BULK_SIZE_DEFAULT;
+
+ /** Size of queue N4. */
+ private int queueSizeN4 = QUEUE_SIZE_DEFAULT;
+
+ /** Size of bulk N4. */
+ private int bulkSizeN4 = BULK_SIZE_DEFAULT;
+
+ /** Size of queue N5. */
+ private int queueSizeN5 = QUEUE_SIZE_DEFAULT;
+
+ /** Size of bulk N5. */
+ private int bulkSizeN5 = BULK_SIZE_DEFAULT;
+
+ /** Size of queue N6. */
+ private int queueSizeN6 = QUEUE_SIZE_DEFAULT;
+
+ /** Size of bulk N6. */
+ private int bulkSizeN6 = BULK_SIZE_DEFAULT;
+
protected ExecutorService executorMsgs =
Executors.newFixedThreadPool(32, groupedThreads("onos/of", "event-stats-%d", log));
@@ -179,6 +253,8 @@
protected Set<OpenFlowEventListener> ofEventListener = new CopyOnWriteArraySet<>();
+ protected Set<OpenFlowClassifierListener> ofClassifierListener = new CopyOnWriteArraySet<>();
+
protected Set<OpenFlowMessageListener> ofMessageListener = new CopyOnWriteArraySet<>();
protected Multimap<Dpid, OFFlowStatsEntry> fullFlowStats =
@@ -266,6 +342,7 @@
netCfgService.addListener(netCfgListener);
ctrl.setConfigParams(context.getProperties());
ctrl.start(agent, driverService, netCfgService);
+ openFlowManager.addListener(openFlowListener);
}
private void cleanup() {
@@ -276,6 +353,7 @@
connectedSwitches.clear();
activeMasterSwitches.clear();
activeEqualSwitches.clear();
+ openFlowManager.removeListener(openFlowListener);
}
@Deactivate
@@ -334,6 +412,16 @@
}
@Override
+    public void addClassifierListener(OpenFlowClassifierListener listener) {
+        // Listeners live in a set, so adding one that is already present changes nothing.
+        ofClassifierListener.add(listener);
+    }
+
+    @Override
+    public void removeClassifierListener(OpenFlowClassifierListener listener) {
+        // Removing a listener that is not present leaves the set unchanged.
+        ofClassifierListener.remove(listener);
+    }
+
+ @Override
public void addMessageListener(OpenFlowMessageListener listener) {
ofMessageListener.add(listener);
}
@@ -840,6 +928,16 @@
l.receivedRoleReply(dpid, requested, response);
}
}
+
+ @Override
+ public void addClassifierListener(OpenFlowClassifierListener listener) {
+ // Adds the listener to the ofClassifierListener set.
+ ofClassifierListener.add(listener);
+ }
+
+ @Override
+ public void removeClassifierListener(OpenFlowClassifierListener listener) {
+ // Removes the listener from the ofClassifierListener set.
+ ofClassifierListener.remove(listener);
+ }
}
/**
@@ -862,4 +960,28 @@
}
}
}
+
+    /**
+     * Relays classifier events to the registered classifier listeners:
+     * INSERT events are passed to {@code handleClassifiersAdd}, REMOVE events
+     * to {@code handleClassifiersRemove}. Exceptions raised while relaying are
+     * logged and not propagated.
+     */
+    private class InternalOpenFlowListener implements OpenFlowListener {
+        @Override
+        public void event(OpenFlowEvent event) {
+            try {
+                switch (event.type()) {
+                    case INSERT:
+                        for (OpenFlowClassifierListener listener : ofClassifierListener) {
+                            listener.handleClassifiersAdd(event.subject());
+                        }
+                        break;
+                    case REMOVE:
+                        for (OpenFlowClassifierListener listener : ofClassifierListener) {
+                            listener.handleClassifiersRemove(event.subject());
+                        }
+                        break;
+                    default:
+                        log.warn("Unknown OpenFlow classifier event type: {}", event.type());
+                        break;
+                }
+            } catch (Exception e) {
+                // The throwable is passed as the last argument so that the
+                // stack trace is logged along with the message.
+                log.error("Internal OpenFlowListener exception: {}", e.getMessage(), e);
+            }
+        }
+    }
}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OsgiPropertyConstants.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OsgiPropertyConstants.java
index a152239..9dc3809 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OsgiPropertyConstants.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OsgiPropertyConstants.java
@@ -46,4 +46,26 @@
public static final String TRUST_STORE_PASSWORD = "trustStorePassword";
public static final String TRUST_STORE_PASSWORD_DEFAULT = "";
+ public static final String DEFAULT_QUEUE_SIZE = "defaultQueueSize";
+ public static final String DEBAULT_BULK_SIZE = "defaultBulkSize";
+ public static final String QUEUE_SIZE_N0 = "queueSizeN0";
+ public static final String BULK_SIZE_N0 = "bulkSizeN0";
+ public static final String QUEUE_SIZE_N1 = "queueSizeN1";
+ public static final String BULK_SIZE_N1 = "bulkSizeN1";
+ public static final String QUEUE_SIZE_N2 = "queueSizeN2";
+ public static final String BULK_SIZE_N2 = "bulkSizeN2";
+ public static final String QUEUE_SIZE_N3 = "queueSizeN3";
+ public static final String BULK_SIZE_N3 = "bulkSizeN3";
+ public static final String QUEUE_SIZE_N4 = "queueSizeN4";
+ public static final String BULK_SIZE_N4 = "bulkSizeN4";
+ public static final String QUEUE_SIZE_N5 = "queueSizeN5";
+ public static final String BULK_SIZE_N5 = "bulkSizeN5";
+ public static final String QUEUE_SIZE_N6 = "queueSizeN6";
+ public static final String BULK_SIZE_N6 = "bulkSizeN6";
+
+ public static final int DEFAULT_QUEUE_SIZE_DEFAULT = 5000;
+ public static final int QUEUE_SIZE_N0_DEFAULT = 1000;
+ public static final int BULK_SIZE_DEFAULT = 100;
+ public static final int QUEUE_SIZE_DEFAULT = 1;
+
}
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplTest.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplTest.java
index 16fea6b..f853b98 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplTest.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplTest.java
@@ -38,6 +38,7 @@
import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.onosproject.openflow.controller.OpenFlowSwitchListener;
import org.onosproject.openflow.controller.RoleState;
+import org.onosproject.openflow.controller.OpenFlowService;
import org.osgi.service.component.ComponentContext;
import org.projectfloodlight.openflow.protocol.OFPortStatus;
@@ -139,6 +140,10 @@
EasyMock.createMock(CoreService.class);
controller.coreService = mockCoreService;
+ OpenFlowService mockOpenFlowService =
+ EasyMock.createMock(OpenFlowService.class);
+ controller.openFlowManager = mockOpenFlowService;
+
ComponentConfigService mockConfigService =
EasyMock.createMock(ComponentConfigService.class);
expect(mockConfigService.getProperties(anyObject())).andReturn(ImmutableSet.of());