Added JavaDoc comments for FlowPusher and FlowSynchronizer
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index f43a83e..ba3602c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -48,6 +48,7 @@
public class FlowPusher implements IFlowPusherService, IOFMessageListener {
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
+ // If this flag is true, barrier message will be sent when queue goes empty.
private static boolean barrierIfEmpty = false;
// NOTE: Below are moved from FlowManager.
@@ -57,10 +58,6 @@
protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
- // Interval of sleep when queue is empty
- protected static final long SLEEP_MILLI_SEC = 10;
- protected static final int SLEEP_NANO_SEC = 0;
-
// Number of messages sent to switch at once
protected static final int MAX_MESSAGE_SEND = 100;
@@ -83,11 +80,14 @@
private class SwitchQueue extends ArrayDeque<OFMessage> {
QueueState state;
- // Max rate of sending message (bytes/sec). 0 implies no limitation.
+ // Max rate of sending message (bytes/ms). 0 implies no limitation.
long max_rate = 0; // 0 indicates no limitation
long last_sent_time = 0;
long last_sent_size = 0;
+ // "To be deleted" flag
+ boolean toBeDeleted = false;
+
/**
* Check if sending rate is within the rate
* @param current Current time
@@ -104,7 +104,8 @@
}
// Check if sufficient time (from aspect of rate) elapsed or not.
- long rate = last_sent_size / (current - last_sent_time);
+ long interval = (current - last_sent_time) / 1000000;
+ long rate = last_sent_size / interval;
return (rate < max_rate);
}
@@ -200,8 +201,15 @@
sw.flush();
queue.logSentData(current_time, size);
- if (queue.isEmpty() && barrierIfEmpty) {
- barrier(sw);
+ if (queue.isEmpty()) {
+ if (barrierIfEmpty) {
+ barrier(sw);
+ }
+ if (queue.toBeDeleted) {
+ synchronized (queues) {
+ queues.remove(sw);
+ }
+ }
}
}
}
@@ -341,11 +349,7 @@
}
}
- /**
- * Set sending rate to a switch.
- * @param sw Switch.
- * @param rate Rate in bytes/sec.
- */
+ @Override
public void setRate(IOFSwitch sw, long rate) {
SwitchQueue queue = getQueue(sw);
if (queue == null) {
@@ -356,6 +360,52 @@
queue.max_rate = rate;
}
}
+
+ @Override
+ public boolean createQueue(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+ if (queue != null) {
+ return false;
+ }
+
+ FlowPusherThread proc = getProcess(sw);
+ queue = new SwitchQueue();
+ queue.state = QueueState.READY;
+ synchronized (proc) {
+ proc.queues.put(sw, queue);
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean deleteQueue(IOFSwitch sw) {
+ return deleteQueue(sw, false);
+ }
+
+ @Override
+ public boolean deleteQueue(IOFSwitch sw, boolean force) {
+ FlowPusherThread proc = getProcess(sw);
+
+ if (force) {
+ synchronized (proc.queues) {
+ SwitchQueue queue = proc.queues.remove(sw);
+ if (queue == null) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ SwitchQueue queue = getQueue(sw);
+ if (queue == null) {
+ return false;
+ }
+ synchronized (queue) {
+ queue.toBeDeleted = true;
+ }
+ return true;
+ }
+ }
/**
* Add OFMessage to queue of the switch.
@@ -368,12 +418,10 @@
FlowPusherThread proc = getProcess(sw);
SwitchQueue queue = proc.queues.get(sw);
+ // create queue at first call
if (queue == null) {
- queue = new SwitchQueue();
- queue.state = QueueState.READY;
- synchronized (proc) {
- proc.queues.put(sw, queue);
- }
+ createQueue(sw);
+ queue = getQueue(sw);
}
synchronized (queue) {
@@ -705,7 +753,7 @@
* @param sw Switch to which message is sent.
* @param flowPath FlowPath.
* @param flowEntry FlowEntry.
- * @return true if secceed.
+ * @return true if succeed.
*/
@Override
public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
@@ -1061,12 +1109,16 @@
barrierFutures.put(sw.getId(), map);
}
map.put(msg.getXid(), future);
- log.debug("Inserted future for {}", msg.getXid());
}
return future;
}
+ /**
+ * Get a queue attached to a switch.
+ * @param sw Switch object
+ * @return Queue object
+ */
protected SwitchQueue getQueue(IOFSwitch sw) {
if (sw == null) {
return null;
@@ -1075,12 +1127,22 @@
return getProcess(sw).queues.get(sw);
}
+ /**
+ * Get a hash value correspondent to a switch.
+ * @param sw Switch object
+ * @return Hash value
+ */
protected long getHash(IOFSwitch sw) {
// This code assumes DPID is sequentially assigned.
// TODO consider equalization algorithm
return sw.getId() % number_thread;
}
-
+
+ /**
+ * Get a Thread object which processes the queue attached to a switch.
+ * @param sw Switch object
+ * @return Thread object
+ */
protected FlowPusherThread getProcess(IOFSwitch sw) {
long hash = getHash(sw);
@@ -1119,4 +1181,5 @@
return Command.CONTINUE;
}
+
}