Use typed queues for OF message processing

Process OF messages through 8 queues. Output queue for messages
controlled per OF Agent with help of message classifiers.

Queues can be configured through component configuration mechanism
for "org.onosproject.openflow.controller.impl.OpenFlowControllerImpl"
component.

Classifiers can be configured through NetworkConfig API in the following
form:

      {
        "devices": {
           "of:0000000000000001": {
               "classifiers": [{
                   "ethernet-type":"LLDP",
                   "target-queue":0
               },{
                   "ethernet-type":"BDDP",
                   "target-queue":0
               },{
                   "ethernet-type":"0x1234",
                   "target-queue":1
               }]
           }
        }
      }

Where "target_queue" is queue number from 0 to 7 (7 is default queue),
"ethernet_type" is a type of a packet either in "0xFFFF" from or enum
name as defined in the "org.onlab.packet.EthType.EtherType" enum.

Change-Id: I0512ef653d90c36f00289014872170c1a8aa5204
diff --git a/protocols/openflow/api/BUILD b/protocols/openflow/api/BUILD
index bc5f55a..5e113b2 100644
--- a/protocols/openflow/api/BUILD
+++ b/protocols/openflow/api/BUILD
@@ -1,4 +1,4 @@
-COMPILE_DEPS = CORE_DEPS + NETTY + [
+COMPILE_DEPS = CORE_DEPS + NETTY + JACKSON + [
     "//core/common:onos-core-common",
     "@openflowj//jar",
     "@io_netty_netty_transport//jar",
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifier.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifier.java
new file mode 100644
index 0000000..6b746a0
--- /dev/null
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifier.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller;
+
+import org.onosproject.net.DeviceId;
+import java.util.Objects;
+
+/**
+ * Represents to OpenFlow messages classifier.
+ */
+public final class OpenFlowClassifier {
+
+    private final short ethernetType;
+    private final int idQueue;
+    private final DeviceId deviceId;
+
+    /**
+     * Builder of the OpenFlow classifier.
+     */
+    public static class Builder {
+        private Short ethernetType = 0;
+        private int idQueue;
+        private DeviceId deviceId;
+
+        /**
+         * Builder constructor for OpenFlow classifier.
+         *
+         * @param deviceId the device id
+         * @param idQueue  the queue id
+         */
+        public Builder(DeviceId deviceId, int idQueue) {
+            this.deviceId = deviceId;
+            this.idQueue = idQueue;
+        }
+
+        /**
+         * Sets the ethernet type for the OpenFlow classifier that will be built.
+         *
+         * @param ethernetType the ethernet type
+         * @return this builder
+         */
+        public Builder ethernetType(short ethernetType) {
+            this.ethernetType = ethernetType;
+            return this;
+        }
+
+        /**
+         * Builds the OpenFlow classifier from the accumulated parameters.
+         *
+         * @return OpenFlow classifier instance
+         */
+        public OpenFlowClassifier build() {
+            return new OpenFlowClassifier(this);
+        }
+    }
+
+    private OpenFlowClassifier(Builder builder) {
+        this.idQueue = builder.idQueue;
+        this.ethernetType = builder.ethernetType;
+        this.deviceId = builder.deviceId;
+    }
+
+    /**
+     * Gets the ethernet type matched by the classifier.
+     *
+     * @return matched packet ethernet type
+     */
+    public short ethernetType() {
+        return this.ethernetType;
+    }
+
+    /**
+     * Gets the id of source OpenFlow device matched by the classifier.
+     *
+     * @return connected device id
+     */
+    public DeviceId deviceId() {
+        return this.deviceId;
+    }
+
+    /**
+     * Gets the queue id targeted by the classifier.
+     *
+     * @return target queue id
+     */
+    public int idQueue() {
+        return this.idQueue;
+    }
+
+    /**
+     * Compares OpenFlow classifiers.
+     *
+     * @param o object that we want to compare to
+     * @return equality check result
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof OpenFlowClassifier)) {
+            return false;
+        }
+        OpenFlowClassifier classifier = (OpenFlowClassifier) o;
+        return this.ethernetType == classifier.ethernetType()
+               && this.idQueue == classifier.idQueue()
+               && this.deviceId.equals(classifier.deviceId());
+    }
+
+    /**
+     * Calculates hashCode of the OpenFlow Classifier object.
+     *
+     * @return hash of the OpenFlow Classifier
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(deviceId, idQueue, ethernetType);
+    }
+}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifierConfig.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifierConfig.java
new file mode 100644
index 0000000..8a44589
--- /dev/null
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifierConfig.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.openflow.controller;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Sets;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.config.Config;
+import org.onlab.packet.EthType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Configuration for classifiers.
+ */
+public class OpenFlowClassifierConfig extends Config<DeviceId> {
+    private static Logger log = LoggerFactory.getLogger(OpenFlowClassifierConfig.class);
+
+    public static final String TARGET_QUEUE = "target-queue";
+    public static final String ETHER_TYPE = "ethernet-type";
+
+    private static final String CONFIG_VALUE_ERROR = "Error parsing config value";
+    private static final String CLASSF_NULL_ERROR = "Classifier cannot be null";
+
+    private short etherValue(String etherType) throws IllegalArgumentException {
+        short etherTypeValue;
+        try {
+            if (etherType.startsWith("0x")) {
+                Integer e = Integer.valueOf(etherType.substring(2), 16);
+                if (e < 0 || e > 0xFFFF) {
+                    throw new IllegalArgumentException("EtherType value out of range");
+                }
+                etherTypeValue = e.shortValue();
+            } else {
+                etherTypeValue = EthType.EtherType.valueOf(etherType).ethType().toShort();
+            }
+        } catch (IllegalArgumentException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Failed to parse ethernet type string");
+        }
+        return etherTypeValue;
+    }
+
+    @Override
+    public boolean isValid() {
+        for (JsonNode node : array) {
+            if (!hasOnlyFields((ObjectNode) node, TARGET_QUEUE, ETHER_TYPE)) {
+                return false;
+            }
+
+            ObjectNode obj = (ObjectNode) node;
+
+            if (!(isString(obj, ETHER_TYPE, FieldPresence.MANDATORY) &&
+                  isIntegralNumber(obj, TARGET_QUEUE, FieldPresence.MANDATORY, 0, 7))) {
+                return false;
+            }
+
+            try {
+                etherValue(node.path(ETHER_TYPE).asText());
+            } catch (Exception e) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Retrieves all classifiers configured on this port.
+     *
+     * @return set of classifiers
+     */
+    public Set<OpenFlowClassifier> getClassifiers() {
+        Set<OpenFlowClassifier> classifiers = Sets.newHashSet();
+
+        for (JsonNode classfNode : array) {
+            DeviceId deviceId = this.subject();
+            short ethernetType = etherValue(classfNode.path(ETHER_TYPE).asText());
+            int idQueue = Integer.valueOf(classfNode.path(TARGET_QUEUE).asText());
+
+            OpenFlowClassifier classf =
+                new OpenFlowClassifier.Builder(deviceId, idQueue).ethernetType(ethernetType).build();
+            classifiers.add(classf);
+        }
+
+        return classifiers;
+    }
+
+    /**
+     * Adds a classifier to the config.
+     *
+     * @param classf classifier to add
+     */
+    public void addClassifier(OpenFlowClassifier classf) {
+        checkNotNull(classf, CLASSF_NULL_ERROR);
+        checkArgument(classf.deviceId().equals(this.subject()));
+
+        ObjectNode classfNode = array.addObject();
+
+        EthType.EtherType e = EthType.EtherType.lookup(classf.ethernetType());
+        if (e.equals(EthType.EtherType.UNKNOWN)) {
+            classfNode.put(ETHER_TYPE, String.format("0x%04x", classf.ethernetType()));
+        } else {
+            classfNode.put(ETHER_TYPE, e.name());
+        }
+        classfNode.put(TARGET_QUEUE, classf.idQueue());
+    }
+
+    /**
+     * Removes a classifier from the config.
+     *
+     * @param classf classifier to remove
+     */
+    public void removeClassifier(OpenFlowClassifier classf) {
+        checkNotNull(classf, CLASSF_NULL_ERROR);
+        checkArgument(classf.deviceId().equals(this.subject()));
+
+        Iterator<JsonNode> it = array.iterator();
+        while (it.hasNext()) {
+            JsonNode node = it.next();
+            if (etherValue(node.path(ETHER_TYPE).asText()) == classf.ethernetType()
+                && Integer.valueOf(node.path(TARGET_QUEUE).asText()) == classf.idQueue()) {
+                it.remove();
+                break;
+            }
+        }
+    }
+}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifierListener.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifierListener.java
new file mode 100644
index 0000000..895ddd5
--- /dev/null
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowClassifierListener.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller;
+
+/**
+ * Notifies providers about OpenFlow Classifiers modifications.
+ */
+public interface OpenFlowClassifierListener {
+
+    /**
+     * Handle classifier add action.
+     *
+     * @param classifier the OpenFlow classifier
+     */
+    void handleClassifiersAdd(OpenFlowClassifier classifier);
+
+    /**
+     * Handle classifier remove action.
+     *
+     * @param classifier the OpenFlow classifier
+     */
+    void handleClassifiersRemove(OpenFlowClassifier classifier);
+}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java
index e5d31596..3e153d7 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java
@@ -152,4 +152,18 @@
      * @param dpid the switch to set the role for.
      */
     void setRole(Dpid dpid, RoleState role);
+
+    /**
+     * Remove OpenFlow classifier listener from runtime store of classifiers listener.
+     *
+     * @param listener the OpenFlow classifier to remove
+     */
+    void removeClassifierListener(OpenFlowClassifierListener listener);
+
+    /**
+     * Add OpenFlow classifier listener to runtime store of classifiers listener.
+     *
+     * @param listener the OpenFlow classifier listener
+     */
+    void addClassifierListener(OpenFlowClassifierListener listener);
 }
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowEvent.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowEvent.java
new file mode 100644
index 0000000..2aeaec1
--- /dev/null
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowEvent.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller;
+
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Represent OpenFlow Classifiers events.
+ */
+public class OpenFlowEvent extends AbstractEvent<OpenFlowEvent.Type, OpenFlowClassifier> {
+
+    /**
+     * Enum of OpenFlow event type.
+     */
+    public enum Type {
+        /**
+         * Signifies that a new packet classifier has been added.
+         */
+        INSERT,
+        /**
+         * Signifies that a packet classifier has been removed.
+         */
+        REMOVE
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param type the OpenFlow event type
+     * @param subject the OpenFlow update data
+     */
+    public OpenFlowEvent(Type type, OpenFlowClassifier subject) {
+        super(type, subject);
+    }
+}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowListener.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowListener.java
new file mode 100644
index 0000000..5f1a271
--- /dev/null
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowListener.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller;
+
+import org.onosproject.event.EventListener;
+
+/**
+ * Notifies providers about OpenFlow Classifiers events.
+ */
+public interface OpenFlowListener extends EventListener<OpenFlowEvent> {
+
+}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowService.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowService.java
new file mode 100644
index 0000000..b702c23
--- /dev/null
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowService.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller;
+
+import java.util.Set;
+import org.onosproject.event.ListenerService;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Abstraction of an OpenFlow classifiers management.
+ */
+public interface OpenFlowService extends ListenerService<OpenFlowEvent, OpenFlowListener> {
+
+    /**
+     * Adds the OpenFlow classifier to the information base.
+     *
+     * @param classifier the OpenFlow classifier
+     */
+    void add(OpenFlowClassifier classifier);
+
+    /**
+     * Removes the OpenFlow classifier from the information base.
+     *
+     * @param classifier classifier to remove
+     */
+    void remove(OpenFlowClassifier classifier);
+
+    /**
+     * Gets all OpenFlow classifiers in the system.
+     *
+     * @return set of OpenFlow classifiers
+     */
+    Set<OpenFlowClassifier> getClassifiers();
+
+    /**
+     * Obtains OpenFlow classifiers in the system by device id.
+     *
+     * @param deviceId the device id
+     * @return set of OpenFlow classifiers
+     */
+    Set<OpenFlowClassifier> getClassifiersByDeviceId(DeviceId deviceId);
+
+    /**
+     * Obtains OpenFlow classifiers in the system by device id and number of queue.
+     *
+     * @param deviceId the device id
+     * @param idQueue the queue id
+     * @return set of OpenFlow classifiers
+     */
+    Set<OpenFlowClassifier> getClassifiersByDeviceIdAndQueue(DeviceId deviceId, int idQueue);
+}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSession.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSession.java
index c380322..1677e5e 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSession.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSession.java
@@ -60,5 +60,17 @@
      */
     CharSequence sessionInfo();
 
+    /**
+     * Add classifier to runtime store of classifiers.
+     *
+     * @param classifier the OpenFlow classifier to add
+     */
+    void addClassifier(OpenFlowClassifier classifier);
 
+    /**
+     * Remove classifier from runtime store of classifiers.
+     *
+     * @param classifier the OpenFlow classifier to remove
+     */
+    void removeClassifier(OpenFlowClassifier classifier);
 }
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
index 0ba3bd9..8431068 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
@@ -13,7 +13,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.onosproject.openflow.controller.driver;
 
 import com.google.common.base.MoreObjects;
@@ -23,6 +22,8 @@
 import org.onosproject.net.Device;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
 import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowClassifier;
+import org.onosproject.openflow.controller.OpenFlowClassifierListener;
 import org.onosproject.openflow.controller.OpenFlowSession;
 import org.onosproject.openflow.controller.RoleState;
 import org.projectfloodlight.openflow.protocol.OFDescStatsReply;
@@ -98,6 +99,8 @@
 
     private OFMeterFeaturesStatsReply meterfeatures;
 
+    protected OpenFlowClassifierListener classifierListener = new InternalClassifierListener();
+
     // messagesPendingMastership is used as synchronization variable for
     // all mastership related changes. In this block, mastership (including
     // role update) will have either occurred or not.
@@ -301,7 +304,11 @@
 
     @Override
     public final boolean connectSwitch() {
-        return this.agent.addConnectedSwitch(dpid, this);
+        boolean status = this.agent.addConnectedSwitch(dpid, this);
+        if (status) {
+            this.agent.addClassifierListener(classifierListener);
+        }
+        return status;
     }
 
     @Override
@@ -339,6 +346,7 @@
     @Override
     public final void removeConnectedSwitch() {
         this.agent.removeConnectedSwitch(dpid);
+        this.agent.removeClassifierListener(classifierListener);
     }
 
     @Override
@@ -566,4 +574,17 @@
                 .add("dpid", dpid)
                 .toString();
     }
+
+    private class InternalClassifierListener implements OpenFlowClassifierListener {
+
+        @Override
+        public void handleClassifiersAdd(OpenFlowClassifier classifier) {
+            channel.addClassifier(classifier);
+        }
+
+        @Override
+        public void handleClassifiersRemove(OpenFlowClassifier classifier) {
+            channel.removeClassifier(classifier);
+        }
+    }
 }
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowAgent.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowAgent.java
index 37c43f3..4451b19 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowAgent.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowAgent.java
@@ -16,6 +16,7 @@
 package org.onosproject.openflow.controller.driver;
 
 import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowClassifierListener;
 import org.onosproject.openflow.controller.OpenFlowSwitch;
 import org.onosproject.openflow.controller.RoleState;
 import org.projectfloodlight.openflow.protocol.OFMessage;
@@ -109,4 +110,18 @@
      * @param response role reply from the switch
      */
     void returnRoleReply(Dpid dpid, RoleState requested, RoleState response);
+
+    /**
+     * Add OpenFlow classifier listener.
+     *
+     * @param listener the OpenFlow classifier listener
+     */
+    void addClassifierListener(OpenFlowClassifierListener listener);
+
+    /**
+     * Remove OpenFlow classifier listener.
+     *
+     * @param listener the OpenFlow classifier listener
+     */
+    void removeClassifierListener(OpenFlowClassifierListener listener);
 }
diff --git a/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/OpenflowControllerAdapter.java b/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/OpenflowControllerAdapter.java
index a3b8e5c..5056840 100644
--- a/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/OpenflowControllerAdapter.java
+++ b/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/OpenflowControllerAdapter.java
@@ -103,4 +103,12 @@
     @Override
     public void removeEventListener(OpenFlowEventListener listener) {
     }
+
+    @Override
+    public void removeClassifierListener(OpenFlowClassifierListener listener) {
+    }
+
+    @Override
+    public void addClassifierListener(OpenFlowClassifierListener listener) {
+    }
 }
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
index 6cf1ddc..c71de08 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
@@ -13,7 +13,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.onosproject.openflow.controller.impl;
 
 import com.google.common.base.MoreObjects;
@@ -96,6 +95,12 @@
 
     private static final short MIN_KS_LENGTH = 6;
 
+    //Default queues settings
+    private static final short DEFAULT_QUEUE_SIZE = 5000;
+    private static final short FIRST_QUEUE_SIZE = 1000;
+    private static final short DEFAULT_BULK_SIZE = 100;
+    private static final short DEFAULT_QUEUE_ID = 7;
+
     protected HashMap<String, String> controllerNodeIPsCache;
 
     private ChannelGroup cg;
@@ -103,6 +108,8 @@
     // Configuration options
     protected List<Integer> openFlowPorts = ImmutableList.of(6633, 6653);
     protected int workerThreads = 0;
+    protected int[] cfgQueueSizes = {FIRST_QUEUE_SIZE, 0, 0, 0, 0, 0, 0, DEFAULT_QUEUE_SIZE};
+    protected int[] cfgBulkSizes = new int[8];
 
     // Start time of the controller
     protected long systemStartTime;
@@ -129,7 +136,17 @@
     private DriverService driverService;
     private NetworkConfigRegistry netCfgService;
 
+    public Controller() {
+        Arrays.fill(cfgBulkSizes, DEFAULT_BULK_SIZE);
+    }
 
+    public int getQueueSize(int queueId) {
+        return cfgQueueSizes[queueId];
+    }
+
+    public int getBulkSize(int queueId) {
+        return cfgBulkSizes[queueId];
+    }
 
     // **************
     // Initialization
@@ -221,6 +238,29 @@
                     .channel(NioServerSocketChannel.class);
     }
 
+    public void setQueueParams(Dictionary<?, ?> properties, String sizeParamName, String bulkParamName, int queueId) {
+        String queueSize = get(properties, sizeParamName);
+        if (!Strings.isNullOrEmpty(queueSize)) {
+            int size = Integer.parseInt(queueSize);
+            if (size > 0) {
+                this.cfgQueueSizes[queueId] = size;
+            } else {
+                throw new IllegalArgumentException(
+                    String.format("%s value must be either a positive integer value", sizeParamName));
+            }
+        }
+        String bulkSize = get(properties, bulkParamName);
+        if (!Strings.isNullOrEmpty(bulkSize)) {
+            int bulk = Integer.parseInt(bulkSize);
+            if (bulk > 0) {
+                this.cfgBulkSizes[queueId] = bulk;
+            } else {
+                throw new IllegalArgumentException(
+                    String.format("%s value must be either a positive integer value", bulkParamName));
+            }
+        }
+    }
+
     public void setConfigParams(Dictionary<?, ?> properties) {
         boolean restartRequired = setOpenFlowPorts(properties);
         restartRequired |= setWorkerThreads(properties);
@@ -273,6 +313,16 @@
             this.workerThreads = Integer.parseInt(threads);
         }
         log.debug("Number of worker threads set to {}", this.workerThreads);
+
+        setQueueParams(properties, "defaultQueueSize", "defaultBulkSize", DEFAULT_QUEUE_ID);
+        setQueueParams(properties, "queueSizeN0", "bulkSizeN0", 0);
+        setQueueParams(properties, "queueSizeN1", "bulkSizeN1", 1);
+        setQueueParams(properties, "queueSizeN2", "bulkSizeN2", 2);
+        setQueueParams(properties, "queueSizeN3", "bulkSizeN3", 3);
+        setQueueParams(properties, "queueSizeN4", "bulkSizeN4", 4);
+        setQueueParams(properties, "queueSizeN5", "bulkSizeN5", 5);
+        setQueueParams(properties, "queueSizeN6", "bulkSizeN6", 6);
+
         return oldValue != this.workerThreads; // restart if number of threads has changed
     }
 
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/LinkedBlockingMessagesQueue.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/LinkedBlockingMessagesQueue.java
new file mode 100644
index 0000000..518cfcc
--- /dev/null
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/LinkedBlockingMessagesQueue.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller.impl;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class wrap existing class LinkedBlockingQueue for solution problem
+ * with handling and processed received messages in onos.
+ *
+ * @see java.util.concurrent.LinkedBlockingQueue
+ */
+public class LinkedBlockingMessagesQueue<T> {
+
+    /**
+     * Identifier of queue.
+     */
+    private int idQueue;
+
+    /**
+     * Size of queue.
+     */
+    private int sizeOfQueue;
+
+    /**
+     * Maximal bulk of messages that will be processed.
+     */
+    private int bulk;
+
+    /**
+     * Queue of messages.
+     */
+    private BlockingQueue<T> queue;
+
+    /**
+     * Constructor.
+     *
+     * @param idQueue     Identifier of queue
+     * @param sizeOfQueue Size of queue
+     * @param bulk        Maximal bulk of messages that will be processed
+     */
+    public LinkedBlockingMessagesQueue(int idQueue, int sizeOfQueue, int bulk) {
+        this.idQueue = idQueue;
+        this.sizeOfQueue = sizeOfQueue;
+        this.queue = new LinkedBlockingQueue<>(this.sizeOfQueue);
+        this.bulk = bulk;
+    }
+
+    /**
+     * Returns the identifier of this queue.
+     *
+     * @return the id of this queue
+     */
+    public int idQueue() {
+        return idQueue;
+    }
+
+    /**
+     * Return the size of this queue.
+     *
+     * @return the size of this queue
+     */
+    public int sizeOfQueue() {
+        return sizeOfQueue;
+    }
+
+    /**
+     * Set size for this queue.
+     *
+     * @param sizeOfQueue Size of queue
+     */
+    public void setSizeOfQueue(int sizeOfQueue) {
+        this.sizeOfQueue = sizeOfQueue;
+        this.queue = new LinkedBlockingQueue<>(this.sizeOfQueue);
+    }
+
+    /**
+     * Offer new message to this queue.
+     *
+     * @param message  elemet to add
+     * @return <code>true</code> if the element was added to this queue, else <code>false</code>
+     */
+    public boolean offer(T message) {
+        return this.queue.offer(message);
+    }
+
+    /**
+     * Transfer bulk of elements from this queue to the <code>messages</code> collection.
+     *
+     * @param messages  the collection to transfer bulk of elements from this queue
+     * @return the numbers of elements transfered
+     */
+    public int drainTo(Collection<? super T> messages) {
+        return this.queue.drainTo(messages, this.bulk);
+    }
+
+    /**
+     * Return the elements count in this queue.
+     *
+     * @return the elements count
+     */
+    public int size() {
+        return this.queue.size();
+    }
+
+    /**
+     * Return the maximal bulk of messages for this queue.
+     *
+     * @return maximal bulk of messages that will be processed
+     */
+    public int bulk() {
+        return bulk;
+    }
+
+    /**
+     * Set the maximal bulk of messages for this queue.
+     *
+     * @param bulk Maximal bulk of messages that will be processed
+     */
+    public void setBulk(int bulk) {
+        this.bulk = bulk;
+    }
+
+}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
index 264629e..5a95188 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
@@ -16,30 +16,51 @@
 
 package org.onosproject.openflow.controller.impl;
 
+import static org.onlab.packet.Ethernet.TYPE_BSN;
+import static org.onlab.packet.Ethernet.TYPE_LLDP;
 import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.openflow.controller.Dpid.uri;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.security.cert.Certificate;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.onlab.osgi.DefaultServiceDirectory;
+import org.onlab.packet.Ethernet;
 import org.onlab.packet.IpAddress;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.packet.DefaultInboundPacket;
+import org.onosproject.openflow.controller.DefaultOpenFlowPacketContext;
 import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowClassifier;
+import org.onosproject.openflow.controller.OpenFlowPacketContext;
+import org.onosproject.openflow.controller.OpenFlowService;
 import org.onosproject.openflow.controller.OpenFlowSession;
 import org.onosproject.openflow.controller.driver.OpenFlowSwitchDriver;
 import org.onosproject.openflow.controller.driver.SwitchStateException;
@@ -106,11 +127,12 @@
 
     private static final String RESET_BY_PEER = "Connection reset by peer";
     private static final String BROKEN_PIPE = "Broken pipe";
+    private static final int NUM_OF_QUEUES = 8;
 
     private final Controller controller;
     private OpenFlowSwitchDriver sw;
     private long thisdpid; // channelHandler cached value of connected switch id
-
+    private DeviceId deviceId;
     private Channel channel;
     private String channelId;
 
@@ -152,15 +174,38 @@
      */
     private int handshakeTransactionIds = -1;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    private OpenFlowService openFlowManager;
 
-
-    private static final int MSG_READ_BUFFER = 5000;
+    private static final int BACKLOG_READ_BUFFER_DEFAULT = 1000;
 
     /**
-     * OFMessage dispatch queue.
+     * Map with all LinkedBlockingMessagesQueue queues which contains OFMessages.
      */
-    private final BlockingQueue<OFMessage> dispatchQueue =
-            new LinkedBlockingQueue<>(MSG_READ_BUFFER);
+    private Map<Integer, LinkedBlockingMessagesQueue<OFMessage>> dispatchQueuesMapProducer = new ConcurrentHashMap<>();
+
+    /**
+     * OFMessage classifiers map.
+     */
+    private List<Set<OpenFlowClassifier>> messageClassifiersMapProducer =
+            new CopyOnWriteArrayList<Set<OpenFlowClassifier>>();
+
+
+    /**
+     * Lock held by take, poll, etc.
+     */
+    private final ReentrantLock takeLock = new ReentrantLock();
+
+    /**
+     * Wait queue for waiting takes.
+     */
+    private final Condition notEmpty = takeLock.newCondition();
+
+    /**
+     * Current number of elements in enabled sub-queues.
+     */
+    private final AtomicInteger totalCount = new AtomicInteger();
+
 
     /**
      * Single thread executor for OFMessage dispatching.
@@ -181,7 +226,7 @@
      * <p>
      * Should only be touched from the Channel I/O thread
      */
-    private final Deque<OFMessage> dispatchBacklog = new ArrayDeque<>();
+    private final Deque<OFMessage> dispatchBacklog;
 
     /**
      * Create a new unconnected OFChannelHandler.
@@ -194,6 +239,17 @@
         this.pendingPortStatusMsg = new CopyOnWriteArrayList<>();
         this.portDescReplies = new ArrayList<>();
         duplicateDpidFound = Boolean.FALSE;
+        //Initialize queues and classifiers
+        dispatchBacklog = new LinkedBlockingDeque<>(BACKLOG_READ_BUFFER_DEFAULT);
+        for (int i = 0; i < NUM_OF_QUEUES; i++) {
+            if (controller.getQueueSize(i) > 0) {
+                dispatchQueuesMapProducer.put(i,
+                        new LinkedBlockingMessagesQueue<>(i, controller.getQueueSize(i), controller.getBulkSize(i)));
+            }
+            if (i != NUM_OF_QUEUES) {
+                messageClassifiersMapProducer.add(i, new CopyOnWriteArraySet<>());
+            }
+        }
     }
 
 
@@ -324,6 +380,7 @@
                     h.channel.disconnect();
                     return;
                 }
+                h.deviceId = DeviceId.deviceId(uri(h.thisdpid));
                 log.debug("Received features reply for switch at {} with dpid {}",
                         h.getSwitchInfoString(), h.thisdpid);
 
@@ -461,6 +518,7 @@
                     throws IOException, SwitchStateException {
                 illegalMessageReceived(h, m);
             }
+
             @Override
             void processOFStatisticsReply(OFChannelHandler h,
                     OFStatsReply  m)
@@ -564,6 +622,8 @@
                 if (h.sw.isDriverHandshakeComplete()) {
                     if (!h.sw.connectSwitch()) {
                         disconnectDuplicate(h);
+                    } else {
+                        h.initClassifiers();
                     }
                     handlePendingPortStatusMessages(h);
                     h.setState(ACTIVE);
@@ -1421,63 +1481,152 @@
 
     /**
      * Is this a state in which the handshake has completed?
+     *
      * @return true if the handshake is complete
      */
     public boolean isHandshakeComplete() {
         return this.state.isHandshakeComplete();
     }
 
-    private void dispatchMessage(OFMessage m) {
-
-        if (dispatchBacklog.isEmpty()) {
-            if (!dispatchQueue.offer(m)) {
-                // queue full
-                channel.config().setAutoRead(false);
-                // put it on the head of backlog
-                dispatchBacklog.addFirst(m);
-                return;
+    /**
+     * Increment totalCount variable and send signal to executor.
+     */
+    private void incrementAndSignal() {
+        try {
+            totalCount.incrementAndGet();
+            takeLock.lockInterruptibly();
+            try {
+                notEmpty.signal();
+            } finally {
+                takeLock.unlock();
             }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Try to push OpenFlow message to queue.
+     *
+     * @param message OpenFlow message
+     * @param idQueue id of Queue
+     * @return true if message was successful added to queue
+     */
+    private boolean pushMessageToQueue(OFMessage message, int idQueue) {
+        if (!dispatchQueuesMapProducer.get(idQueue).offer(message)) {
+            return false;
         } else {
-            dispatchBacklog.addLast(m);
+            incrementAndSignal();
+            return true;
         }
+    }
 
+    /**
+     * Process backlog - move messages from backlog to default queue.
+     *
+     * @return true if whole backlog was processed, otherwise false
+     */
+    private boolean processDispatchBacklogQueue() {
         while (!dispatchBacklog.isEmpty()) {
-            OFMessage msg = dispatchBacklog.pop();
-
-            if (!dispatchQueue.offer(msg)) {
-                // queue full
-                channel.config().setAutoRead(false);
-                // put it back to the head of backlog
-                dispatchBacklog.addFirst(msg);
-                return;
+            OFMessage msgFromBacklog = dispatchBacklog.removeFirst();
+            if (!pushMessageToQueue(msgFromBacklog, NUM_OF_QUEUES - 1)) {
+                dispatchBacklog.addFirst(msgFromBacklog);
+                return false;
             }
         }
+        return true;
 
+    }
+
+    /**
+     * Parse OpenFlow message context for get Ethernet packet.
+     *
+     * @param message OpenFlow message
+     * @return parsed Ethernet packet
+     */
+    private Ethernet parsePacketInMessage(OFMessage message) {
+        OpenFlowPacketContext pktCtx = DefaultOpenFlowPacketContext
+                .packetContextFromPacketIn(sw, (OFPacketIn) message);
+        DeviceId id = DeviceId.deviceId(Dpid.uri(pktCtx.dpid().value()));
+        DefaultInboundPacket inPkt = new DefaultInboundPacket(
+                new ConnectPoint(id, PortNumber.portNumber(pktCtx.inPort())),
+                pktCtx.parsed(), ByteBuffer.wrap(pktCtx.unparsed()),
+                pktCtx.cookie());
+        return inPkt.parsed();
+    }
+
+    /**
+     * Classify the Ethernet packet for membership on one of the queues.
+     *
+     * @param packet ethernet packet
+     * @return Id of destination Queue
+     */
+    private int classifyEthernetPacket(Ethernet packet) {
+        for (Set<OpenFlowClassifier> classifiers : this.messageClassifiersMapProducer) {
+            for (OpenFlowClassifier classifier : classifiers) {
+                if (classifier.ethernetType() == packet.getEtherType()) {
+                    return classifier.idQueue();
+                }
+            }
+        }
+        return NUM_OF_QUEUES - 1;
+    }
+
+    /**
+     * Process messages from dispatch queues.
+     *
+     * @param queuesSize count of messages in all queues
+     */
+    private void processMessages(int queuesSize) {
+        List<OFMessage> msgs = new ArrayList<>();
+        int processed;
+        do {
+            processed = 0;
+            while (processed < queuesSize) {
+                for (LinkedBlockingMessagesQueue<OFMessage> queue :
+                        dispatchQueuesMapProducer.values()) {
+                    processed += queue.drainTo(msgs);
+                }
+            }
+
+            msgs.forEach(sw::handleMessage);
+            msgs.clear();
+            /* Decrement conditional variable */
+            queuesSize = totalCount.addAndGet(-1 * processed);
+        } while (queuesSize > 0);
+    }
+
+    private void dispatchMessage(OFMessage m) {
+        log.debug("Begin dispatch OpenFlow Message");
+        boolean backlogEmpty = processDispatchBacklogQueue();
+        if (m.getType() == OFType.PACKET_IN) {
+            Ethernet pkt = parsePacketInMessage(m);
+            pushMessageToQueue(m, classifyEthernetPacket(pkt));
+        } else {
+            if (!backlogEmpty || !pushMessageToQueue(m, NUM_OF_QUEUES - 1)) {
+                dispatchBacklog.offer(m);
+            }
+        }
 
         if (dispatcherHandle.isDone()) {
             // dispatcher terminated for some reason, restart
-
             dispatcherHandle = dispatcher.submit((Runnable) () -> {
                 try {
-                    List<OFMessage> msgs = new ArrayList<>();
                     for (;;) {
-                        // wait for new message
-                        OFMessage msg = dispatchQueue.take();
-                        sw.handleMessage(msg);
-
-                        while (dispatchQueue.drainTo(msgs, MSG_READ_BUFFER) > 0) {
-                            if (!channel.config().isAutoRead()) {
-                                channel.config().setAutoRead(true);
+                        int tc = 0;
+                        takeLock.lockInterruptibly();
+                        try {
+                            while ((tc = totalCount.get()) == 0) {
+                                notEmpty.await();
                             }
-                            msgs.forEach(sw::handleMessage);
-                            msgs.clear();
+                        } finally {
+                            takeLock.unlock();
                         }
 
-                        if (!channel.config().isAutoRead()) {
-                            channel.config().setAutoRead(true);
-                        }
+                        processMessages(tc);
                     }
                 } catch (InterruptedException e) {
+                    log.error("executor thread InterruptedException: {}", e);
                     Thread.currentThread().interrupt();
                     // interrupted. gracefully shutting down
                     return;
@@ -1697,4 +1846,50 @@
         return channelId;
     }
 
+    @Override
+    public void addClassifier(OpenFlowClassifier classifier) {
+        if (this.deviceId.equals(classifier.deviceId())) {
+            log.debug("Add OpenFlow Classifier for switch {} to queue {} with type {}",
+                     classifier.deviceId().toString(), classifier.idQueue(), classifier.ethernetType());
+            this.messageClassifiersMapProducer.get(classifier.idQueue()).add(classifier);
+        }
+    }
+
+    @Override
+    public void removeClassifier(OpenFlowClassifier classifier) {
+        if (this.deviceId.equals(classifier.deviceId())) {
+            log.debug("Remove OpenFlow Classifier for switch {} from queue {} with type {}",
+                      classifier.deviceId().toString(), classifier.idQueue(), classifier.ethernetType());
+            this.messageClassifiersMapProducer.get(classifier.idQueue()).remove(classifier);
+        }
+    }
+
+    /**
+     * Init classifier configuration for the switch. Use stored configuration if exist.
+     * Otherwise add LLDP and BDDP classifiers for Queue N0.
+     */
+    private void initClassifiers() {
+        try {
+            openFlowManager = DefaultServiceDirectory.getService(OpenFlowService.class);
+            DeviceId did = DeviceId.deviceId(uri(thisdpid));
+            Set<OpenFlowClassifier> classifiers = openFlowManager.getClassifiersByDeviceId(did);
+            if (classifiers == null) {
+                OpenFlowClassifier classifier =
+                        new OpenFlowClassifier.Builder(did, 0).ethernetType(TYPE_LLDP).build();
+                openFlowManager.add(classifier);
+                classifier = new OpenFlowClassifier.Builder(did, 0).ethernetType(TYPE_BSN).build();
+                openFlowManager.add(classifier);
+            } else {
+                this.messageClassifiersMapProducer.forEach((v) -> {
+                        v.clear();
+                });
+                classifiers.forEach((c) -> {
+                        messageClassifiersMapProducer.get(c.idQueue()).add(c);
+                });
+            }
+        } catch (Exception e) {
+            log.error("Initialize default classifier failed: {}", e.toString());
+            e.printStackTrace();
+        }
+    }
 }
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java
index b03f007..a78e6a6 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java
@@ -23,6 +23,7 @@
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.flush.FlushConsolidationHandler;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.concurrent.EventExecutorGroup;
@@ -76,6 +77,8 @@
         pipeline.addLast("ofmessageencoder", OFMessageEncoder.getInstance());
         pipeline.addLast("ofmessagedecoder", OFMessageDecoder.getInstance());
 
+        pipeline.addLast("consolidateflush", new FlushConsolidationHandler(
+                           FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true));
         pipeline.addLast("idle", new IdleStateHandler(20, 25, 0));
         pipeline.addLast("timeout", new ReadTimeoutHandler(30));
 
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowClassifierManager.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowClassifierManager.java
new file mode 100644
index 0000000..c4cd4b2
--- /dev/null
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowClassifierManager.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.ImmutableSet;
+
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.onosproject.event.ListenerRegistry;
+import org.onosproject.net.DeviceId;
+import org.onosproject.openflow.controller.OpenFlowClassifier;
+import org.onosproject.openflow.controller.OpenFlowClassifierConfig;
+import org.onosproject.openflow.controller.OpenFlowEvent;
+import org.onosproject.openflow.controller.OpenFlowListener;
+import org.onosproject.openflow.controller.OpenFlowService;
+import org.slf4j.Logger;
+
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigService;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigEvent;
+import static org.onosproject.net.config.basics.SubjectFactories.DEVICE_SUBJECT_FACTORY;
+
+/**
+ * Manages the inventory of OpenFlow Classifiers in the system.
+ */
+@Component(immediate = true, service = OpenFlowService.class)
+public class OpenFlowClassifierManager extends ListenerRegistry<OpenFlowEvent, OpenFlowListener>
+    implements OpenFlowService {
+
+    private Logger log = getLogger(getClass());
+
+    private final InternalConfigListener listener = new InternalConfigListener();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected NetworkConfigService cfgService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected NetworkConfigRegistry cfgRegistry;
+
+    private static final String FEATURE_NAME = "classifiers";
+    private final Set<ConfigFactory<?, ?>> factories = ImmutableSet.of(
+            new ConfigFactory<DeviceId, OpenFlowClassifierConfig>(DEVICE_SUBJECT_FACTORY,
+                    OpenFlowClassifierConfig.class, FEATURE_NAME,
+                    true) {
+                @Override
+                public OpenFlowClassifierConfig createConfig() {
+                    return new OpenFlowClassifierConfig();
+                }
+            }
+    );
+
+    private final Map<DeviceId, Set<OpenFlowClassifier>> classifiersMap = Maps.newConcurrentMap();
+
+    @Activate
+    private void activate() {
+        factories.forEach(cfgRegistry::registerConfigFactory);
+        cfgService.addListener(listener);
+
+        for (DeviceId subject : cfgService.getSubjects(DeviceId.class, OpenFlowClassifierConfig.class)) {
+            OpenFlowClassifierConfig config = cfgService.getConfig(subject, OpenFlowClassifierConfig.class);
+
+            if (config != null) {
+                updateClassifiers(config);
+            }
+        }
+
+        log.info("Started Openflow Manager");
+    }
+
+    @Deactivate
+    private void deactivate() {
+        cfgService.removeListener(listener);
+
+        factories.forEach(cfgRegistry::unregisterConfigFactory);
+        log.info("Stopped Openflow manager");
+
+    }
+
+    @Override
+    public void add(OpenFlowClassifier classifier) {
+        checkNotNull(classifier, "Classifier cannot be null");
+
+        OpenFlowClassifierConfig config =
+                cfgService.addConfig(classifier.deviceId(), OpenFlowClassifierConfig.class);
+        config.addClassifier(classifier);
+        cfgService.applyConfig(classifier.deviceId(), OpenFlowClassifierConfig.class, config.node());
+    }
+
+    @Override
+    public void remove(OpenFlowClassifier classifier) {
+        checkNotNull(classifier, "Classifier cannot be null");
+
+        OpenFlowClassifierConfig config = cfgService.getConfig(classifier.deviceId(), OpenFlowClassifierConfig.class);
+
+        if (config == null) {
+            return;
+        }
+
+        config.removeClassifier(classifier);
+        cfgService.applyConfig(classifier.deviceId(), OpenFlowClassifierConfig.class, config.node());
+    }
+
+    @Override
+    public Set<OpenFlowClassifier> getClassifiers() {
+        Set<OpenFlowClassifier> classifiers = Sets.newHashSet();
+
+        classifiersMap.values().forEach(c -> classifiers.addAll(c));
+
+        return classifiers;
+    }
+
+    @Override
+    public Set<OpenFlowClassifier> getClassifiersByDeviceId(DeviceId deviceId) {
+        return classifiersMap.get(deviceId);
+    }
+
+    @Override
+    public Set<OpenFlowClassifier> getClassifiersByDeviceIdAndQueue(DeviceId deviceId, int idQueue) {
+        Set<OpenFlowClassifier> classifiers = classifiersMap.get(deviceId);
+        if (classifiers == null) {
+            return null;
+        } else {
+            return classifiers.stream()
+                   .filter(p -> p.idQueue() == idQueue)
+                   .collect(Collectors.toSet());
+        }
+    }
+
+    private void updateClassifiers(OpenFlowClassifierConfig classfConfig) {
+        Set<OpenFlowClassifier> old = classifiersMap.put(classfConfig.subject(),
+                                                         Sets.newHashSet(classfConfig.getClassifiers()));
+
+        if (old == null) {
+            old = Collections.emptySet();
+        }
+
+        for (OpenFlowClassifier classf : classfConfig.getClassifiers()) {
+            if (old.contains(classf)) {
+                old.remove(classf);
+            } else {
+                process(new OpenFlowEvent(OpenFlowEvent.Type.INSERT, classf));
+            }
+        }
+
+        for (OpenFlowClassifier classf : old) {
+            process(new OpenFlowEvent(OpenFlowEvent.Type.REMOVE, classf));
+        }
+    }
+
+    private void removeClassifiers(DeviceId deviceId) {
+        Set<OpenFlowClassifier> old = classifiersMap.remove(deviceId);
+
+        for (OpenFlowClassifier classf : old) {
+            process(new OpenFlowEvent(OpenFlowEvent.Type.REMOVE, classf));
+        }
+    }
+
+    /**
+     * Listener for network config events.
+     */
+    private class InternalConfigListener implements NetworkConfigListener {
+
+        @Override
+        public void event(NetworkConfigEvent event) {
+            if (event.configClass() == OpenFlowClassifierConfig.class) {
+                switch (event.type()) {
+                case CONFIG_ADDED:
+                case CONFIG_UPDATED:
+                    event.config().ifPresent(config -> updateClassifiers((OpenFlowClassifierConfig) config));
+                    break;
+                case CONFIG_REMOVED:
+                    removeClassifiers((DeviceId) event.subject());
+                    break;
+                case CONFIG_REGISTERED:
+                case CONFIG_UNREGISTERED:
+                default:
+                    break;
+                }
+            }
+        }
+    }
+}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index ee9422f..4696778 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -30,12 +30,16 @@
 import org.onosproject.openflow.config.OpenFlowDeviceConfig;
 import org.onosproject.openflow.controller.DefaultOpenFlowPacketContext;
 import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowClassifierListener;
 import org.onosproject.openflow.controller.OpenFlowController;
 import org.onosproject.openflow.controller.OpenFlowEventListener;
 import org.onosproject.openflow.controller.OpenFlowMessageListener;
 import org.onosproject.openflow.controller.OpenFlowPacketContext;
 import org.onosproject.openflow.controller.OpenFlowSwitch;
 import org.onosproject.openflow.controller.OpenFlowSwitchListener;
+import org.onosproject.openflow.controller.OpenFlowListener;
+import org.onosproject.openflow.controller.OpenFlowService;
+import org.onosproject.openflow.controller.OpenFlowEvent;
 import org.onosproject.openflow.controller.PacketListener;
 import org.onosproject.openflow.controller.RoleState;
 import org.onosproject.openflow.controller.driver.OpenFlowAgent;
@@ -104,8 +108,25 @@
                 KEY_STORE_PASSWORD + "=" + KEY_STORE_PASSWORD_DEFAULT,
                 TRUST_STORE + "=" + TRUST_STORE_DEFAULT,
                 TRUST_STORE_PASSWORD + "=" + TRUST_STORE_PASSWORD_DEFAULT,
+                DEFAULT_QUEUE_SIZE + ":Integer=" + DEFAULT_QUEUE_SIZE_DEFAULT,
+                DEBAULT_BULK_SIZE + ":Integer=" + BULK_SIZE_DEFAULT,
+                QUEUE_SIZE_N0 + ":Integer=" + QUEUE_SIZE_N0_DEFAULT,
+                BULK_SIZE_N0 + ":Integer=" + BULK_SIZE_DEFAULT,
+                QUEUE_SIZE_N1 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+                BULK_SIZE_N1 + ":Integer=" + BULK_SIZE_DEFAULT,
+                QUEUE_SIZE_N2 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+                BULK_SIZE_N2 + ":Integer=" + BULK_SIZE_DEFAULT,
+                QUEUE_SIZE_N3 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+                BULK_SIZE_N3 + ":Integer=" + BULK_SIZE_DEFAULT,
+                QUEUE_SIZE_N4 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+                BULK_SIZE_N4 + ":Integer=" + BULK_SIZE_DEFAULT,
+                QUEUE_SIZE_N5 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+                BULK_SIZE_N5 + ":Integer=" + BULK_SIZE_DEFAULT,
+                QUEUE_SIZE_N6 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+                BULK_SIZE_N6 + ":Integer=" + BULK_SIZE_DEFAULT,
         }
 )
+
 public class OpenFlowControllerImpl implements OpenFlowController {
     private static final String APP_ID = "org.onosproject.openflow-base";
     protected static final String SCHEME = "of";
@@ -131,12 +152,17 @@
     /** Number of controller worker threads. */
     private int workerThreads = WORKER_THREADS_DEFAULT;
 
-      /** TLS mode for OpenFlow channel; options are: disabled [default], enabled, strict. */
+    /** TLS mode for OpenFlow channel; options are: disabled [default], enabled, strict. */
     private String tlsMode;
 
     /** File path to key store for TLS connections. */
     private String keyStore;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected OpenFlowService openFlowManager;
+
+    private final OpenFlowListener openFlowListener = new InternalOpenFlowListener();
+
     /** Key store password. */
     private String keyStorePassword;
 
@@ -146,6 +172,54 @@
     /** Trust store password. */
     private String trustStorePassword;
 
+    /** Size of deafult queue. */
+    private int defaultQueueSize = DEFAULT_QUEUE_SIZE_DEFAULT;
+
+    /** Size of deafult bulk. */
+    private int defaultBulkSize = BULK_SIZE_DEFAULT;
+
+    /** Size of queue N0. */
+    private int queueSizeN0 = QUEUE_SIZE_N0_DEFAULT;
+
+    /** Size of bulk N0. */
+    private int bulkSizeN0 = BULK_SIZE_DEFAULT;
+
+    /** Size of queue N1. */
+    private int queueSizeN1 = QUEUE_SIZE_DEFAULT;
+
+    /** Size of bulk N1. */
+    private int bulkSizeN1 = BULK_SIZE_DEFAULT;
+
+    /** Size of queue N2. */
+    private int queueSizeN2 = QUEUE_SIZE_DEFAULT;
+
+    /** Size of bulk N2. */
+    private int bulkSizeN2 = BULK_SIZE_DEFAULT;
+
+    /** Size of queue N3. */
+    private int queueSizeN3 = QUEUE_SIZE_DEFAULT;
+
+    /** Size of bulk N3. */
+    private int bulkSizeN3 = BULK_SIZE_DEFAULT;
+
+    /** Size of queue N4. */
+    private int queueSizeN4 = QUEUE_SIZE_DEFAULT;
+
+    /** Size of bulk N4. */
+    private int bulkSizeN4 = BULK_SIZE_DEFAULT;
+
+    /** Size of queue N5. */
+    private int queueSizeN5 = QUEUE_SIZE_DEFAULT;
+
+    /** Size of bulk N5. */
+    private int bulkSizeN5 = BULK_SIZE_DEFAULT;
+
+    /** Size of queue N6. */
+    private int queueSizeN6 = QUEUE_SIZE_DEFAULT;
+
+    /** Size of bulk N6. */
+    private int bulkSizeN6 = BULK_SIZE_DEFAULT;
+
     protected ExecutorService executorMsgs =
         Executors.newFixedThreadPool(32, groupedThreads("onos/of", "event-stats-%d", log));
 
@@ -179,6 +253,8 @@
 
     protected Set<OpenFlowEventListener> ofEventListener = new CopyOnWriteArraySet<>();
 
+    protected Set<OpenFlowClassifierListener> ofClassifierListener = new CopyOnWriteArraySet<>();
+
     protected Set<OpenFlowMessageListener> ofMessageListener = new CopyOnWriteArraySet<>();
 
     protected Multimap<Dpid, OFFlowStatsEntry> fullFlowStats =
@@ -266,6 +342,7 @@
         netCfgService.addListener(netCfgListener);
         ctrl.setConfigParams(context.getProperties());
         ctrl.start(agent, driverService, netCfgService);
+        openFlowManager.addListener(openFlowListener);
     }
 
     private void cleanup() {
@@ -276,6 +353,7 @@
         connectedSwitches.clear();
         activeMasterSwitches.clear();
         activeEqualSwitches.clear();
+        openFlowManager.removeListener(openFlowListener);
     }
 
     @Deactivate
@@ -334,6 +412,16 @@
     }
 
     @Override
+    public void addClassifierListener(OpenFlowClassifierListener listener) {
+        this.ofClassifierListener.add(listener);
+    }
+
+    @Override
+    public void removeClassifierListener(OpenFlowClassifierListener listener) {
+        this.ofClassifierListener.remove(listener);
+    }
+
+    @Override
     public void addMessageListener(OpenFlowMessageListener listener) {
         ofMessageListener.add(listener);
     }
@@ -840,6 +928,16 @@
                 l.receivedRoleReply(dpid, requested, response);
             }
         }
+
+        @Override
+        public void addClassifierListener(OpenFlowClassifierListener listener) {
+            ofClassifierListener.add(listener);
+        }
+
+        @Override
+        public void removeClassifierListener(OpenFlowClassifierListener listener) {
+            ofClassifierListener.remove(listener);
+        }
     }
 
     /**
@@ -862,4 +960,28 @@
             }
         }
     }
+
+    private class InternalOpenFlowListener implements OpenFlowListener {
+        public void event(OpenFlowEvent event) {
+            try {
+                switch (event.type()) {
+                case INSERT:
+                    for (OpenFlowClassifierListener listener : ofClassifierListener) {
+                        listener.handleClassifiersAdd(event.subject());
+                    }
+                    break;
+                case REMOVE:
+                    for (OpenFlowClassifierListener listener : ofClassifierListener) {
+                        listener.handleClassifiersRemove(event.subject());
+                    }
+                    break;
+                default:
+                    log.warn("Unknown OpenFlow classifier event type: {}", event.type());
+                    break;
+                }
+            } catch (Exception e) {
+                log.error("Internal OpenFlowListener exception: {}", e.getMessage());
+            }
+        }
+    }
 }
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OsgiPropertyConstants.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OsgiPropertyConstants.java
index a152239..9dc3809 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OsgiPropertyConstants.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OsgiPropertyConstants.java
@@ -46,4 +46,26 @@
     public static final String TRUST_STORE_PASSWORD = "trustStorePassword";
     public static final String TRUST_STORE_PASSWORD_DEFAULT = "";
 
+    public static final String DEFAULT_QUEUE_SIZE = "defaultQueueSize";
+    public static final String DEBAULT_BULK_SIZE = "defaultBulkSize";
+    public static final String QUEUE_SIZE_N0 = "queueSizeN0";
+    public static final String BULK_SIZE_N0 = "bulkSizeN0";
+    public static final String QUEUE_SIZE_N1 = "queueSizeN1";
+    public static final String BULK_SIZE_N1 = "bulkSizeN1";
+    public static final String QUEUE_SIZE_N2 = "queueSizeN2";
+    public static final String BULK_SIZE_N2 = "bulkSizeN2";
+    public static final String QUEUE_SIZE_N3 = "queueSizeN3";
+    public static final String BULK_SIZE_N3 = "bulkSizeN3";
+    public static final String QUEUE_SIZE_N4 = "queueSizeN4";
+    public static final String BULK_SIZE_N4 = "bulkSizeN4";
+    public static final String QUEUE_SIZE_N5 = "queueSizeN5";
+    public static final String BULK_SIZE_N5 = "bulkSizeN5";
+    public static final String QUEUE_SIZE_N6 = "queueSizeN6";
+    public static final String BULK_SIZE_N6 = "bulkSizeN6";
+
+    public static final int DEFAULT_QUEUE_SIZE_DEFAULT = 5000;
+    public static final int QUEUE_SIZE_N0_DEFAULT = 1000;
+    public static final int BULK_SIZE_DEFAULT = 100;
+    public static final int QUEUE_SIZE_DEFAULT = 1;
+
 }
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplTest.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplTest.java
index 16fea6b..f853b98 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplTest.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplTest.java
@@ -38,6 +38,7 @@
 import org.onosproject.openflow.controller.OpenFlowSwitch;
 import org.onosproject.openflow.controller.OpenFlowSwitchListener;
 import org.onosproject.openflow.controller.RoleState;
+import org.onosproject.openflow.controller.OpenFlowService;
 import org.osgi.service.component.ComponentContext;
 import org.projectfloodlight.openflow.protocol.OFPortStatus;
 
@@ -139,6 +140,10 @@
                 EasyMock.createMock(CoreService.class);
         controller.coreService = mockCoreService;
 
+        OpenFlowService mockOpenFlowService =
+                EasyMock.createMock(OpenFlowService.class);
+        controller.openFlowManager = mockOpenFlowService;
+
         ComponentConfigService mockConfigService =
                 EasyMock.createMock(ComponentConfigService.class);
         expect(mockConfigService.getProperties(anyObject())).andReturn(ImmutableSet.of());