[Falcon][ONOS-3537] Implement control message collecting logic w/ unit test
Change-Id: Ic21d476a5ad92d7ef739fa3c13dcc06e5cbf7c56
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
index 08444b1..098ff07 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
@@ -16,27 +16,33 @@
package org.onosproject.openflow.controller.driver;
+import static org.onlab.util.Tools.groupedThreads;
+
import com.google.common.collect.Lists;
import org.jboss.netty.channel.Channel;
import org.onlab.packet.IpAddress;
import org.onosproject.net.Device;
import org.onosproject.net.driver.AbstractHandlerBehaviour;
import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowEventListener;
import org.onosproject.openflow.controller.RoleState;
+
import org.projectfloodlight.openflow.protocol.OFDescStatsReply;
-import org.projectfloodlight.openflow.protocol.OFErrorMsg;
-import org.projectfloodlight.openflow.protocol.OFExperimenter;
-import org.projectfloodlight.openflow.protocol.OFFactories;
-import org.projectfloodlight.openflow.protocol.OFFactory;
import org.projectfloodlight.openflow.protocol.OFFeaturesReply;
-import org.projectfloodlight.openflow.protocol.OFMessage;
-import org.projectfloodlight.openflow.protocol.OFNiciraControllerRoleRequest;
-import org.projectfloodlight.openflow.protocol.OFPortDesc;
import org.projectfloodlight.openflow.protocol.OFPortDescStatsReply;
-import org.projectfloodlight.openflow.protocol.OFPortStatus;
-import org.projectfloodlight.openflow.protocol.OFRoleReply;
-import org.projectfloodlight.openflow.protocol.OFRoleRequest;
import org.projectfloodlight.openflow.protocol.OFVersion;
+import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFType;
+import org.projectfloodlight.openflow.protocol.OFFactories;
+import org.projectfloodlight.openflow.protocol.OFPortDesc;
+import org.projectfloodlight.openflow.protocol.OFExperimenter;
+import org.projectfloodlight.openflow.protocol.OFErrorMsg;
+import org.projectfloodlight.openflow.protocol.OFRoleRequest;
+import org.projectfloodlight.openflow.protocol.OFNiciraControllerRoleRequest;
+import org.projectfloodlight.openflow.protocol.OFPortStatus;
+import org.projectfloodlight.openflow.protocol.OFFactory;
+import org.projectfloodlight.openflow.protocol.OFRoleReply;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +52,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -82,6 +92,11 @@
protected OFFeaturesReply features;
protected OFDescStatsReply desc;
+ protected Set<OpenFlowEventListener> ofEventListener = new CopyOnWriteArraySet<>();
+
+ protected ExecutorService executorMsgs =
+ Executors.newFixedThreadPool(2, groupedThreads("onos/of", "ctrl-msg-stats-%d"));
+
private final AtomicReference<List<OFMessage>> messagesPendingMastership
= new AtomicReference<>();
@@ -148,6 +163,15 @@
dpid, role, channel.isConnected(), msgs);
}
}
+
+ // listen to outgoing control messages
+ msgs.forEach(m -> {
+ if (m.getType() == OFType.PACKET_OUT ||
+ m.getType() == OFType.FLOW_MOD ||
+ m.getType() == OFType.STATS_REQUEST) {
+ executorMsgs.submit(new OFMessageHandler(dpid, m));
+ }
+ });
}
private void sendMsgsOnChannel(List<OFMessage> msgs) {
@@ -301,6 +325,16 @@
}
@Override
+ public void addEventListener(OpenFlowEventListener listener) {
+ ofEventListener.add(listener);
+ }
+
+ @Override
+ public void removeEventListener(OpenFlowEventListener listener) {
+ ofEventListener.remove(listener);
+ }
+
+ @Override
public OFFactory factory() {
return OFFactories.getFactory(ofVersion);
}
@@ -491,4 +525,25 @@
? channel.getRemoteAddress() : "?")
+ " DPID[" + ((getStringId() != null) ? getStringId() : "?") + "]]";
}
+
+ /**
+ * OpenFlow message handler for outgoing control messages.
+ */
+ protected final class OFMessageHandler implements Runnable {
+
+ protected final OFMessage msg;
+ protected final Dpid dpid;
+
+ public OFMessageHandler(Dpid dpid, OFMessage msg) {
+ this.msg = msg;
+ this.dpid = dpid;
+ }
+
+ @Override
+ public void run() {
+ for (OpenFlowEventListener listener : ofEventListener) {
+ listener.handleMessage(dpid, msg);
+ }
+ }
+ }
}