Added barrier function to FlowPusher.
Fixed division by zero in FlowPusher.
Replaced Runnable in FlowPusher with Thread.
Replaced Thread.sleep() in FlowPusher with Semaphore.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 6379f26..f43a83e 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -9,6 +9,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
 
 import org.openflow.protocol.*;
 import org.openflow.protocol.action.*;
@@ -47,6 +48,8 @@
 public class FlowPusher implements IFlowPusherService, IOFMessageListener {
     private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
 
+    private static boolean barrierIfEmpty = false;
+    
     // NOTE: Below are moved from FlowManager.
     // TODO: Values copied from elsewhere (class LearningSwitch).
     // The local copy should go away!
@@ -96,11 +99,20 @@
 				return true;
 			}
 			
+			if (current == last_sent_time) {
+				return false;
+			}
+			
 			// Check if sufficient time (from aspect of rate) elapsed or not.
 			long rate = last_sent_size / (current - last_sent_time);
 			return (rate < max_rate);
 		}
 		
+		/**
+		 * Log time and size of last sent data.
+		 * @param current Time to be sent.
+		 * @param size Size of sent data (in bytes).
+		 */
 		void logSentData(long current, long size) {
 			last_sent_time = current;
 			last_sent_size = size;
@@ -113,7 +125,7 @@
 
 	private FloodlightContext context = null;
 	private BasicFactory factory = null;
-	private Map<Long, FlowPusherProcess> threadMap = null;
+	private Map<Long, FlowPusherThread> threadMap = null;
 	private Map<Long, Map<Integer, OFBarrierReplyFuture>>
 		barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
 	
@@ -124,26 +136,31 @@
 	 * @author Naoki Shiota
 	 *
 	 */
-	private class FlowPusherProcess implements Runnable {
+	private class FlowPusherThread extends Thread {
 		private Map<IOFSwitch,SwitchQueue> queues
 		= new HashMap<IOFSwitch,SwitchQueue>();
 		
-		private boolean isStopped = false;
-		private boolean isMsgAdded = false;
+		private Semaphore mutex = new Semaphore(0);
 		
 		@Override
 		public void run() {
 			log.debug("Begin Flow Pusher Process");
 			
 			while (true) {
+				try {
+					// wait for message pushed to queue
+					mutex.acquire();
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+					log.debug("FlowPusherThread is interrupted");
+					return;
+				}
+				
 				Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
 				synchronized (queues) {
 					entries = queues.entrySet();
 				}
 				
-				// Set taint flag to false at this moment.
-				isMsgAdded = false;
-				
 				for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
 					IOFSwitch sw = entry.getKey();
 					SwitchQueue queue = entry.getValue();
@@ -163,15 +180,14 @@
 							int i = 0;
 							while (! queue.isEmpty()) {
 								// Number of messages excess the limit
-								if (++i >= MAX_MESSAGE_SEND) {
+								if (i >= MAX_MESSAGE_SEND) {
 									// Messages remains in queue
-									isMsgAdded = true;
+									mutex.release();
 									break;
 								}
+								++i;
 								
 								OFMessage msg = queue.poll();
-								
-								// if need to send, call IOFSwitch#write()
 								try {
 									messageDamper.write(sw, msg, context);
 									log.debug("Pusher sends message : {}", msg);
@@ -183,27 +199,13 @@
 							}
 							sw.flush();
 							queue.logSentData(current_time, size);
+							
+							if (queue.isEmpty() && barrierIfEmpty) {
+								barrier(sw);
+							}
 						}
 					}
 				}
-				
-				// sleep while all queues are empty
-				while (! (isMsgAdded || isStopped)) {
-					try {
-						Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
-					} catch (InterruptedException e) {
-						e.printStackTrace();
-						log.error("Thread.sleep failed");
-					}
-				}
-				
-				log.debug("Exit sleep loop.");
-				
-				if (isStopped) {
-					log.debug("Pusher Process finished.");
-					return;
-				}
-
 			}
 		}
 	}
@@ -261,12 +263,11 @@
 			return;
 		}
 		
-		threadMap = new HashMap<Long,FlowPusherProcess>();
+		threadMap = new HashMap<Long,FlowPusherThread>();
 		for (long i = 0; i < number_thread; ++i) {
-			FlowPusherProcess runnable = new FlowPusherProcess();
-			threadMap.put(i, runnable);
+			FlowPusherThread thread = new FlowPusherThread();
 			
-			Thread thread = new Thread(runnable);
+			threadMap.put(i, thread);
 			thread.start();
 		}
 	}
@@ -335,10 +336,8 @@
 			return;
 		}
 		
-		for (FlowPusherProcess runnable : threadMap.values()) {
-			if (! runnable.isStopped) {
-				runnable.isStopped = true;
-			}
+		for (FlowPusherThread t : threadMap.values()) {
+			t.interrupt();
 		}
 	}
 	
@@ -366,7 +365,7 @@
 	 */
 	@Override
 	public boolean add(IOFSwitch sw, OFMessage msg) {
-		FlowPusherProcess proc = getProcess(sw);
+		FlowPusherThread proc = getProcess(sw);
 		SwitchQueue queue = proc.queues.get(sw);
 
 		if (queue == null) {
@@ -382,8 +381,10 @@
 			log.debug("Message is pushed : {}", msg);
 		}
 		
-		proc.isMsgAdded = true;
-		
+		if (proc.mutex.availablePermits() == 0) {
+			proc.mutex.release();
+		}
+
 		return true;
 	}
 	
@@ -1041,14 +1042,18 @@
 	@Override
 	public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
 		// TODO creation of message and future should be moved to OFSwitchImpl
+
+		if (sw == null) {
+			return null;
+		}
 		
-		OFMessage msg = factory.getMessage(OFType.BARRIER_REQUEST);
+		OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
 		msg.setXid(sw.getNextTransactionId());
 		add(sw, msg);
 
 		// TODO create Future object of message
 		OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
-		
+
 		synchronized (barrierFutures) {
 			Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
 			if (map == null) {
@@ -1056,7 +1061,7 @@
 				barrierFutures.put(sw.getId(), map);
 			}
 			map.put(msg.getXid(), future);
-			log.debug("Created future for {}", msg.getXid());
+			log.debug("Inserted future for {}", msg.getXid());
 		}
 		
 		return future;
@@ -1076,7 +1081,7 @@
 		return sw.getId() % number_thread;
 	}
 	
-	protected FlowPusherProcess getProcess(IOFSwitch sw) {
+	protected FlowPusherThread getProcess(IOFSwitch sw) {
 		long hash = getHash(sw);
 		
 		return threadMap.get(hash);
@@ -1099,11 +1104,6 @@
 
 	@Override
 	public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
-		// This check can be skipped (since Controller must filter).
-//		if (msg.getType() != OFType.BARRIER_REPLY) {
-//			return Command.CONTINUE;
-//		}
-		
 		Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
 		if (map == null) {
 			return Command.CONTINUE;