Added barrier function to FlowPusher.
Fixed division by zero in FlowPusher.
Replaced Runnable with Thread in FlowPusher.
Replaced Thread.sleep() with Semaphore in FlowPusher.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 6379f26..f43a83e 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -9,6 +9,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
import org.openflow.protocol.*;
import org.openflow.protocol.action.*;
@@ -47,6 +48,8 @@
public class FlowPusher implements IFlowPusherService, IOFMessageListener {
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
+ private static boolean barrierIfEmpty = false;
+
// NOTE: Below are moved from FlowManager.
// TODO: Values copied from elsewhere (class LearningSwitch).
// The local copy should go away!
@@ -96,11 +99,20 @@
return true;
}
+ if (current == last_sent_time) {
+ return false;
+ }
+
// Check if sufficient time (from aspect of rate) elapsed or not.
long rate = last_sent_size / (current - last_sent_time);
return (rate < max_rate);
}
+ /**
+ * Log time and size of last sent data.
+ * @param current Time to be sent.
+ * @param size Size of sent data (in bytes).
+ */
void logSentData(long current, long size) {
last_sent_time = current;
last_sent_size = size;
@@ -113,7 +125,7 @@
private FloodlightContext context = null;
private BasicFactory factory = null;
- private Map<Long, FlowPusherProcess> threadMap = null;
+ private Map<Long, FlowPusherThread> threadMap = null;
private Map<Long, Map<Integer, OFBarrierReplyFuture>>
barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
@@ -124,26 +136,31 @@
* @author Naoki Shiota
*
*/
- private class FlowPusherProcess implements Runnable {
+ private class FlowPusherThread extends Thread {
private Map<IOFSwitch,SwitchQueue> queues
= new HashMap<IOFSwitch,SwitchQueue>();
- private boolean isStopped = false;
- private boolean isMsgAdded = false;
+ private Semaphore mutex = new Semaphore(0);
@Override
public void run() {
log.debug("Begin Flow Pusher Process");
while (true) {
+ try {
+ // wait for message pushed to queue
+ mutex.acquire();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ log.debug("FlowPusherThread is interrupted");
+ return;
+ }
+
Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
synchronized (queues) {
entries = queues.entrySet();
}
- // Set taint flag to false at this moment.
- isMsgAdded = false;
-
for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
IOFSwitch sw = entry.getKey();
SwitchQueue queue = entry.getValue();
@@ -163,15 +180,14 @@
int i = 0;
while (! queue.isEmpty()) {
// Number of messages excess the limit
- if (++i >= MAX_MESSAGE_SEND) {
+ if (i >= MAX_MESSAGE_SEND) {
// Messages remains in queue
- isMsgAdded = true;
+ mutex.release();
break;
}
+ ++i;
OFMessage msg = queue.poll();
-
- // if need to send, call IOFSwitch#write()
try {
messageDamper.write(sw, msg, context);
log.debug("Pusher sends message : {}", msg);
@@ -183,27 +199,13 @@
}
sw.flush();
queue.logSentData(current_time, size);
+
+ if (queue.isEmpty() && barrierIfEmpty) {
+ barrier(sw);
+ }
}
}
}
-
- // sleep while all queues are empty
- while (! (isMsgAdded || isStopped)) {
- try {
- Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
- } catch (InterruptedException e) {
- e.printStackTrace();
- log.error("Thread.sleep failed");
- }
- }
-
- log.debug("Exit sleep loop.");
-
- if (isStopped) {
- log.debug("Pusher Process finished.");
- return;
- }
-
}
}
}
@@ -261,12 +263,11 @@
return;
}
- threadMap = new HashMap<Long,FlowPusherProcess>();
+ threadMap = new HashMap<Long,FlowPusherThread>();
for (long i = 0; i < number_thread; ++i) {
- FlowPusherProcess runnable = new FlowPusherProcess();
- threadMap.put(i, runnable);
+ FlowPusherThread thread = new FlowPusherThread();
- Thread thread = new Thread(runnable);
+ threadMap.put(i, thread);
thread.start();
}
}
@@ -335,10 +336,8 @@
return;
}
- for (FlowPusherProcess runnable : threadMap.values()) {
- if (! runnable.isStopped) {
- runnable.isStopped = true;
- }
+ for (FlowPusherThread t : threadMap.values()) {
+ t.interrupt();
}
}
@@ -366,7 +365,7 @@
*/
@Override
public boolean add(IOFSwitch sw, OFMessage msg) {
- FlowPusherProcess proc = getProcess(sw);
+ FlowPusherThread proc = getProcess(sw);
SwitchQueue queue = proc.queues.get(sw);
if (queue == null) {
@@ -382,8 +381,10 @@
log.debug("Message is pushed : {}", msg);
}
- proc.isMsgAdded = true;
-
+ if (proc.mutex.availablePermits() == 0) {
+ proc.mutex.release();
+ }
+
return true;
}
@@ -1041,14 +1042,18 @@
@Override
public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
// TODO creation of message and future should be moved to OFSwitchImpl
+
+ if (sw == null) {
+ return null;
+ }
- OFMessage msg = factory.getMessage(OFType.BARRIER_REQUEST);
+ OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
msg.setXid(sw.getNextTransactionId());
add(sw, msg);
// TODO create Future object of message
OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
-
+
synchronized (barrierFutures) {
Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
if (map == null) {
@@ -1056,7 +1061,7 @@
barrierFutures.put(sw.getId(), map);
}
map.put(msg.getXid(), future);
- log.debug("Created future for {}", msg.getXid());
+ log.debug("Inserted future for {}", msg.getXid());
}
return future;
@@ -1076,7 +1081,7 @@
return sw.getId() % number_thread;
}
- protected FlowPusherProcess getProcess(IOFSwitch sw) {
+ protected FlowPusherThread getProcess(IOFSwitch sw) {
long hash = getHash(sw);
return threadMap.get(hash);
@@ -1099,11 +1104,6 @@
@Override
public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
- // This check can be skipped (since Controller must filter).
-// if (msg.getType() != OFType.BARRIER_REPLY) {
-// return Command.CONTINUE;
-// }
-
Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
if (map == null) {
return Command.CONTINUE;