Fixed concurrency problem in FlowPusher.
Divided queue processing code for readability.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 6f85c5d..349f69d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -5,6 +5,7 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -146,6 +147,7 @@
private Map<IOFSwitch,SwitchQueue> queues
= new HashMap<IOFSwitch,SwitchQueue>();
+ // reusable latch used for waiting for arrival of message
private Semaphore mutex = new Semaphore(0);
@Override
@@ -160,14 +162,16 @@
return;
}
- Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
+ // for safety of concurrent access, copy all key objects
+ Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
synchronized (queues) {
- entries = queues.entrySet();
+ for (IOFSwitch sw : queues.keySet()) {
+ keys.add(sw);
+ }
}
- for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
- IOFSwitch sw = entry.getKey();
- SwitchQueue queue = entry.getValue();
+ for (IOFSwitch sw : keys) {
+ SwitchQueue queue = queues.get(sw);
// Skip if queue is suspended
if (sw == null || queue == null ||
@@ -175,48 +179,62 @@
continue;
}
- // check sending rate and determine it to be sent or not
- long current_time = System.currentTimeMillis();
- long size = 0;
-
synchronized (queue) {
- if (queue.isSendable(current_time)) {
- int i = 0;
- while (! queue.isEmpty()) {
- // Number of messages excess the limit
- if (i >= MAX_MESSAGE_SEND) {
- // Messages remains in queue
- mutex.release();
- break;
- }
- ++i;
-
- OFMessage msg = queue.poll();
- try {
- messageDamper.write(sw, msg, context);
- log.debug("Pusher sends message : {}", msg);
- size += msg.getLength();
- } catch (IOException e) {
- e.printStackTrace();
- log.error("Exception in sending message ({}) : {}", msg, e);
+ processQueue(sw, queue, MAX_MESSAGE_SEND);
+ if (queue.isEmpty()) {
+ // remove queue if flagged to be.
+ if (queue.toBeDeleted) {
+ synchronized (queues) {
+ queues.remove(sw);
}
}
- sw.flush();
- queue.logSentData(current_time, size);
-
- if (queue.isEmpty()) {
- // remove queue if flagged to be.
- if (queue.toBeDeleted) {
- synchronized (queues) {
- queues.remove(sw);
- }
- }
+ } else {
+ // if some messages remains in queue, latch down
+ if (mutex.availablePermits() == 0) {
+ mutex.release();
}
}
}
}
}
}
+
+ /**
+ * Read messages from queue and send them to the switch.
+ * If number of messages excess the limit, stop sending messages.
+ * @param sw Switch to which messages will be sent.
+ * @param queue Queue of messages.
+ * @param max_msg Limitation of number of messages to be sent. If set to 0,
+ * all messages in queue will be sent.
+ */
+ private void processQueue(IOFSwitch sw, SwitchQueue queue, long max_msg) {
+ // check sending rate and determine it to be sent or not
+ long current_time = System.currentTimeMillis();
+ long size = 0;
+
+ if (queue.isSendable(current_time)) {
+ int i = 0;
+ while (! queue.isEmpty()) {
+ // Number of messages excess the limit
+ if (0 < max_msg && max_msg <= i) {
+ break;
+ }
+ ++i;
+
+ OFMessage msg = queue.poll();
+ try {
+ messageDamper.write(sw, msg, context);
+ log.debug("Pusher sends message : {}", msg);
+ size += msg.getLength();
+ } catch (IOException e) {
+ e.printStackTrace();
+ log.error("Exception in sending message ({}) : {}", msg, e);
+ }
+ }
+ sw.flush();
+ queue.logSentData(current_time, size);
+ }
+ }
}
/**
@@ -362,7 +380,7 @@
FlowPusherThread proc = getProcess(sw);
queue = new SwitchQueue();
queue.state = QueueState.READY;
- synchronized (proc) {
+ synchronized (proc.queues) {
proc.queues.put(sw, queue);
}