Added JavaDoc comments for FlowProgrammer related modules.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index ba3602c..6f85c5d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -40,17 +40,20 @@
import net.onrc.onos.ofcontroller.util.Port;
/**
- * FlowPusher intermediates FlowManager/FlowSynchronizer and switches to push OpenFlow
- * messages to switches in proper rate.
+ * FlowPusher is an implementation of IFlowPusherService.
+ * FlowPusher assigns one message queue instance to each switch.
+ * The number of message processing threads is configurable via the constructor, and
+ * one thread can handle multiple message queues. Each queue is assigned to
+ * a thread according to the hash function defined by getHash().
+ * Each processing thread reads messages from its queues and sends them to switches
+ * in round-robin order. The processing thread also calculates the sending rate to
+ * suppress excessive message sending.
* @author Naoki Shiota
*
*/
public class FlowPusher implements IFlowPusherService, IOFMessageListener {
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
- // If this flag is true, barrier message will be sent when queue goes empty.
- private static boolean barrierIfEmpty = false;
-
// NOTE: Below are moved from FlowManager.
// TODO: Values copied from elsewhere (class LearningSwitch).
// The local copy should go away!
@@ -71,7 +74,7 @@
}
/**
- * Message queue attached to a switch.
+ * SwitchQueue represents a message queue attached to a switch.
* This consists of queue itself and variables used for limiting sending rate.
* @author Naoki Shiota
*
@@ -104,8 +107,7 @@
}
// Check if sufficient time (from aspect of rate) elapsed or not.
- long interval = (current - last_sent_time) / 1000000;
- long rate = last_sent_size / interval;
+ long rate = last_sent_size / (current - last_sent_time);
return (rate < max_rate);
}
@@ -126,7 +128,10 @@
private FloodlightContext context = null;
private BasicFactory factory = null;
+
+ // Map from dpid to the thread handling it
private Map<Long, FlowPusherThread> threadMap = null;
+ // Map from dpid to a map from transaction ID to its Future object.
private Map<Long, Map<Integer, OFBarrierReplyFuture>>
barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
@@ -139,14 +144,12 @@
*/
private class FlowPusherThread extends Thread {
private Map<IOFSwitch,SwitchQueue> queues
- = new HashMap<IOFSwitch,SwitchQueue>();
+ = new HashMap<IOFSwitch,SwitchQueue>();
private Semaphore mutex = new Semaphore(0);
@Override
public void run() {
- log.debug("Begin Flow Pusher Process");
-
while (true) {
try {
// wait for message pushed to queue
@@ -173,7 +176,7 @@
}
// check sending rate and determine it to be sent or not
- long current_time = System.nanoTime();
+ long current_time = System.currentTimeMillis();
long size = 0;
synchronized (queue) {
@@ -202,9 +205,7 @@
queue.logSentData(current_time, size);
if (queue.isEmpty()) {
- if (barrierIfEmpty) {
- barrier(sw);
- }
+ // remove the queue if it is flagged for deletion.
if (queue.toBeDeleted) {
synchronized (queues) {
queues.remove(sw);
@@ -255,7 +256,7 @@
if (damper != null) {
messageDamper = damper;
} else {
- // use default value
+ // use default values
messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
EnumSet.of(OFType.FLOW_MOD),
OFMESSAGE_DAMPER_TIMEOUT);
@@ -280,10 +281,6 @@
}
}
- /**
- * Suspend sending messages to switch.
- * @param sw
- */
@Override
public boolean suspend(IOFSwitch sw) {
SwitchQueue queue = getQueue(sw);
@@ -301,9 +298,6 @@
}
}
- /**
- * Resume sending messages to switch.
- */
@Override
public boolean resume(IOFSwitch sw) {
SwitchQueue queue = getQueue(sw);
@@ -321,9 +315,6 @@
}
}
- /**
- * Check if given switch is suspended.
- */
@Override
public boolean isSuspended(IOFSwitch sw) {
SwitchQueue queue = getQueue(sw);
@@ -384,10 +375,10 @@
}
@Override
- public boolean deleteQueue(IOFSwitch sw, boolean force) {
+ public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
FlowPusherThread proc = getProcess(sw);
- if (force) {
+ if (forceStop) {
synchronized (proc.queues) {
SwitchQueue queue = proc.queues.remove(sw);
if (queue == null) {
@@ -407,18 +398,12 @@
}
}
- /**
- * Add OFMessage to queue of the switch.
- * @param sw Switch to which message is sent.
- * @param msg Message to be sent.
- * @return true if succeed.
- */
@Override
public boolean add(IOFSwitch sw, OFMessage msg) {
FlowPusherThread proc = getProcess(sw);
SwitchQueue queue = proc.queues.get(sw);
- // create queue at first call
+ // create queue at first addition of message
if (queue == null) {
createQueue(sw);
queue = getQueue(sw);
@@ -436,13 +421,6 @@
return true;
}
- /**
- * Create OFMessage from given flow information and add it to the queue.
- * @param sw Switch to which message is sent.
- * @param flowObj FlowPath.
- * @param flowEntryObj FlowEntry.
- * @return true if succeed.
- */
@Override
public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
log.debug("sending : {}, {}", sw, flowObj);
@@ -748,13 +726,6 @@
return true;
}
- /**
- * Create OFMessage from given flow information and add it to the queue.
- * @param sw Switch to which message is sent.
- * @param flowPath FlowPath.
- * @param flowEntry FlowEntry.
- * @return true if succeed.
- */
@Override
public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
//