Fixed concurrency problem in FlowPusher.
Divided queue processing code for readability.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 6f85c5d..349f69d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -5,6 +5,7 @@
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -146,6 +147,7 @@
 		private Map<IOFSwitch,SwitchQueue> queues
 			= new HashMap<IOFSwitch,SwitchQueue>();
 		
+		// reusable latch used for waiting for arrival of message
 		private Semaphore mutex = new Semaphore(0);
 		
 		@Override
@@ -160,14 +162,16 @@
 					return;
 				}
 				
-				Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
+				// for safety of concurrent access, copy all key objects
+				Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
 				synchronized (queues) {
-					entries = queues.entrySet();
+					for (IOFSwitch sw : queues.keySet()) {
+						keys.add(sw);
+					}
 				}
 				
-				for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
-					IOFSwitch sw = entry.getKey();
-					SwitchQueue queue = entry.getValue();
+				for (IOFSwitch sw : keys) {
+					SwitchQueue queue = queues.get(sw);
 
 					// Skip if queue is suspended
 					if (sw == null || queue == null ||
@@ -175,48 +179,62 @@
 						continue;
 					}
 					
-					// check sending rate and determine it to be sent or not
-					long current_time = System.currentTimeMillis();
-					long size = 0;
-					
 					synchronized (queue) {
-						if (queue.isSendable(current_time)) {
-							int i = 0;
-							while (! queue.isEmpty()) {
-								// Number of messages excess the limit
-								if (i >= MAX_MESSAGE_SEND) {
-									// Messages remains in queue
-									mutex.release();
-									break;
-								}
-								++i;
-								
-								OFMessage msg = queue.poll();
-								try {
-									messageDamper.write(sw, msg, context);
-									log.debug("Pusher sends message : {}", msg);
-									size += msg.getLength();
-								} catch (IOException e) {
-									e.printStackTrace();
-									log.error("Exception in sending message ({}) : {}", msg, e);
+						processQueue(sw, queue, MAX_MESSAGE_SEND);
+						if (queue.isEmpty()) {
+							// remove queue if flagged to be.
+							if (queue.toBeDeleted) {
+								synchronized (queues) {
+									queues.remove(sw);
 								}
 							}
-							sw.flush();
-							queue.logSentData(current_time, size);
-							
-							if (queue.isEmpty()) {
-								// remove queue if flagged to be.
-								if (queue.toBeDeleted) {
-									synchronized (queues) {
-										queues.remove(sw);
-									}
-								}
+						} else {
+							// if some messages remain in queue, release a permit unless one is already available
+							if (mutex.availablePermits() == 0) {
+								mutex.release();
 							}
 						}
 					}
 				}
 			}
 		}
+		
+		/**
+		 * Poll messages from the queue and write them to the switch, provided
+		 * queue.isSendable() accepts the current time; then flush and log the sent size.
+		 * @param sw Switch to which messages will be sent.
+		 * @param queue Queue of messages; the visible call site synchronizes on it.
+		 * @param max_msg Maximum number of messages to send in one call. If 0 or
+		 *                less, all messages in the queue are sent.
+		 */
+		private void processQueue(IOFSwitch sw, SwitchQueue queue, long max_msg) {
+			// one timestamp is used for both the rate check and the sent-data log
+			long current_time = System.currentTimeMillis();
+			long size = 0;
+			
+			if (queue.isSendable(current_time)) {
+				int i = 0;
+				while (! queue.isEmpty()) {
+					// stop once max_msg messages were polled (no limit when max_msg <= 0)
+					if (0 < max_msg && max_msg <= i) {
+						break;
+					}
+					++i;
+					
+					OFMessage msg = queue.poll();
+					try {
+						messageDamper.write(sw, msg, context);
+						log.debug("Pusher sends message : {}", msg);
+						size += msg.getLength(); // reached only if write() did not throw
+					} catch (IOException e) {
+						e.printStackTrace(); // NOTE(review): e is also passed to log.error below
+						log.error("Exception in sending message ({}) : {}", msg, e);
+					}
+				}
+				sw.flush();
+				queue.logSentData(current_time, size);
+			}
+		}
 	}
 	
 	/**
@@ -362,7 +380,7 @@
 		FlowPusherThread proc = getProcess(sw);
 		queue = new SwitchQueue();
 		queue.state = QueueState.READY;
-		synchronized (proc) {
+		synchronized (proc.queues) {
 			proc.queues.put(sw, queue);
 		}