Use typed queues for OF message processing

Process OF messages through 8 queues. The output queue for a message is
controlled per OF agent with the help of message classifiers.

Queues can be configured through component configuration mechanism
for "org.onosproject.openflow.controller.impl.OpenFlowControllerImpl"
component.

Classifiers can be configured through NetworkConfig API in the following
form:

      {
        "devices": {
           "of:0000000000000001": {
               "classifiers": [{
                   "ethernet-type":"LLDP",
                   "target-queue":0
               },{
                   "ethernet-type":"BDDP",
                   "target-queue":0
               },{
                   "ethernet-type":"0x1234",
                   "target-queue":1
               }]
           }
        }
      }

Where "target-queue" is the queue number from 0 to 7 (7 is the default
queue) and "ethernet-type" is the type of a packet, given either in
"0xFFFF" form or as an enum name defined in the
"org.onlab.packet.EthType.EtherType" enum.

Change-Id: I0512ef653d90c36f00289014872170c1a8aa5204
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index ee9422f..4696778 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -30,12 +30,16 @@
 import org.onosproject.openflow.config.OpenFlowDeviceConfig;
 import org.onosproject.openflow.controller.DefaultOpenFlowPacketContext;
 import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowClassifierListener;
 import org.onosproject.openflow.controller.OpenFlowController;
 import org.onosproject.openflow.controller.OpenFlowEventListener;
 import org.onosproject.openflow.controller.OpenFlowMessageListener;
 import org.onosproject.openflow.controller.OpenFlowPacketContext;
 import org.onosproject.openflow.controller.OpenFlowSwitch;
 import org.onosproject.openflow.controller.OpenFlowSwitchListener;
+import org.onosproject.openflow.controller.OpenFlowListener;
+import org.onosproject.openflow.controller.OpenFlowService;
+import org.onosproject.openflow.controller.OpenFlowEvent;
 import org.onosproject.openflow.controller.PacketListener;
 import org.onosproject.openflow.controller.RoleState;
 import org.onosproject.openflow.controller.driver.OpenFlowAgent;
@@ -104,8 +108,25 @@
                 KEY_STORE_PASSWORD + "=" + KEY_STORE_PASSWORD_DEFAULT,
                 TRUST_STORE + "=" + TRUST_STORE_DEFAULT,
                 TRUST_STORE_PASSWORD + "=" + TRUST_STORE_PASSWORD_DEFAULT,
+                DEFAULT_QUEUE_SIZE + ":Integer=" + DEFAULT_QUEUE_SIZE_DEFAULT,
+                DEBAULT_BULK_SIZE + ":Integer=" + BULK_SIZE_DEFAULT,
+                QUEUE_SIZE_N0 + ":Integer=" + QUEUE_SIZE_N0_DEFAULT,
+                BULK_SIZE_N0 + ":Integer=" + BULK_SIZE_DEFAULT,
+                QUEUE_SIZE_N1 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+                BULK_SIZE_N1 + ":Integer=" + BULK_SIZE_DEFAULT,
+                QUEUE_SIZE_N2 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+                BULK_SIZE_N2 + ":Integer=" + BULK_SIZE_DEFAULT,
+                QUEUE_SIZE_N3 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+                BULK_SIZE_N3 + ":Integer=" + BULK_SIZE_DEFAULT,
+                QUEUE_SIZE_N4 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+                BULK_SIZE_N4 + ":Integer=" + BULK_SIZE_DEFAULT,
+                QUEUE_SIZE_N5 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+                BULK_SIZE_N5 + ":Integer=" + BULK_SIZE_DEFAULT,
+                QUEUE_SIZE_N6 + ":Integer=" + QUEUE_SIZE_DEFAULT,
+                BULK_SIZE_N6 + ":Integer=" + BULK_SIZE_DEFAULT,
         }
 )
+
 public class OpenFlowControllerImpl implements OpenFlowController {
     private static final String APP_ID = "org.onosproject.openflow-base";
     protected static final String SCHEME = "of";
@@ -131,12 +152,17 @@
     /** Number of controller worker threads. */
     private int workerThreads = WORKER_THREADS_DEFAULT;
 
-      /** TLS mode for OpenFlow channel; options are: disabled [default], enabled, strict. */
+    /** TLS mode for OpenFlow channel; options are: disabled [default], enabled, strict. */
     private String tlsMode;
 
     /** File path to key store for TLS connections. */
     private String keyStore;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected OpenFlowService openFlowManager;
+
+    private final OpenFlowListener openFlowListener = new InternalOpenFlowListener();
+
     /** Key store password. */
     private String keyStorePassword;
 
@@ -146,6 +172,54 @@
     /** Trust store password. */
     private String trustStorePassword;
 
+    /** Size of default queue. */
+    private int defaultQueueSize = DEFAULT_QUEUE_SIZE_DEFAULT;
+
+    /** Size of default bulk. */
+    private int defaultBulkSize = BULK_SIZE_DEFAULT;
+
+    /** Size of queue N0. */
+    private int queueSizeN0 = QUEUE_SIZE_N0_DEFAULT;
+
+    /** Size of bulk N0. */
+    private int bulkSizeN0 = BULK_SIZE_DEFAULT;
+
+    /** Size of queue N1. */
+    private int queueSizeN1 = QUEUE_SIZE_DEFAULT;
+
+    /** Size of bulk N1. */
+    private int bulkSizeN1 = BULK_SIZE_DEFAULT;
+
+    /** Size of queue N2. */
+    private int queueSizeN2 = QUEUE_SIZE_DEFAULT;
+
+    /** Size of bulk N2. */
+    private int bulkSizeN2 = BULK_SIZE_DEFAULT;
+
+    /** Size of queue N3. */
+    private int queueSizeN3 = QUEUE_SIZE_DEFAULT;
+
+    /** Size of bulk N3. */
+    private int bulkSizeN3 = BULK_SIZE_DEFAULT;
+
+    /** Size of queue N4. */
+    private int queueSizeN4 = QUEUE_SIZE_DEFAULT;
+
+    /** Size of bulk N4. */
+    private int bulkSizeN4 = BULK_SIZE_DEFAULT;
+
+    /** Size of queue N5. */
+    private int queueSizeN5 = QUEUE_SIZE_DEFAULT;
+
+    /** Size of bulk N5. */
+    private int bulkSizeN5 = BULK_SIZE_DEFAULT;
+
+    /** Size of queue N6. */
+    private int queueSizeN6 = QUEUE_SIZE_DEFAULT;
+
+    /** Size of bulk N6. */
+    private int bulkSizeN6 = BULK_SIZE_DEFAULT;
+
     protected ExecutorService executorMsgs =
         Executors.newFixedThreadPool(32, groupedThreads("onos/of", "event-stats-%d", log));
 
@@ -179,6 +253,8 @@
 
     protected Set<OpenFlowEventListener> ofEventListener = new CopyOnWriteArraySet<>();
 
+    protected Set<OpenFlowClassifierListener> ofClassifierListener = new CopyOnWriteArraySet<>();
+
     protected Set<OpenFlowMessageListener> ofMessageListener = new CopyOnWriteArraySet<>();
 
     protected Multimap<Dpid, OFFlowStatsEntry> fullFlowStats =
@@ -266,6 +342,7 @@
         netCfgService.addListener(netCfgListener);
         ctrl.setConfigParams(context.getProperties());
         ctrl.start(agent, driverService, netCfgService);
+        openFlowManager.addListener(openFlowListener);
     }
 
     private void cleanup() {
@@ -276,6 +353,7 @@
         connectedSwitches.clear();
         activeMasterSwitches.clear();
         activeEqualSwitches.clear();
+        openFlowManager.removeListener(openFlowListener);
     }
 
     @Deactivate
@@ -334,6 +412,16 @@
     }
 
     @Override
+    public void addClassifierListener(OpenFlowClassifierListener listener) {
+        this.ofClassifierListener.add(listener);
+    }
+
+    @Override
+    public void removeClassifierListener(OpenFlowClassifierListener listener) {
+        this.ofClassifierListener.remove(listener);
+    }
+
+    @Override
     public void addMessageListener(OpenFlowMessageListener listener) {
         ofMessageListener.add(listener);
     }
@@ -840,6 +928,16 @@
                 l.receivedRoleReply(dpid, requested, response);
             }
         }
+
+        @Override
+        public void addClassifierListener(OpenFlowClassifierListener listener) {
+            ofClassifierListener.add(listener);
+        }
+
+        @Override
+        public void removeClassifierListener(OpenFlowClassifierListener listener) {
+            ofClassifierListener.remove(listener);
+        }
     }
 
     /**
@@ -862,4 +960,38 @@
             }
         }
     }
+
+    /**
+     * Relays OpenFlow classifier events to the registered classifier listeners.
+     * <p>
+     * INSERT events are forwarded through {@code handleClassifiersAdd} and
+     * REMOVE events through {@code handleClassifiersRemove}; any other event
+     * type is only logged.
+     */
+    private class InternalOpenFlowListener implements OpenFlowListener {
+        @Override
+        public void event(OpenFlowEvent event) {
+            try {
+                switch (event.type()) {
+                case INSERT:
+                    for (OpenFlowClassifierListener listener : ofClassifierListener) {
+                        listener.handleClassifiersAdd(event.subject());
+                    }
+                    break;
+                case REMOVE:
+                    for (OpenFlowClassifierListener listener : ofClassifierListener) {
+                        listener.handleClassifiersRemove(event.subject());
+                    }
+                    break;
+                default:
+                    log.warn("Unknown OpenFlow classifier event type: {}", event.type());
+                    break;
+                }
+            } catch (Exception e) {
+                // Exceptions thrown by a listener are contained here rather than propagated
+                // to the caller; pass the exception itself so the stack trace is logged.
+                log.error("Internal OpenFlowListener exception: {}", e.getMessage(), e);
+            }
+        }
+    }
 }