Created FlowQueueTable basic structure.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
index 07cb719..940620e 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
@@ -18,6 +18,7 @@
*/
public class FlowPusher {
private FloodlightContext context;
+ private FlowQueueTable flowQueueTable = null;
private Thread thread;
/**
@@ -26,37 +27,50 @@
* @author Naoki Shiota
*
*/
- private static class QueueState {
+ private static class RateInfo {
long last_sent_time = 0;
long last_sent_size = 0;
- long max_rate;
}
- private Map<IOFSwitch, Queue<OFMessage> > queues =
- new HashMap<IOFSwitch, Queue<OFMessage> >();
- private Map<Queue<OFMessage>, QueueState> queue_states =
- new HashMap<Queue<OFMessage>, QueueState>();
+ private Map<Long, RateInfo> queue_rateinfos =
+ new HashMap<Long, RateInfo>();
private class FlowPusherProcess implements Runnable {
@Override
public void run() {
+ if (flowQueueTable == null) {
+ return;
+ }
+
while (true) {
- for (Map.Entry<IOFSwitch, Queue<OFMessage> > entry : queues.entrySet()) {
- // pick one FlowEntry event from a queue
- IOFSwitch sw = entry.getKey();
- Queue<OFMessage> queue = entry.getValue();
- if (entry == null || queue == null) {
+ for (IOFSwitch sw : flowQueueTable.getSwitches()) {
+ // Skip if queue is suspended
+ if (flowQueueTable.isQueueSusupended(sw)) {
+ continue;
+ }
+
+ // Skip if queue is locked
+ if (! flowQueueTable.lockQueueIfAvailable(sw)) {
+ continue;
+ }
+
+ long dpid = sw.getId();
+ Queue<OFMessage> queue = flowQueueTable.getQueue(sw);
+
+ if (queue == null) {
+ flowQueueTable.unlockQueue(sw);
continue;
}
OFMessage msg = queue.poll();
if (msg == null) {
+ flowQueueTable.unlockQueue(sw);
continue;
}
- QueueState state = queue_states.get(queue);
+ RateInfo state = queue_rateinfos.get(dpid);
if (state == null) {
- continue;
+ queue_rateinfos.put(dpid, new RateInfo());
}
// check sending rate and determine it to be sent or not
@@ -64,7 +78,7 @@
long rate = state.last_sent_size / (current_time - state.last_sent_time);
// if need to send, call IOFSwitch#write()
- if (rate < state.max_rate) {
+ if (rate < flowQueueTable.getQueueRate(sw)) {
try {
sw.write(msg, context);
state.last_sent_time = current_time;
@@ -74,6 +88,8 @@
e.printStackTrace();
}
}
+
+ flowQueueTable.unlockQueue(sw);
}
// sleep while all queues are empty
@@ -85,23 +101,20 @@
}
}
- public FlowPusher(FloodlightContext context) {
+ public FlowPusher(FlowQueueTable table, FloodlightContext context) {
+ flowQueueTable = table;
this.context = context;
}
- public void assignQueue(IOFSwitch sw, Queue<OFMessage> queue, long max_rate) {
- queues.put(sw, queue);
- QueueState state = new QueueState();
- state.max_rate = max_rate;
- queue_states.put(queue, state);
- }
-
public void startProcess() {
thread = new Thread(new FlowPusherProcess());
thread.start();
}
public void stopProcess() {
- // TODO tell thread to halt
+ if (thread != null && thread.isAlive()) {
+ // TODO tell thread to halt
+ }
}
+
}