[Falcon][ONOS-3537] Implement control message collecting logic w/ unit test

Change-Id: Ic21d476a5ad92d7ef739fa3c13dcc06e5cbf7c56
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
index 08444b1..098ff07 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
@@ -16,27 +16,33 @@
 
 package org.onosproject.openflow.controller.driver;
 
+import static org.onlab.util.Tools.groupedThreads;
+
 import com.google.common.collect.Lists;
 import org.jboss.netty.channel.Channel;
 import org.onlab.packet.IpAddress;
 import org.onosproject.net.Device;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
 import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowEventListener;
 import org.onosproject.openflow.controller.RoleState;
+
 import org.projectfloodlight.openflow.protocol.OFDescStatsReply;
-import org.projectfloodlight.openflow.protocol.OFErrorMsg;
-import org.projectfloodlight.openflow.protocol.OFExperimenter;
-import org.projectfloodlight.openflow.protocol.OFFactories;
-import org.projectfloodlight.openflow.protocol.OFFactory;
 import org.projectfloodlight.openflow.protocol.OFFeaturesReply;
-import org.projectfloodlight.openflow.protocol.OFMessage;
-import org.projectfloodlight.openflow.protocol.OFNiciraControllerRoleRequest;
-import org.projectfloodlight.openflow.protocol.OFPortDesc;
 import org.projectfloodlight.openflow.protocol.OFPortDescStatsReply;
-import org.projectfloodlight.openflow.protocol.OFPortStatus;
-import org.projectfloodlight.openflow.protocol.OFRoleReply;
-import org.projectfloodlight.openflow.protocol.OFRoleRequest;
 import org.projectfloodlight.openflow.protocol.OFVersion;
+import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFType;
+import org.projectfloodlight.openflow.protocol.OFFactories;
+import org.projectfloodlight.openflow.protocol.OFPortDesc;
+import org.projectfloodlight.openflow.protocol.OFExperimenter;
+import org.projectfloodlight.openflow.protocol.OFErrorMsg;
+import org.projectfloodlight.openflow.protocol.OFRoleRequest;
+import org.projectfloodlight.openflow.protocol.OFNiciraControllerRoleRequest;
+import org.projectfloodlight.openflow.protocol.OFPortStatus;
+import org.projectfloodlight.openflow.protocol.OFFactory;
+import org.projectfloodlight.openflow.protocol.OFRoleReply;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +52,10 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -82,6 +92,11 @@
     protected OFFeaturesReply features;
     protected OFDescStatsReply desc;
 
+    protected Set<OpenFlowEventListener> ofEventListener = new CopyOnWriteArraySet<>();
+
+    protected ExecutorService executorMsgs =
+            Executors.newFixedThreadPool(2, groupedThreads("onos/of", "ctrl-msg-stats-%d"));
+
     private final AtomicReference<List<OFMessage>> messagesPendingMastership
             = new AtomicReference<>();
 
@@ -148,6 +163,15 @@
                          dpid, role, channel.isConnected(), msgs);
             }
         }
+
+        // listen to outgoing control messages
+        msgs.forEach(m -> {
+            if (m.getType() == OFType.PACKET_OUT ||
+                m.getType() == OFType.FLOW_MOD ||
+                m.getType() == OFType.STATS_REQUEST) {
+                executorMsgs.submit(new OFMessageHandler(dpid, m));
+            }
+        });
     }
 
     private void sendMsgsOnChannel(List<OFMessage> msgs) {
@@ -301,6 +325,16 @@
     }
 
     @Override
+    public void addEventListener(OpenFlowEventListener listener) {
+        ofEventListener.add(listener);
+    }
+
+    @Override
+    public void removeEventListener(OpenFlowEventListener listener) {
+        ofEventListener.remove(listener);
+    }
+
+    @Override
     public OFFactory factory() {
         return OFFactories.getFactory(ofVersion);
     }
@@ -491,4 +525,25 @@
                 ? channel.getRemoteAddress() : "?")
                 + " DPID[" + ((getStringId() != null) ? getStringId() : "?") + "]]";
     }
+
+    /**
+     * OpenFlow message handler for outgoing control messages.
+     */
+    protected final class OFMessageHandler implements Runnable {
+
+        protected final OFMessage msg;
+        protected final Dpid dpid;
+
+        public OFMessageHandler(Dpid dpid, OFMessage msg) {
+            this.msg = msg;
+            this.dpid = dpid;
+        }
+
+        @Override
+        public void run() {
+            for (OpenFlowEventListener listener : ofEventListener) {
+                listener.handleMessage(dpid, msg);
+            }
+        }
+    }
 }