Aggregate flow stats replies on the I/O thread
Change-Id: I622290f213ee830cfab7e4bd4ad7a52f612b475e
diff --git a/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index 9ac8b5b..edd2f3a 100644
--- a/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -17,6 +17,7 @@
import static org.onlab.util.Tools.namedThreads;
+import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -25,6 +26,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.collect.Lists;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -41,11 +43,15 @@
import org.onosproject.openflow.controller.driver.OpenFlowAgent;
import org.projectfloodlight.openflow.protocol.OFCircuitPortStatus;
import org.projectfloodlight.openflow.protocol.OFExperimenter;
+import org.projectfloodlight.openflow.protocol.OFFactories;
+import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
+import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFPacketIn;
import org.projectfloodlight.openflow.protocol.OFPortDesc;
import org.projectfloodlight.openflow.protocol.OFPortStatus;
import org.projectfloodlight.openflow.protocol.OFStatsReply;
+import org.projectfloodlight.openflow.protocol.OFStatsReplyFlags;
import org.projectfloodlight.openflow.protocol.OFStatsType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,6 +85,9 @@
protected Set<OpenFlowEventListener> ofEventListener = Sets.newHashSet();
+ protected Multimap<Dpid, OFFlowStatsEntry> fullStats =
+ ArrayListMultimap.create();
+
private final Controller ctrl = new Controller();
@Activate
@@ -160,6 +169,7 @@
@Override
public void processPacket(Dpid dpid, OFMessage msg) {
+ Collection<OFFlowStatsEntry> stats;
switch (msg.getType()) {
case PORT_STATUS:
for (OpenFlowSwitchListener l : ofSwitchListener) {
@@ -186,7 +196,15 @@
l.switchChanged(dpid);
}
}
- // fall through to invoke handler
+ stats = publishStats(dpid, reply);
+ if (stats != null) {
+ OFFlowStatsReply.Builder rep =
+ OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
+ rep.setEntries(Lists.newLinkedList(stats));
+ executor.submit(new OFMessageHandler(dpid, rep.build()));
+ }
+ break;
+
case FLOW_REMOVED:
case ERROR:
case BARRIER_REPLY:
@@ -218,6 +236,20 @@
}
}
+    /** Buffers FLOW stats entries per dpid; returns the buffered batch on the last part, else null. */
+    private synchronized Collection<OFFlowStatsEntry> publishStats(Dpid dpid,
+                                                                    OFStatsReply reply) {
+        //TODO: Get rid of synchronized
+        if (reply.getStatsType() == OFStatsType.FLOW) {
+            fullStats.putAll(dpid, ((OFFlowStatsReply) reply).getEntries());
+            final boolean lastPart = !reply.getFlags().contains(OFStatsReplyFlags.REPLY_MORE);
+            if (lastPart) {
+                return fullStats.removeAll(dpid);
+            }
+        }
+        return null; // not a FLOW reply, or REPLY_MORE is set so further parts are expected
+    }
+
@Override
public void setRole(Dpid dpid, RoleState role) {
final OpenFlowSwitch sw = getSwitch(dpid);