Created FlowQueueTable basic structure.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
index 07cb719..940620e 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
@@ -18,6 +18,7 @@
*/
public class FlowPusher {
private FloodlightContext context;
+ private FlowQueueTable flowQueueTable = null;
private Thread thread;
/**
@@ -26,37 +27,50 @@
* @author Naoki Shiota
*
*/
- private static class QueueState {
+ private static class RateInfo {
long last_sent_time = 0;
long last_sent_size = 0;
- long max_rate;
}
- private Map<IOFSwitch, Queue<OFMessage> > queues =
- new HashMap<IOFSwitch, Queue<OFMessage> >();
- private Map<Queue<OFMessage>, QueueState> queue_states =
- new HashMap<Queue<OFMessage>, QueueState>();
+ private Map<Long, RateInfo> queue_rateinfos =
+ new HashMap<Long, RateInfo>();
private class FlowPusherProcess implements Runnable {
@Override
public void run() {
+ if (flowQueueTable == null) {
+ return;
+ }
+
while (true) {
- for (Map.Entry<IOFSwitch, Queue<OFMessage> > entry : queues.entrySet()) {
- // pick one FlowEntry event from a queue
- IOFSwitch sw = entry.getKey();
- Queue<OFMessage> queue = entry.getValue();
- if (entry == null || queue == null) {
+ for (IOFSwitch sw : flowQueueTable.getSwitches()) {
+ // Skip if queue is suspended
+ if (flowQueueTable.isQueueSusupended(sw)) {
+ continue;
+ }
+
+ // Skip if queue is locked
+ if (! flowQueueTable.lockQueueIfAvailable(sw)) {
+ continue;
+ }
+
+ long dpid = sw.getId();
+ Queue<OFMessage> queue = flowQueueTable.getQueue(sw);
+
+ if (queue == null) {
+ flowQueueTable.unlockQueue(sw);
continue;
}
OFMessage msg = queue.poll();
if (msg == null) {
+ flowQueueTable.unlockQueue(sw);
continue;
}
- QueueState state = queue_states.get(queue);
+ RateInfo state = queue_rateinfos.get(dpid);
if (state == null) {
- continue;
+ queue_rateinfos.put(dpid, new RateInfo());
}
// check sending rate and determine it to be sent or not
@@ -64,7 +78,7 @@
long rate = state.last_sent_size / (current_time - state.last_sent_time);
// if need to send, call IOFSwitch#write()
- if (rate < state.max_rate) {
+ if (rate < flowQueueTable.getQueueRate(sw)) {
try {
sw.write(msg, context);
state.last_sent_time = current_time;
@@ -74,6 +88,8 @@
e.printStackTrace();
}
}
+
+ flowQueueTable.unlockQueue(sw);
}
// sleep while all queues are empty
@@ -85,23 +101,20 @@
}
}
- public FlowPusher(FloodlightContext context) {
+ public FlowPusher(FlowQueueTable table, FloodlightContext context) {
+ flowQueueTable = table;
this.context = context;
}
- public void assignQueue(IOFSwitch sw, Queue<OFMessage> queue, long max_rate) {
- queues.put(sw, queue);
- QueueState state = new QueueState();
- state.max_rate = max_rate;
- queue_states.put(queue, state);
- }
-
public void startProcess() {
thread = new Thread(new FlowPusherProcess());
thread.start();
}
public void stopProcess() {
- // TODO tell thread to halt
+ if (thread != null && thread.isAlive()) {
+ // TODO tell thread to halt
+ }
}
+
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowQueueTable.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowQueueTable.java
new file mode 100644
index 0000000..d3a6318
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowQueueTable.java
@@ -0,0 +1,210 @@
+package net.onrc.onos.ofcontroller.flowmanager;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.protocol.OFMessage;
+
+/**
+ * Represents table of message queues attached to each switch.
+ * Each message should be ADD/DELETE of flow.
+ * (MODIFY of flow might be handled, but future work)
+ * @author Naoki Shiota
+ *
+ */
+public class FlowQueueTable {
+
+ public enum QueueState {
+ SYNCHRONIZED,
+ SYNCHRONIZING,
+ DELETED; // not in work and to be deleted
+ }
+
+ private class QueueInfo {
+ QueueState state;
+
+ // Max rate of sending message (bytes/sec). 0 implies no limitation.
+ long max_rate = 0;
+
+ // Is sending message suspended or not.
+ boolean suspended = false;
+ }
+
+ private Map< IOFSwitch, Queue<OFMessage> > queues
+ = new HashMap< IOFSwitch, Queue<OFMessage> >();
+ private Map<IOFSwitch, QueueInfo> queue_info
+ = new HashMap<IOFSwitch, QueueInfo>();
+
+ public FlowQueueTable() {
+ // TODO Auto-generated constructor stub
+ }
+
+ /**
+ * Add flow queue for given switch.
+ * Note queue should be given by caller so that caller can select data
+ * structure suitable for its processing.
+ * @param sw
+ * @param queue
+ */
+ public void addSwitchQueue(IOFSwitch sw, Queue<OFMessage> queue) {
+ QueueInfo info = new QueueInfo();
+
+ if (queues.containsKey(sw)) {
+ return;
+ }
+
+ queues.put(sw, queue);
+ queue_info.put(sw, info);
+ }
+
+ /**
+ * Delete flow queue for given switch.
+ * @param sw
+ */
+ public void deleteSwitchQueue(IOFSwitch sw) {
+ if (! queues.containsKey(sw)) {
+ return;
+ }
+
+ queues.remove(sw);
+ queue_info.remove(sw);
+ }
+
+ /**
+ * Get flow queue for given switch.
+ * @param sw
+ * @return
+ */
+ public Queue<OFMessage> getQueue(IOFSwitch sw) {
+ return queues.get(sw);
+ }
+
+ public Set<IOFSwitch> getSwitches() {
+ return queues.keySet();
+ }
+
+ /**
+ * Get state of flow queue for given switch.
+ * @param sw
+ */
+ public QueueState getQueueState(IOFSwitch sw) {
+ QueueInfo info = queue_info.get(sw);
+ if (info == null) {
+ return null;
+ }
+
+ return info.state;
+ }
+
+ /**
+ * Set state of flow queue for given switch.
+ * @param sw
+ * @param state
+ */
+ public void setQueueState(IOFSwitch sw, QueueState state) {
+ QueueInfo info = queue_info.get(sw);
+ if (info == null) {
+ return;
+ }
+
+ info.state = state;
+ }
+
+ /**
+ * Get maximum rate for given switch.
+ * @param sw
+ */
+ public long getQueueRate(IOFSwitch sw) {
+ QueueInfo info = queue_info.get(sw);
+ if (info == null) {
+ return 0;
+ }
+
+ return info.max_rate;
+ }
+
+ /**
+ * Set maximum rate for given switch.
+ * @param sw
+ * @param rate
+ */
+ public void setQueueRate(IOFSwitch sw, long rate) {
+ QueueInfo info = queue_info.get(sw);
+ if (info == null) {
+ return;
+ }
+
+ info.max_rate = rate;
+ }
+
+ /**
+ * Suspend sending message of a queue for given switch.
+ * @param sw
+ */
+ public void suspendQueue(IOFSwitch sw) {
+ setQueueSuspended(sw, true);
+ }
+
+ /**
+ * Resume sending message of a queue for given switch.
+ * @param sw
+ */
+ public void resumeQueue(IOFSwitch sw) {
+ setQueueSuspended(sw, false);
+ }
+
+ /**
+ * Check if queue is suspended or not.
+ * @param sw
+ * @return
+ */
+ public boolean isQueueSusupended(IOFSwitch sw) {
+ QueueInfo info = queue_info.get(sw);
+ if (info == null) {
+ // TODO error handling
+ return true;
+ }
+
+ return info.suspended;
+ }
+
+ private void setQueueSuspended(IOFSwitch sw, boolean suspended) {
+ QueueInfo info = queue_info.get(sw);
+ if (info == null) {
+ return;
+ }
+
+ info.suspended =suspended;
+ }
+
+ /**
+ * Get a lock for queue for given switch.
+ * If locked already, wait for unlock.
+ * @param sw
+ */
+ public void lockQueue(IOFSwitch sw) {
+ // TODO not yet implement
+ }
+
+ /**
+ * Get a lock for queue for given switch.
+ * If locked already, return false at once.
+ * @param sw
+ * @return
+ */
+ public boolean lockQueueIfAvailable(IOFSwitch sw) {
+ // TODO not yet implement
+ return false;
+ }
+
+ /** Release a lock for queue for given switch.
+ * @param sw
+ */
+ public void unlockQueue(IOFSwitch sw) {
+ // TODO not yet implement
+ }
+}