Modified FlowManager to use FlowPusher instead of OFMessageDamper and FlowSwitchOperation.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
index 6c7a2bb..2511a3f 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
@@ -7,6 +7,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.openflow.protocol.*;
 import org.openflow.protocol.action.*;
@@ -39,10 +40,15 @@
     //
     protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
     protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250;	// ms
+    
+    protected static final long SLEEP_MILLI_SEC = 3;
+    protected static final int SLEEP_NANO_SEC = 0;
 
     public static final short PRIORITY_DEFAULT = 100;
     public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0;	// infinity
     public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0;	// infinite
+    
+    
 
 	public enum QueueState {
 		READY,
@@ -53,7 +59,7 @@
 		QueueState state;
 		
 		// Max rate of sending message (bytes/sec). 0 implies no limitation.
-		long max_rate = 0;
+		long max_rate = Long.MAX_VALUE;
 		long last_sent_time = 0;
 		long last_sent_size = 0;
 		
@@ -88,13 +94,25 @@
 	private BasicFactory factory = null;
 	private Thread thread = null;
 	
+	private boolean isStopped = false;
+	private boolean isMsgAdded = false;
 	
 	private class FlowPusherProcess implements Runnable {
+		
 		@Override
 		public void run() {
+			log.debug("Begin Flow Pusher Process");
 			
 			while (true) {
-				for (Map.Entry<IOFSwitch,SwitchQueue> entry : queues.entrySet()) {
+				Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
+				synchronized (queues) {
+					entries = queues.entrySet();
+				}
+				
+				// Set taint flag to false at this moment.
+				isMsgAdded = false;
+				
+				for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
 					IOFSwitch sw = entry.getKey();
 					SwitchQueue queue = entry.getValue();
 
@@ -105,18 +123,21 @@
 					}
 					
 					synchronized (queue) {
-						// TODO send multiple messages at once
+						log.debug("Queue size : {}", queue.size());
 						
-						while (! queue.isEmpty()) {
-							OFMessage msg = queue.poll();
-							// check sending rate and determine it to be sent or not
-							long current_time = System.nanoTime();
-							
-							// if need to send, call IOFSwitch#write()
-							if (queue.isSendable(current_time)) {
+						// check sending rate and determine it to be sent or not
+						long current_time = System.nanoTime();
+						
+						if (queue.isSendable(current_time)) {
+							// TODO send multiple messages at once
+							while (! queue.isEmpty()) {
+								OFMessage msg = queue.poll();
+								
+								// if need to send, call IOFSwitch#write()
 								try {
 									messageDamper.write(sw, msg, context);
 									queue.updateRate(current_time, msg);
+									log.debug("Pusher sends message : {}", msg);
 								} catch (IOException e) {
 									// TODO Auto-generated catch block
 									e.printStackTrace();
@@ -127,28 +148,49 @@
 				}
 				
 				// sleep while all queues are empty
-				boolean sleep = true;
-				do {
+				while (! isMsgAdded) {
+					if (isStopped) {
+						log.debug("Pusher Process finished.");
+						return;
+					}
 					
-					// TODO check if queues are empty
-				} while (sleep);
+					try {
+						Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
+					} catch (InterruptedException e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+					}
+				}
+				
+				log.debug("Exit sleep loop.");
+				
+				if (isStopped) {
+					log.debug("Pusher Process finished.");
+					return;
+				}
 			}
 		}
 	}
 	
-	public void init(FloodlightContext context, BasicFactory factory) {
+	public void init(FloodlightContext context, BasicFactory factory, OFMessageDamper damper) {
 		this.context = context;
 		this.factory = factory;
-		messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
-			    EnumSet.of(OFType.FLOW_MOD),
-			    OFMESSAGE_DAMPER_TIMEOUT);
+		
+		if (damper != null) {
+			messageDamper = damper;
+		} else {
+			// use default value
+			messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+				    EnumSet.of(OFType.FLOW_MOD),
+				    OFMESSAGE_DAMPER_TIMEOUT);
+		}
 	}
 	/**
 	 * Begin processing queue.
 	 */
 	public void start() {
-		if (context == null || factory == null) {
-			// not yet initialized
+		if (factory == null) {
+			log.error("FlowPusher not yet initialized.");
 			return;
 		}
 		
@@ -196,7 +238,18 @@
 	 */
 	public void stop() {
 		if (thread != null && thread.isAlive()) {
-			// TODO tell thread to halt
+			isStopped = true;
+		}
+	}
+	
+	public void setRate(IOFSwitch sw, long rate) {
+		SwitchQueue queue = getQueue(sw);
+		if (queue == null) {
+			return;
+		}
+		
+		if (rate > 0) {
+			queue.max_rate = rate;
 		}
 	}
 	
@@ -208,13 +261,19 @@
 	public boolean send(IOFSwitch sw, OFMessage msg) {
 		SwitchQueue queue = getQueue(sw);
 		if (queue == null) {
-			queues.put(sw, new SwitchQueue());
+			queue = new SwitchQueue();
+			queue.state = QueueState.READY;
+			synchronized (queues) {
+				queues.put(sw, queue);
+			}
 		}
 		
 		synchronized (queue) {
 			queue.add(msg);
 		}
 		
+		isMsgAdded = true;
+		
 		return true;
 	}