Refactor the logic on collecting OpenFlow message statistics

Change-Id: I34c209c0ca90cb094ed5f82c96a8a43d3519b807
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java
index c516055..df85811 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowController.java
@@ -65,13 +65,6 @@
     OpenFlowSwitch getEqualSwitch(Dpid dpid);
 
     /**
-     * If this set to be true, all incoming events are monitored.
-     * Other wise, only stats related incoming events are monitored
-     * @param monitor monitoring flag
-     */
-    void monitorAllEvents(boolean monitor);
-
-    /**
      * Register a listener for meta events that occur to OF
      * devices.
      * @param listener the listener to notify
@@ -86,6 +79,20 @@
     void removeListener(OpenFlowSwitchListener listener);
 
     /**
+     * Register a listener for all OF msg types.
+     *
+     * @param listener the listener to notify
+     */
+    void addMessageListener(OpenFlowMessageListener listener);
+
+    /**
+     * Unregister a listener for all OF msg types.
+     *
+     * @param listener the listener to notify
+     */
+    void removeMessageListener(OpenFlowMessageListener listener);
+
+    /**
      * Register a listener for packet events.
      * @param priority the importance of this listener, lower values are more important
      * @param listener the listener to notify
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowMessageListener.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowMessageListener.java
new file mode 100644
index 0000000..1442e06
--- /dev/null
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowMessageListener.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller;
+
+import org.projectfloodlight.openflow.protocol.OFMessage;
+
+import java.util.List;
+
+/**
+ * Notifies providers about all OpenFlow messages.
+ */
+public interface OpenFlowMessageListener {
+
+    /**
+     * Handles all incoming OpenFlow messages.
+     *
+     * @param dpid the switch where the message generated
+     * @param msg raw OpenFlow message
+     */
+    void handleIncomingMessage(Dpid dpid, OFMessage msg);
+
+    /**
+     * Handles all outgoing OpenFlow messages.
+     *
+     * @param dpid the switch where the message to be sent
+     * @param msgs a collection of raw OpenFlow message
+     */
+    void handleOutgoingMessage(Dpid dpid, List<OFMessage> msgs);
+}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java
index 0b11f9c..206ffad 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java
@@ -157,18 +157,4 @@
      * @return string representation of the connection to the device
      */
     String channelId();
-
-    /**
-     * Registers a listener for OF msg events.
-     *
-     * @param listener the listener to notify
-     */
-    void addEventListener(OpenFlowEventListener listener);
-
-    /**
-     * Unregisters a listener.
-     *
-     * @param listener the listener to unregister
-     */
-    void removeEventListener(OpenFlowEventListener listener);
 }
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
index c3d6f44..465dd06 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
@@ -22,7 +22,6 @@
 import org.onosproject.net.Device;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
 import org.onosproject.openflow.controller.Dpid;
-import org.onosproject.openflow.controller.OpenFlowEventListener;
 import org.onosproject.openflow.controller.RoleState;
 import org.projectfloodlight.openflow.protocol.OFDescStatsReply;
 import org.projectfloodlight.openflow.protocol.OFErrorMsg;
@@ -37,7 +36,6 @@
 import org.projectfloodlight.openflow.protocol.OFPortStatus;
 import org.projectfloodlight.openflow.protocol.OFRoleReply;
 import org.projectfloodlight.openflow.protocol.OFRoleRequest;
-import org.projectfloodlight.openflow.protocol.OFType;
 import org.projectfloodlight.openflow.protocol.OFVersion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,16 +46,10 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-import static org.onlab.util.Tools.groupedThreads;
-
 /**
  * An abstract representation of an OpenFlow switch. Can be extended by others
  * to serve as a base for their vendor specific representation of a switch.
@@ -90,11 +82,6 @@
     protected OFFeaturesReply features;
     protected OFDescStatsReply desc;
 
-    protected Set<OpenFlowEventListener> ofOutgoingMsgListener = new CopyOnWriteArraySet<>();
-
-    protected ExecutorService executorMsgs =
-            Executors.newCachedThreadPool(groupedThreads("onos/of", "event-outgoing-msg-stats-%d", log));
-
     // messagesPendingMastership is used as synchronization variable for
     // all mastership related changes. In this block, mastership (including
     // role update) will have either occurred or not.
@@ -133,9 +120,9 @@
            a synchronization primitive, because the message would have just been
            dropped anyway.
         */
+
         if (role == RoleState.MASTER) {
             // fast path send when we are master
-
             sendMsgsOnChannel(msgs);
             return;
         }
@@ -166,23 +153,10 @@
         }
     }
 
-    private void countOutgoingMsg(List<OFMessage> msgs) {
-        // listen to outgoing control messages only if listeners are registered
-        if (ofOutgoingMsgListener.size() != 0) {
-            msgs.forEach(m -> {
-                if (m.getType() == OFType.PACKET_OUT ||
-                        m.getType() == OFType.FLOW_MOD ||
-                        m.getType() == OFType.STATS_REQUEST) {
-                    executorMsgs.execute(new OFMessageHandler(dpid, m));
-                }
-            });
-        }
-    }
-
     private void sendMsgsOnChannel(List<OFMessage> msgs) {
         if (channel.isConnected()) {
             channel.write(msgs);
-            countOutgoingMsg(msgs);
+            agent.processDownstreamMessage(dpid, msgs);
         } else {
             log.warn("Dropping messages for switch {} because channel is not connected: {}",
                      dpid, msgs);
@@ -335,16 +309,6 @@
     }
 
     @Override
-    public void addEventListener(OpenFlowEventListener listener) {
-        ofOutgoingMsgListener.add(listener);
-    }
-
-    @Override
-    public void removeEventListener(OpenFlowEventListener listener) {
-        ofOutgoingMsgListener.remove(listener);
-    }
-
-    @Override
     public OFFactory factory() {
         return OFFactories.getFactory(ofVersion);
     }
@@ -535,25 +499,4 @@
                 ? channel.getRemoteAddress() : "?")
                 + " DPID[" + ((getStringId() != null) ? getStringId() : "?") + "]]";
     }
-
-    /**
-     * OpenFlow message handler for outgoing control messages.
-     */
-    protected final class OFMessageHandler implements Runnable {
-
-        protected final OFMessage msg;
-        protected final Dpid dpid;
-
-        public OFMessageHandler(Dpid dpid, OFMessage msg) {
-            this.msg = msg;
-            this.dpid = dpid;
-        }
-
-        @Override
-        public void run() {
-            for (OpenFlowEventListener listener : ofOutgoingMsgListener) {
-                listener.handleMessage(dpid, msg);
-            }
-        }
-    }
 }
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowAgent.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowAgent.java
index 6f918e8..964c79a 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowAgent.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowAgent.java
@@ -20,6 +20,8 @@
 import org.onosproject.openflow.controller.RoleState;
 import org.projectfloodlight.openflow.protocol.OFMessage;
 
+import java.util.List;
+
 /**
  * Responsible for keeping track of the current set of switches
  * connected to the system. As well as whether they are in Master
@@ -84,6 +86,14 @@
     void removeConnectedSwitch(Dpid dpid);
 
     /**
+     * Notify OpenFlow message listeners on all outgoing message event.
+     *
+     * @param dpid the dpid the message sent to
+     * @param m the collection of messages to sent out
+     */
+    void processDownstreamMessage(Dpid dpid, List<OFMessage> m);
+
+    /**
      * Process a message coming from a switch.
      *
      * @param dpid the dpid the message came on.