Fixed improper use of the Semaphore class by replacing it with the Condition class.
Change-Id: I6b054fabf634dcb8a28f6ecfac2aba8370050638
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index d3476c7..3a0407f 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -14,7 +14,9 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.openflow.protocol.*;
import org.openflow.protocol.action.*;
@@ -271,33 +273,39 @@
*
*/
private class FlowPusherThread extends Thread {
- private Map<IOFSwitch,SwitchQueue> queues
+ private Map<IOFSwitch,SwitchQueue> assignedQueues
= new ConcurrentHashMap<IOFSwitch,SwitchQueue>();
- // reusable latch used for waiting for arrival of message
- private Semaphore mutex = new Semaphore(0);
+ final Lock queuingLock = new ReentrantLock();
+ final Condition messagePushed = queuingLock.newCondition();
@Override
public void run() {
this.setName("FlowPusherThread " + this.getId() );
while (true) {
- try {
- // wait for message pushed to queue
- mutex.acquire();
- } catch (InterruptedException e) {
- // Interrupted to be shut down (not an error)
- log.debug("FlowPusherThread is interrupted");
- return;
+ while (! queuesHasMessageToSend()) {
+ queuingLock.lock();
+
+ try {
+ // wait for message pushed to queue
+ messagePushed.await();
+ } catch (InterruptedException e) {
+ // Interrupted to be shut down (not an error)
+ log.debug("FlowPusherThread is interrupted");
+ return;
+ } finally {
+ queuingLock.unlock();
+ }
}
// for safety of concurrent access, copy set of key objects
- Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
- for (IOFSwitch sw : queues.keySet()) {
+ Set<IOFSwitch> keys = new HashSet<IOFSwitch>(assignedQueues.size());
+ for (IOFSwitch sw : assignedQueues.keySet()) {
keys.add(sw);
}
for (IOFSwitch sw : keys) {
- SwitchQueue queue = queues.get(sw);
+ SwitchQueue queue = assignedQueues.get(sw);
if (sw == null || queue == null) {
continue;
@@ -305,16 +313,9 @@
synchronized (queue) {
processQueue(sw, queue, MAX_MESSAGE_SEND);
- if (! queue.hasMessageToSend()) {
+ if (queue.toBeDeleted && ! queue.hasMessageToSend()) {
// remove queue if flagged to be.
- if (queue.toBeDeleted) {
- queues.remove(sw);
- }
- } else {
- // Free the latch if message remains in queue
- if (mutex.availablePermits() == 0) {
- mutex.release();
- }
+ assignedQueues.remove(sw);
}
}
}
@@ -365,6 +366,25 @@
queue.logSentData(current_time, size);
}
}
+
+ private boolean queuesHasMessageToSend() { // true when at least one queue in assignedQueues reports a pending message
+ for (SwitchQueue queue : assignedQueues.values()) {
+ if (queue.hasMessageToSend()) {
+ return true; // the first queue with something to send is enough
+ }
+ }
+
+ return false; // no assigned queue reported a message; run() uses this result as its wait predicate
+ }
+
+ private void notifyMessagePushed() { // wakes one thread waiting on messagePushed (run() awaits it)
+ queuingLock.lock(); // Condition.signal() must be called while holding the lock that created the condition
+ try {
+ messagePushed.signal(); // NOTE(review): run() checks queuesHasMessageToSend() before taking queuingLock, so a signal landing between that check and await() looks like it can be missed — confirm
+ } finally {
+ queuingLock.unlock(); // always release the lock, even if signal() throws
+ }
+ }
}
/**
@@ -403,7 +423,8 @@
this.context = context;
this.factory = factory;
this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
- IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
+ IFloodlightProviderService flservice
+ = modContext.getServiceImpl(IFloodlightProviderService.class);
flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
if (damper != null) {
@@ -467,9 +488,8 @@
// Free the latch if queue has any messages
FlowPusherThread thread = getProcessingThread(sw);
- if (queue.hasMessageToSend() &&
- thread.mutex.availablePermits() == 0) {
- thread.mutex.release();
+ if (queue.hasMessageToSend()) {
+ thread.notifyMessagePushed();
}
return true;
}
@@ -532,7 +552,7 @@
FlowPusherThread proc = getProcessingThread(sw);
queue = new SwitchQueue();
queue.state = QueueState.READY;
- proc.queues.put(sw, queue);
+ proc.assignedQueues.put(sw, queue);
return queue;
}
@@ -547,7 +567,7 @@
FlowPusherThread proc = getProcessingThread(sw);
if (forceStop) {
- SwitchQueue queue = proc.queues.remove(sw);
+ SwitchQueue queue = proc.assignedQueues.remove(sw);
if (queue == null) {
return false;
}
@@ -889,7 +909,8 @@
* @return
*/
protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
- FlowPusherThread proc = getProcessingThread(sw);
+ FlowPusherThread thread = getProcessingThread(sw);
+
SwitchQueue queue = getQueue(sw);
// create queue at first addition of message
@@ -898,6 +919,7 @@
}
SwitchQueueEntry entry = new SwitchQueueEntry(msg);
+
synchronized (queue) {
queue.add(entry,priority);
if (log.isTraceEnabled()) {
@@ -905,9 +927,7 @@
}
}
- if (proc.mutex.availablePermits() == 0) {
- proc.mutex.release();
- }
+ thread.notifyMessagePushed();
return true;
}
@@ -972,7 +992,7 @@
return null;
}
- return th.queues.get(sw);
+ return th.assignedQueues.get(sw);
}
/**