Fixed misappropriated usage of Semaphore class by being replaced with Condition class.

Change-Id: I6b054fabf634dcb8a28f6ecfac2aba8370050638
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index d3476c7..3a0407f 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -14,7 +14,9 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.openflow.protocol.*;
 import org.openflow.protocol.action.*;
@@ -271,33 +273,39 @@
 	 *
 	 */
 	private class FlowPusherThread extends Thread {
-		private Map<IOFSwitch,SwitchQueue> queues
+		private Map<IOFSwitch,SwitchQueue> assignedQueues
 			= new ConcurrentHashMap<IOFSwitch,SwitchQueue>();
 		
-		// reusable latch used for waiting for arrival of message
-		private Semaphore mutex = new Semaphore(0);
+		final Lock queuingLock = new ReentrantLock();
+		final Condition messagePushed = queuingLock.newCondition();
 		
 		@Override
 		public void run() {
 			this.setName("FlowPusherThread " + this.getId() );
 			while (true) {
-				try {
-					// wait for message pushed to queue
-					mutex.acquire();
-				} catch (InterruptedException e) {
-					// Interrupted to be shut down (not an error)
-					log.debug("FlowPusherThread is interrupted");
-					return;
+				while (! queuesHasMessageToSend()) {
+					queuingLock.lock();
+					
+					try {
+						// wait for message pushed to queue
+						messagePushed.await();
+					} catch (InterruptedException e) {
+						// Interrupted to be shut down (not an error)
+						log.debug("FlowPusherThread is interrupted");
+						return;
+					} finally {
+						queuingLock.unlock();
+					}
 				}
 				
 				// for safety of concurrent access, copy set of key objects
-				Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
-				for (IOFSwitch sw : queues.keySet()) {
+				Set<IOFSwitch> keys = new HashSet<IOFSwitch>(assignedQueues.size());
+				for (IOFSwitch sw : assignedQueues.keySet()) {
 					keys.add(sw);
 				}
 				
 				for (IOFSwitch sw : keys) {
-					SwitchQueue queue = queues.get(sw);
+					SwitchQueue queue = assignedQueues.get(sw);
 
 					if (sw == null || queue == null) {
 						continue;
@@ -305,16 +313,9 @@
 					
 					synchronized (queue) {
 						processQueue(sw, queue, MAX_MESSAGE_SEND);
-						if (! queue.hasMessageToSend()) {
+						if (queue.toBeDeleted && ! queue.hasMessageToSend()) {
 							// remove queue if flagged to be.
-							if (queue.toBeDeleted) {
-								queues.remove(sw);
-							}
-						} else {
-							// Free the latch if message remains in queue
-							if (mutex.availablePermits() == 0) {
-								mutex.release();
-							}
+							assignedQueues.remove(sw);
 						}
 					}
 				}
@@ -365,6 +366,25 @@
 				queue.logSentData(current_time, size);
 			}
 		}
+		
+		private boolean queuesHasMessageToSend() {
+			for (SwitchQueue queue : assignedQueues.values()) {
+				if (queue.hasMessageToSend()) {
+					return true;
+				}
+			}
+
+			return false;
+		}
+		
+		private void notifyMessagePushed() {
+			queuingLock.lock();
+			try {
+				messagePushed.signal();
+			} finally {
+				queuingLock.unlock();
+			}
+		}
 	}
 	
 	/**
@@ -403,7 +423,8 @@
 		this.context = context;
 		this.factory = factory;
 		this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
-		IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
+		IFloodlightProviderService flservice
+			= modContext.getServiceImpl(IFloodlightProviderService.class);
 		flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
 		
 		if (damper != null) {
@@ -467,9 +488,8 @@
 				
 				// Free the latch if queue has any messages
 				FlowPusherThread thread = getProcessingThread(sw);
-				if (queue.hasMessageToSend() &&
-						thread.mutex.availablePermits() == 0) {
-					thread.mutex.release();
+				if (queue.hasMessageToSend()) {
+					thread.notifyMessagePushed();
 				}
 				return true;
 			}
@@ -532,7 +552,7 @@
 		FlowPusherThread proc = getProcessingThread(sw);
 		queue = new SwitchQueue();
 		queue.state = QueueState.READY;
-		proc.queues.put(sw, queue);
+		proc.assignedQueues.put(sw, queue);
 		
 		return queue;
 	}
@@ -547,7 +567,7 @@
 		FlowPusherThread proc = getProcessingThread(sw);
 		
 		if (forceStop) {
-			SwitchQueue queue = proc.queues.remove(sw);
+			SwitchQueue queue = proc.assignedQueues.remove(sw);
 			if (queue == null) {
 				return false;
 			}
@@ -889,7 +909,8 @@
 	 * @return
 	 */
 	protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
-		FlowPusherThread proc = getProcessingThread(sw);
+		FlowPusherThread thread = getProcessingThread(sw);
+		
 		SwitchQueue queue = getQueue(sw);
 
 		// create queue at first addition of message
@@ -898,6 +919,7 @@
 		}
 		
 		SwitchQueueEntry entry = new SwitchQueueEntry(msg);
+
 		synchronized (queue) {
 			queue.add(entry,priority);
 			if (log.isTraceEnabled()) {
@@ -905,9 +927,7 @@
 			}
 		}
 		
-		if (proc.mutex.availablePermits() == 0) {
-			proc.mutex.release();
-		}
+		thread.notifyMessagePushed();
 
 		return true;
 	}
@@ -972,7 +992,7 @@
 			return null;
 		}
 		
-		return th.queues.get(sw);
+		return th.assignedQueues.get(sw);
 	}
 	
 	/**