Modified FlowManager to use FlowPusher instead of OFMessageDamper and FlowSwitchOperation.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
index 6c7a2bb..2511a3f 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
@@ -7,6 +7,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.openflow.protocol.*;
import org.openflow.protocol.action.*;
@@ -39,10 +40,15 @@
//
protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
+
+ protected static final long SLEEP_MILLI_SEC = 3;
+ protected static final int SLEEP_NANO_SEC = 0;
public static final short PRIORITY_DEFAULT = 100;
public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
+
+
public enum QueueState {
READY,
@@ -53,7 +59,7 @@
QueueState state;
// Max rate of sending message (bytes/sec). 0 implies no limitation.
- long max_rate = 0;
+ long max_rate = Long.MAX_VALUE;
long last_sent_time = 0;
long last_sent_size = 0;
@@ -88,13 +94,25 @@
private BasicFactory factory = null;
private Thread thread = null;
+ // Both flags are written by caller threads (stop() / send()) and polled by the pusher
+ // thread outside any lock, so they are volatile to make each write visible to that loop.
+ private volatile boolean isStopped = false;
+ private volatile boolean isMsgAdded = false;
private class FlowPusherProcess implements Runnable {
+
@Override
public void run() {
+ log.debug("Begin Flow Pusher Process");
while (true) {
- for (Map.Entry<IOFSwitch,SwitchQueue> entry : queues.entrySet()) {
+ Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
+ synchronized (queues) {
+ entries = queues.entrySet();
+ }
+
+ // Set taint flag to false at this moment.
+ isMsgAdded = false;
+
+ for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
IOFSwitch sw = entry.getKey();
SwitchQueue queue = entry.getValue();
@@ -105,18 +123,21 @@
}
synchronized (queue) {
- // TODO send multiple messages at once
+ log.debug("Queue size : {}", queue.size());
- while (! queue.isEmpty()) {
- OFMessage msg = queue.poll();
- // check sending rate and determine it to be sent or not
- long current_time = System.nanoTime();
-
- // if need to send, call IOFSwitch#write()
- if (queue.isSendable(current_time)) {
+ // check sending rate and determine it to be sent or not
+ long current_time = System.nanoTime();
+
+ if (queue.isSendable(current_time)) {
+ // TODO send multiple messages at once
+ while (! queue.isEmpty()) {
+ OFMessage msg = queue.poll();
+
+ // if need to send, call IOFSwitch#write()
try {
messageDamper.write(sw, msg, context);
queue.updateRate(current_time, msg);
+ log.debug("Pusher sends message : {}", msg);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -127,28 +148,49 @@
}
// sleep while all queues are empty
- boolean sleep = true;
- do {
+ while (! isMsgAdded) {
+ if (isStopped) {
+ log.debug("Pusher Process finished.");
+ return;
+ }
- // TODO check if queues are empty
- } while (sleep);
+ try {
+ Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ log.debug("Exit sleep loop.");
+
+ if (isStopped) {
+ log.debug("Pusher Process finished.");
+ return;
+ }
}
}
}
- public void init(FloodlightContext context, BasicFactory factory) {
+ public void init(FloodlightContext context, BasicFactory factory, OFMessageDamper damper) {
this.context = context;
this.factory = factory;
- messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
- EnumSet.of(OFType.FLOW_MOD),
- OFMESSAGE_DAMPER_TIMEOUT);
+ // A caller-supplied damper takes precedence; passing null selects the default built below.
+ if (damper != null) {
+ messageDamper = damper;
+ } else {
+ // Default damper: built from the class constants, with FLOW_MOD as its only listed message type.
+ messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+ EnumSet.of(OFType.FLOW_MOD),
+ OFMESSAGE_DAMPER_TIMEOUT);
+ }
}
/**
* Begin processing queue.
*/
public void start() {
- if (context == null || factory == null) {
- // not yet initialized
+ if (factory == null) {
+ log.error("FlowPusher not yet initialized.");
return;
}
@@ -196,7 +238,18 @@
*/
public void stop() {
if (thread != null && thread.isAlive()) {
- // TODO tell thread to halt
+ isStopped = true; // only sets the flag; the pusher loop returns when it next checks it
+ }
+ }
+
+ /**
+ * Set the sending-rate limit of the queue that belongs to a switch.
+ * Does nothing when getQueue(sw) yields no queue or when rate is not positive.
+ * @param sw switch whose queue is updated
+ * @param rate new value for max_rate (documented on the field as bytes/sec); must be > 0
+ */
+ public void setRate(IOFSwitch sw, long rate) {
+ SwitchQueue queue = getQueue(sw);
+ if (queue == null) {
+ return;
+ }
+
+ if (rate > 0) {
+ // max_rate is a plain long; write it under the queue's monitor, the same lock the
+ // pusher thread holds while it works on this queue, so the update is atomic and visible.
+ synchronized (queue) {
+ queue.max_rate = rate;
+ }
}
}
@@ -208,13 +261,19 @@
public boolean send(IOFSwitch sw, OFMessage msg) {
- SwitchQueue queue = getQueue(sw);
- if (queue == null) {
- queues.put(sw, new SwitchQueue());
- }
+ SwitchQueue queue;
+ // Look up and, if needed, create the queue in one critical section so that two
+ // concurrent first sends for the same switch cannot each register their own queue
+ // (the second put would orphan the message added to the first).
+ synchronized (queues) {
+ queue = getQueue(sw);
+ if (queue == null) {
+ queue = new SwitchQueue();
+ queue.state = QueueState.READY;
+ queues.put(sw, queue);
+ }
+ }
synchronized (queue) {
queue.add(msg);
}
+ // Raise the flag only after the message is queued; the pusher loop polls it to leave its sleep.
+ isMsgAdded = true;
+
return true;
}