Made FlowPusher multi-threaded.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
index eef60a1..0f56c54 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
@@ -95,19 +95,20 @@
}
- private Map<IOFSwitch,SwitchQueue> queues
- = new HashMap<IOFSwitch,SwitchQueue>();
-
private OFMessageDamper messageDamper;
private FloodlightContext context = null;
private BasicFactory factory = null;
- private Thread thread = null;
+ private Map<Long, FlowPusherProcess> threadMap = null;
- private boolean isStopped = false;
- private boolean isMsgAdded = false;
+ private int number_thread = 1;
private class FlowPusherProcess implements Runnable {
+ private Map<IOFSwitch,SwitchQueue> queues
+ = new HashMap<IOFSwitch,SwitchQueue>();
+
+ private boolean isStopped = false;
+ private boolean isMsgAdded = false;
@Override
public void run() {
@@ -145,17 +146,17 @@
// if need to send, call IOFSwitch#write()
try {
messageDamper.write(sw, msg, context);
- size += msg.getLength();
log.debug("Pusher sends message : {}", msg);
+ size += msg.getLength();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
+ sw.flush();
queue.logSentData(current_time, size);
}
}
- sw.flush();
}
// sleep while all queues are empty
@@ -180,6 +181,14 @@
}
}
+ public FlowPusher() {
+
+ }
+
+ public FlowPusher(int number_thread) {
+ this.number_thread = number_thread;
+ }
+
public void init(FloodlightContext context, BasicFactory factory, OFMessageDamper damper) {
this.context = context;
this.factory = factory;
@@ -193,6 +202,7 @@
OFMESSAGE_DAMPER_TIMEOUT);
}
}
+
/**
* Begin processing queue.
*/
@@ -202,8 +212,14 @@
return;
}
- thread = new Thread(new FlowPusherProcess());
- thread.start();
+ threadMap = new HashMap<Long,FlowPusherProcess>();
+ for (long i = 0; i < number_thread; ++i) {
+ FlowPusherProcess runnable = new FlowPusherProcess();
+ threadMap.put(i, runnable);
+
+ Thread thread = new Thread(runnable);
+ thread.start();
+ }
}
/**
@@ -260,8 +276,14 @@
* End processing queue and exit thread.
*/
public void stop() {
- if (thread != null && thread.isAlive()) {
- isStopped = true;
+ if (threadMap == null) {
+ return;
+ }
+
+ for (FlowPusherProcess runnable : threadMap.values()) {
+ if (! runnable.isStopped) {
+ runnable.isStopped = true;
+ }
}
}
@@ -282,22 +304,23 @@
* @param msg
*/
public boolean add(IOFSwitch sw, OFMessage msg) {
- SwitchQueue queue = getQueue(sw);
+ FlowPusherProcess proc = getProcess(sw);
+ SwitchQueue queue = proc.queues.get(sw);
+
if (queue == null) {
queue = new SwitchQueue();
queue.state = QueueState.READY;
- synchronized (queues) {
- queues.put(sw, queue);
+ synchronized (proc) {
+ proc.queues.put(sw, queue);
}
}
- log.debug("Message is pushed : {}", msg);
-
synchronized (queue) {
queue.add(msg);
+ log.debug("Message is pushed : {}", msg);
}
- isMsgAdded = true;
+ proc.isMsgAdded = true;
return true;
}
@@ -929,6 +952,17 @@
return null;
}
- return queues.get(sw);
+ return getProcess(sw).queues.get(sw);
+ }
+
+ private long getHash(IOFSwitch sw) {
+ // TODO should consider equalization algorithm
+ return sw.getId() % number_thread;
+ }
+
+ private FlowPusherProcess getProcess(IOFSwitch sw) {
+ long hash = getHash(sw);
+
+ return threadMap.get(hash);
}
}